# Vehicle injector
Code modified after the call with Idneo.

In [1]:
import os

os.chdir("..")

import argparse
import time
import redis
from redistimeseries.client import Client
from tools.vehicle.src.csv_parser import proccess_csv
from tools.vehicle.src.data_handler import order_by_ts_relative
import pandas as pd

# Redis connection settings.
# The password is read from the REDIS_PASSWORD environment variable so that no
# credential is stored in the notebook; it is None when the variable is unset.
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
REDIS_PORT = 6379

## Install and start the redis database (`redis-stack-server` as it includes redis-timeseries)

```
# Install Redis Stack
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update
sudo apt-get install redis-stack-server

# Start Redis Stack
redis-stack-server
```

## Code from `tools/vehicle/vehicle_injector.py` 

In [None]:
# Command-line interface of the original script: one required CSV dataset path.
# NOTE(review): inside a notebook kernel sys.argv belongs to the kernel
# launcher, so this cell is only meaningful when run as a standalone script.
parser = argparse.ArgumentParser(description='Process a CSV dataset file')
parser.add_argument('-f', '--file', required=True, help='Path to CSV file')
args = parser.parse_args()

# Pre-process the CSV dataset, then sort the samples by relative timestamp.
data_ordered_by_sensor = proccess_csv(args.file)
data_ordered_by_time = order_by_ts_relative(data_ordered_by_sensor)

# Redis client connectivity; `database` is the time-series command interface.
redisClient = redis.Redis(host="127.0.0.1", password=REDIS_PASSWORD, port=REDIS_PORT)
database = redisClient.ts()

# Create each sensor's time series in the database if it does not exist yet.
for sensor in data_ordered_by_sensor:
    try:
        database.info(sensor["sensor"])
    except redis.exceptions.ConnectionError as connection_error:
        # Nothing below can work without a server: abort with the error text.
        raise SystemExit(connection_error)
    except redis.exceptions.RedisError:
        # Any other Redis error from the info call is treated as "series missing".
        database.create(sensor["sensor"], labels={"type": "can_bus"})

# Wall-clock reference: dataset offsets are replayed relative to this instant.
timestamp_start = time.time()
relative_timestamp_start = 0

# Replay the dataset, sleeping between samples to reproduce the recorded pacing.
for data_sample in data_ordered_by_time:
    # Each sample is a single-entry mapping: {relative_ts: {'sensor', 'value'}}.
    # Take key and payload together instead of re-indexing with a converted key.
    raw_offset, info = next(iter(data_sample.items()))
    delta_timestamp = float(raw_offset)

    # Delay until this sample, relative to the previous one (10 ms resolution).
    delay_of_next_data = round(delta_timestamp - relative_timestamp_start, 2)
    # time.sleep() rejects negative values; clamp in case samples are unsorted.
    time.sleep(max(delay_of_next_data, 0.0))

    relative_timestamp_start = delta_timestamp
    # Epoch seconds rounded to millisecond precision, then converted to integer ms.
    timestamp_epoch_ms = round(timestamp_start + delta_timestamp, 3)
    injection_timestamp = int(timestamp_epoch_ms * 1e3)

    print(timestamp_epoch_ms, "Inserting", info['sensor'], info['value'], )
    database.add(info['sensor'], injection_timestamp, info['value'])

## Proposed approach

In [2]:
# Capture files available under tools/vehicle/datasets/ateca_R4_2.0l_TDI/.
data_sets = [
    "DS1_stopped_with_ignition_on_22Feb24_115812.csv",
    "DS1_stopped_with_ignition_on_25Jan24_124019.csv",
    "DS1_stopped_with_ignition_on_25Jan24_151531.csv",
    "DS1_stopped_with_ignition_on_25Mar24_153740.CSV",
    "DS2_national_road_90km_h_max_25Jan24_153019.csv",
    "DS2_national_road_90km_h_max_25Mar24_133516.CSV",
    "DS3_highway_120km_h_max_22Feb24_121145.csv",
    "DS3_highway_120km_h_max_25Mar24_154857.csv",
]
# The first entry is the capture loaded by this notebook.
file = f"tools/vehicle/datasets/ateca_R4_2.0l_TDI/{data_sets[0]}"

# Parse the CSV, then order the parsed samples by relative timestamp.
data_ordered_by_sensor = proccess_csv(file)
data_ordered_by_time = order_by_ts_relative(data_ordered_by_sensor)

['Vehicle speed', 'Time since engine start', 'Normed load value', 'Accelerator pedal position', 'Engine torque', 'Oil fill level', 'Engine oil temperature', 'Fuel level', 'Fuel consumption', 'Brake pressure', 'Engaged gear: raw signal-Bits 0-7', 'Efficiency of the SCR catalytic converter']
Dataset has begun after  0 seconds...


In [3]:
# Peek at the first two time-ordered samples to show the record layout.
data_ordered_by_time[:2]

[{0.14: {'sensor': 'Efficiency_of_the_SCR_catalytic_converter', 'value': 0.0}},
 {0.36: {'sensor': 'Time_since_engine_start', 'value': 0.0}}]

In [4]:
def batch_insert_timeseries(rts_client, data, batch_size=10):
    """Insert samples into RedisTimeSeries, flushing a pipeline every ``batch_size`` items.

    Args:
        rts_client: Client exposing ``create(key)`` and a ``redis`` attribute
            whose ``pipeline()`` buffers commands until ``execute()``.
        data: Iterable of single-entry mappings
            ``{relative_ts: {"sensor": key, "value": value}}``.
        batch_size: Number of queued ``TS.ADD`` commands per pipeline flush.

    Raises:
        Exception: Any error from ``rts_client.create`` other than the series
            already existing is re-raised instead of being silently ignored.

    Progress is reported with ``print``; nothing is returned.
    """
    pipe = rts_client.redis.pipeline()

    # Take the reference time once so every sample shares the same base;
    # re-reading the clock per sample would skew offsets during long inserts.
    # NOTE(review): the base is in seconds while the offset below is scaled by
    # 1000 (milliseconds). The mix is kept because the notebook's query window
    # and pd.to_datetime(unit='s') rely on it -- confirm the intended unit.
    base_timestamp = int(time.time()) - 1800

    known_sensors = set()
    total = 0  # stays 0 for empty input so the summary below is still valid

    for total, item in enumerate(data, 1):
        relative_ts, sample = next(iter(item.items()))
        sensor_id = sample["sensor"]
        # * 1000 to convert to ms
        timestamp = base_timestamp + int(relative_ts * 1000)

        # Create the time series once per sensor rather than once per sample.
        if sensor_id not in known_sensors:
            try:
                rts_client.create(sensor_id)
            except Exception as exc:
                # Only "already exists" is expected here; surface anything else.
                if "already exists" not in str(exc):
                    raise
            known_sensors.add(sensor_id)

        # Add to pipeline
        pipe.execute_command('TS.ADD', sensor_id, timestamp, sample["value"])

        # Execute batch if batch size reached
        if total % batch_size == 0:
            pipe.execute()
            print(f"Inserted {total} data points")

    # Execute any remaining commands in the pipeline
    if total % batch_size != 0:
        pipe.execute()

    print(f"Total inserted: {total} data points")


def get_all_timeseries_keys(redis_client):
    """Return the names of all RedisTimeSeries keys found by a full SCAN.

    Args:
        redis_client: Client exposing ``scan(cursor, match=..., count=...)``
            and ``type(key)``. Replies may be ``bytes`` or ``str`` (the latter
            when the client decodes responses).

    Returns:
        list[str]: Key names whose type is ``TSDB-TYPE``, without duplicates,
        in first-seen order.
    """
    def _to_text(raw):
        # Normalise bytes replies to str; already-decoded replies pass through.
        return raw.decode('utf-8') if isinstance(raw, bytes) else raw

    found = {}  # dict used as an insertion-ordered set
    cursor = 0
    while True:
        cursor, batch = redis_client.scan(cursor, match='*', count=1000)
        for key in batch:
            name = _to_text(key)
            # SCAN may report the same key more than once; skip repeats
            # before paying for another TYPE round-trip.
            if name in found:
                continue
            # Check if the key is a time series
            if _to_text(redis_client.type(key)) == 'TSDB-TYPE':
                found[name] = None
        # A returned cursor of 0 means the iteration is complete.
        if cursor == 0:
            break
    return list(found)


def transform_sensor_data(sensor_dict, window_size='1h'):
    """Resample per-sensor samples onto a common time grid and join them.

    Args:
        sensor_dict: Mapping ``sensor_id -> sequence of (timestamp, value)``
            pairs. Timestamps are interpreted as seconds since the epoch.
        window_size: pandas offset alias for the resampling window. The
            default is the lowercase hour alias, equivalent to the previously
            used ``'1H'``, which newer pandas releases deprecate.

    Returns:
        pandas.DataFrame: One ``"<sensor_id>_value"`` column per sensor holding
        the mean value per window, indexed by window start. An empty frame is
        returned when ``sensor_dict`` is empty.
    """
    # pd.concat raises on an empty mapping, so answer that case directly.
    if not sensor_dict:
        return pd.DataFrame()

    # One resampled DataFrame per sensor.
    sensor_dfs = {}
    for sensor_id, data in sensor_dict.items():
        frame = pd.DataFrame(data, columns=['timestamp', 'value'])
        frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='s')
        # Index by time and sort so resampling sees ordered timestamps.
        frame = frame.set_index('timestamp').sort_index()

        # Resample to the specified window size and calculate mean
        sensor_dfs[sensor_id] = frame.resample(window_size).mean()

    # Passing a dict to concat yields (sensor_id, column) MultiIndex columns.
    combined_df = pd.concat(sensor_dfs, axis=1)

    # Flatten the column MultiIndex into "<sensor>_<column>" names.
    combined_df.columns = [f"{sensor}_{col}" for sensor, col in combined_df.columns]

    return combined_df

In [5]:
# Open a connection to the local Redis server (no password is passed here)
# and wrap it in the RedisTimeSeries client used by the helper functions.
redisClient = redis.Redis(port=REDIS_PORT, host="127.0.0.1")
rts = Client(redisClient)
# Round-trip check; its result is displayed as the cell output.
redisClient.ping()

True

In [10]:
# Get all time series keys currently stored in Redis.
# NOTE(review): this scans whatever is in the database at the time the cell
# runs; on a database with no series yet, the list is empty.
sensors = get_all_timeseries_keys(redisClient)

In [22]:
# Execute batch insert
batch_insert_timeseries(rts, data_ordered_by_time)

# Re-scan after inserting so that series created by the insert above are
# queried too; a list captured before the insert misses them on a fresh database.
sensors = get_all_timeseries_keys(redisClient)

# Query example: the last hour.
# NOTE(review): these bounds are epoch seconds, matching the base used by
# batch_insert_timeseries; RedisTimeSeries itself treats timestamps as plain
# integers (conventionally milliseconds) -- confirm the intended unit.
end_time = int(time.time())
start_time = end_time - 3600

# Samples per sensor within the window, keyed by series name.
results = {sensor: rts.range(sensor, start_time, end_time) for sensor in sensors}

Inserted 10 data points
Inserted 20 data points
Inserted 30 data points
Inserted 40 data points
Inserted 50 data points
Inserted 60 data points
Inserted 70 data points
Inserted 80 data points
Inserted 90 data points
Inserted 100 data points
Inserted 110 data points
Inserted 120 data points
Inserted 130 data points
Inserted 140 data points
Inserted 150 data points
Inserted 160 data points
Inserted 170 data points
Inserted 180 data points
Inserted 190 data points
Inserted 200 data points
Inserted 210 data points
Inserted 220 data points
Inserted 230 data points
Inserted 240 data points
Inserted 250 data points
Inserted 260 data points
Inserted 270 data points
Inserted 280 data points
Inserted 290 data points
Inserted 300 data points
Inserted 310 data points
Inserted 320 data points
Inserted 330 data points
Inserted 340 data points
Inserted 350 data points
Inserted 360 data points
Inserted 370 data points
Inserted 380 data points
Inserted 390 data points
Inserted 400 data points
Inserted 

In [43]:
# Average every sensor over 5-minute windows and join them into one frame.
results_pd = transform_sensor_data(results, window_size='5min')
# Show the first windows as the cell output.
results_pd.head()

Unnamed: 0_level_0,Normed_load_value_value,Engine_torque_value,Accelerator_pedal_position_value,Fuel_level_value,Oil_fill_level_value,Engaged_gear:_raw_signal-Bits_0-7_value,Brake_pressure_value,Engine_oil_temperature_value,Time_since_engine_start_value,Efficiency_of_the_SCR_catalytic_converter_value,Vehicle_speed_value,Fuel_consumption_value
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2024-10-16 12:20:00,,,,,,,,,0.0,0.0,,
2024-10-16 12:25:00,23.1,,14.5,,,,,,,0.0,,
2024-10-16 12:30:00,23.1,48.4,14.5,,75.024,,,13.0,0.0,0.0,,
2024-10-16 12:35:00,23.1,48.4,14.5,24.0,75.024,,,,0.0,0.0,,1.15
2024-10-16 12:40:00,23.1,48.4,14.5,24.0,75.024,0.0,3.63,13.0,0.0,,,
