# CAN Bus Data Processing

This notebook processes a CAN bus log file (`full_data_capture.log`) to generate a dataset optimized for predictive maintenance and attack detection (DoS, Fuzzing, Suspension) using XGBoost. It performs the following steps:

1. **Import Libraries**: Load required Python libraries.
2. **Define Functions**: Define helper functions for parsing, attack injection, feature computation, and CSV output.
3. **Process Data**: Read the log file, inject distinct attack patterns, compute enhanced features, label messages, and save to CSV.

**Input File**: `C:\Users\pc\OneDrive\Bureau\VS_code_Projects\MLproject_Predictive_Maintenance_for_Vehicles_Using_CAN_Bus_Data\dataSet\raw\full_data_capture.log`

**Output File**: `C:\Users\pc\OneDrive\Bureau\VS_code_Projects\MLproject_Predictive_Maintenance_for_Vehicles_Using_CAN_Bus_Data\dataSet\processed\generated.csv`


## Step 1: Import Libraries

Import libraries for file handling, data processing, feature computation, and progress tracking.

In [1]:
import csv
from pathlib import Path
import sys
import os
import pandas as pd
import numpy as np
from scipy.stats import entropy
from tqdm import tqdm
import random
import string

## Step 2: Define Functions

Define helper functions for:
- Converting the CAN log to CSV.
- Computing features (`CAN_ID_Inter_Arrival`, `CAN_ID_Window_Count`, `Payload_Entropy`, etc.).
- Injecting attacks (DoS, Fuzzing, Suspension) with distinct patterns.
- Labeling messages.

In [2]:
def generate_random_payload(length=16):
    """Generate a random hexadecimal payload of specified length."""
    return ''.join(random.choice(string.hexdigits.upper()) for _ in range(length))

def convert_can_log_to_csv(input_file_path, output_file_path):
    # Verify input file exists
    if not Path(input_file_path).is_file():
        print(f"Error: Input file '{input_file_path}' does not exist.")
        sys.exit(1)

    # Ensure output directory exists
    output_dir = Path(output_file_path).parent
    try:
        output_dir.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        print(f"Error creating output directory '{output_dir}': {e}")
        sys.exit(1)

    # Step 1: Read messages with progress
    messages = []
    total_lines = sum(1 for _ in open(input_file_path, 'r', encoding='utf-8'))
    print(f"Reading input file with {total_lines} lines...")
    with open(input_file_path, 'r', encoding='utf-8') as log_file:
        for line in tqdm(log_file, total=total_lines, desc="Parsing lines"):
            line = line.strip()
            if not line or not line.startswith('('):
                continue
            try:
                timestamp_end = line.find(')')
                timestamp = float(line[1:timestamp_end])
                remaining = line[timestamp_end+1:].strip()
                parts = remaining.split(maxsplit=1)
                if len(parts) == 2:
                    interface = parts[0]
                    can_data = parts[1]
                    if '#' in can_data:
                        can_id, payload = can_data.split('#', 1)
                        messages.append({
                            'Timestamp': timestamp,
                            'Interface': interface,
                            'CAN_ID': can_id,
                            'Payload': payload
                        })
            except:
                continue
    if not messages:
        print("No valid messages found in the log file.")
        sys.exit(1)

    # Create DataFrame
    df = pd.DataFrame(messages)
    print(f"Loaded {len(df)} messages into DataFrame.")

    # Debug: Check 2C6 messages
    c2c6_count = len(df[df['CAN_ID'] == '2C6'])
    print(f"Debug: Total CAN_ID=2C6 messages: {c2c6_count}")
    if c2c6_count < 2222:
        print(f"Warning: Only {c2c6_count} CAN_ID=2C6 messages found. Suspension attack may be limited.")

    # Step 2: Inject DoS attack
    dos_start = 1508687520.000000
    dos_end = 1508687529.999750
    dos_interval = 0.00025  # 4 packets/ms
    dos_timestamps = np.arange(dos_start, dos_end + dos_interval, dos_interval)[:40001]
    dos_df = pd.DataFrame({
        'Timestamp': dos_timestamps,
        'Interface': 'slcan0',
        'CAN_ID': '000',
        'Payload': '0000000000000000'
    })
    df = df[~df['Timestamp'].between(dos_start, dos_end)]
    df = pd.concat([df, dos_df], ignore_index=True)
    print(f"Injected DoS attack: {len(dos_df)} messages.")

    # Step 3: Inject Fuzzing attack
    fuzzing_start = 1508687510.000000
    fuzzing_end = 1508687515.999500
    fuzzing_messages = df[(df['CAN_ID'] == '18A') & (df['Timestamp'].between(fuzzing_start, fuzzing_end))].head(2222)
    if len(fuzzing_messages) < 2222:
        print(f"Warning: Only {len(fuzzing_messages)} messages available for CAN_ID=18A for fuzzing.")
        # Supplement with synthetic fuzzing messages if needed
        additional_needed = 2222 - len(fuzzing_messages)
        fuzzing_timestamps = np.linspace(fuzzing_start, fuzzing_end, additional_needed)
        additional_fuzzing = pd.DataFrame({
            'Timestamp': fuzzing_timestamps,
            'Interface': 'slcan0',
            'CAN_ID': '18A',
            'Payload': [generate_random_payload() for _ in range(additional_needed)]
        })
        df = pd.concat([df, additional_fuzzing], ignore_index=True)
        print(f"Added {additional_needed} synthetic fuzzing messages.")
    else:
        fuzzing_indices = fuzzing_messages.index
        df.loc[fuzzing_indices, 'Payload'] = [generate_random_payload() for _ in fuzzing_indices]
    print(f"Injected Fuzzing attack: {2222} messages.")

    # Step 4: Inject Suspension attack
    suspension_start = 1508687486.000000
    suspension_end = 1508687506.000000  # Extended to 20s
    c2c6_deleted = len(df[(df['CAN_ID'] == '2C6') & (df['Timestamp'].between(suspension_start, suspension_end))])
    print(f"Debug: Deleted {c2c6_deleted} CAN_ID=2C6 messages in Suspension period.")
    df = df[~((df['CAN_ID'] == '2C6') & (df['Timestamp'].between(suspension_start, suspension_end)))]
    print(f"After Suspension attack: {len(df)} messages remain.")

    # Step 5: Compute parameters
    df = df.sort_values('Timestamp').reset_index(drop=True)

    # CAN_ID_Inter_Arrival
    df['CAN_ID_Inter_Arrival'] = df.groupby('CAN_ID')['Timestamp'].diff().fillna(0.010)
    print("Computed CAN_ID_Inter_Arrival.")

    # CAN_ID_Window_Count
    def compute_window_count(df, window_size=5.0):
        print(f"Computing CAN_ID_Window_Count for {len(df)} messages...")
        df = df.copy()
        df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
        window_counts = []
        for can_id in tqdm(df['CAN_ID'].unique(), desc="Processing CAN_IDs"):
            can_id_df = df[df['CAN_ID'] == can_id][['Timestamp']].copy()
            can_id_df['Dummy'] = 1
            can_id_df.set_index('Timestamp', inplace=True)
            can_id_df['CAN_ID_Window_Count'] = (
                can_id_df['Dummy']
                .rolling(window=f'{window_size}s', closed='both')
                .count()
                .astype(int)
            )
            can_id_df.reset_index(inplace=True)
            can_id_df['CAN_ID'] = can_id
            window_counts.append(can_id_df[['Timestamp', 'CAN_ID', 'CAN_ID_Window_Count']])
        result = pd.concat(window_counts)
        result['Timestamp'] = result['Timestamp'].astype(int) / 10**9
        return result

    window_counts = compute_window_count(df)
    df = df.merge(window_counts, on=['Timestamp', 'CAN_ID'], how='left')
    df['CAN_ID_Window_Count'] = df['CAN_ID_Window_Count'].fillna(0).astype(int)
    print(f"Debug: NaN in CAN_ID_Window_Count: {df['CAN_ID_Window_Count'].isna().sum()}")
    print("Computed CAN_ID_Window_Count.")

    # Payload_Entropy
    def compute_payload_entropy(payload):
        if not payload:
            return 0.0
        bytes_array = [int(payload[i:i+2], 16) for i in range(0, len(payload), 2)]
        value_counts = pd.Series(bytes_array).value_counts()
        probs = value_counts / len(bytes_array)
        return entropy(probs, base=2)

    df['Payload_Entropy'] = df['Payload'].apply(compute_payload_entropy)
    print("Computed Payload_Entropy.")

    # Payload_Decimal
    def compute_payload_decimal(payload):
        try:
            return int(payload, 16)
        except ValueError:
            return 0

    df['Payload_Decimal'] = df['Payload'].apply(compute_payload_decimal)
    print("Computed Payload_Decimal.")

    # Norm_Payload_Decimal
    max_decimal = 2**64 - 1
    df['Norm_Payload_Decimal'] = df['Payload_Decimal'] / max_decimal
    print("Computed Norm_Payload_Decimal.")

    # Norm_Payload_Entropy
    max_entropy = 8.0
    df['Norm_Payload_Entropy'] = df['Payload_Entropy'] / max_entropy
    print("Computed Norm_Payload_Entropy.")

    # Suspension_Indicator
    def compute_suspension_indicator(df, threshold=2.0, window=0.5):
        print(f"Computing Suspension_Indicator for {len(df)} messages...")
        df = df.sort_values('Timestamp')
        indicators = np.zeros(len(df))
        total_can_ids = len(df['CAN_ID'].unique())
        timestamps = df['Timestamp'].to_numpy()
        inter_arrivals = df['CAN_ID_Inter_Arrival'].to_numpy()
        can_ids = df['CAN_ID'].to_numpy()
        for i in tqdm(range(len(df)), desc="Processing messages"):
            timestamp = timestamps[i]
            mask = (timestamps >= timestamp - window) & (timestamps <= timestamp + window) & (inter_arrivals > threshold)
            affected_can_ids = len(np.unique(can_ids[mask]))
            indicators[i] = affected_can_ids / total_can_ids if total_can_ids > 0 else 0.0
        return indicators

    df['Suspension_Indicator'] = compute_suspension_indicator(df)
    print("Computed Suspension_Indicator.")

    # Debug: Check 2C6 messages post-Suspension
    c2c6_post_suspension = df[(df['CAN_ID'] == '2C6') & (df['Timestamp'] >= suspension_end)]
    print(f"Debug: CAN_ID=2C6 messages after suspension_end ({suspension_end}): {len(c2c6_post_suspension)}")
    if not c2c6_post_suspension.empty:
        print("Debug: First 5 CAN_ID=2C6 messages post-Suspension:")
        print(c2c6_post_suspension[['Timestamp', 'CAN_ID_Inter_Arrival', 'Suspension_Indicator', 'Payload_Entropy']].head())

    # Compute normal statistics (excluding all attack periods)
    normal_df = df[
        ~(
            (df['Timestamp'].between(dos_start, dos_end) & (df['CAN_ID'] == '000')) |  # DoS
            (df['Timestamp'].between(fuzzing_start, fuzzing_end) & (df['CAN_ID'] == '18A')) |  # Fuzzing
            (df['CAN_ID'] == '2C6') & (df['Timestamp'] >= suspension_end)  # Suspension
        )
    ]
    normal_stats = normal_df.groupby('CAN_ID').agg({
        'CAN_ID_Inter_Arrival': 'mean',
        'CAN_ID_Window_Count': 'mean'
    }).reset_index()
    normal_stats.columns = ['CAN_ID', 'Mean_Inter_Arrival', 'Mean_Window_Count']
    print("Computed normal statistics.")

    # Normalize Inter_Arrival and Window_Count
    def normalize_features(row, stats_df):
        can_id = row['CAN_ID']
        stats = stats_df[stats_df['CAN_ID'] == can_id]
        if not stats.empty:
            norm_inter_arrival = row['CAN_ID_Inter_Arrival'] / stats['Mean_Inter_Arrival'].iloc[0] if stats['Mean_Inter_Arrival'].iloc[0] != 0 else 1.0
            norm_window_count = row['CAN_ID_Window_Count'] / stats['Mean_Window_Count'].iloc[0] if stats['Mean_Window_Count'].iloc[0] != 0 else 1.0
        else:
            norm_inter_arrival = 1.0
            norm_window_count = 1.0
        return pd.Series({
            'Norm_Inter_Arrival': norm_inter_arrival,
            'Norm_Window_Count': norm_window_count
        })

    print(f"Normalizing features for {len(df)} messages...")
    normalized_features = df.apply(normalize_features, axis=1, args=(normal_stats,))
    df = pd.concat([df, normalized_features], axis=1)
    print("Computed normalized features.")

    # Label attacks
    df['Label'] = 0  # Normal
    df.loc[df['Timestamp'].between(dos_start, dos_end) & (df['CAN_ID'] == '000'), 'Label'] = 1  # DoS
    df.loc[df['Timestamp'].between(fuzzing_start, fuzzing_end) & (df['CAN_ID'] == '18A'), 'Label'] = 2  # Fuzzing
    suspension_candidates = df[(df['CAN_ID'] == '2C6') & (df['Timestamp'] >= suspension_end)].head(2222)
    if len(suspension_candidates) < 2222:
        print(f"Warning: Only {len(suspension_candidates)} CAN_ID=2C6 messages available post-suspension.")
        # Supplement with synthetic messages if needed
        additional_needed = 2222 - len(suspension_candidates)
        last_timestamp = suspension_candidates['Timestamp'].max() if not suspension_candidates.empty else suspension_end
        synthetic_timestamps = np.linspace(last_timestamp + 0.001, last_timestamp + additional_needed * 0.010, additional_needed)
        synthetic_suspension = pd.DataFrame({
            'Timestamp': synthetic_timestamps,
            'Interface': 'slcan0',
            'CAN_ID': '2C6',
            'Payload': ['0000000000000000' for _ in range(additional_needed)],
            'CAN_ID_Inter_Arrival': [10.0 for _ in range(additional_needed)],  # Simulate large inter-arrival
            'CAN_ID_Window_Count': [1 for _ in range(additional_needed)],
            'Payload_Entropy': [0.0 for _ in range(additional_needed)],
            'Payload_Decimal': [0 for _ in range(additional_needed)],
            'Norm_Payload_Decimal': [0.0 for _ in range(additional_needed)],
            'Norm_Payload_Entropy': [0.0 for _ in range(additional_needed)],
            'Suspension_Indicator': [1.0 for _ in range(additional_needed)],
            'Norm_Inter_Arrival': [100.0 for _ in range(additional_needed)],  # High normalized value
            'Norm_Window_Count': [0.1 for _ in range(additional_needed)],
            'Label': [3 for _ in range(additional_needed)]
        })
        df = pd.concat([df, synthetic_suspension], ignore_index=True)
        print(f"Added {additional_needed} synthetic suspension messages.")
    else:
        df.loc[suspension_candidates.index, 'Label'] = 3  # Suspension
    print(f"Labeled {2222} Suspension messages.")

    # Debug: Feature distributions by label
    print("\nFeature Distributions by Label:")
    for label in range(4):
        label_df = df[df['Label'] == label]
        print(f"Label {label} ({['Normal', 'DoS', 'Fuzzing', 'Suspension'][label]}):")
        print(label_df[['CAN_ID_Inter_Arrival', 'CAN_ID_Window_Count', 'Payload_Entropy', 'Suspension_Indicator']].describe())

    # Step 6: Write to CSV with progress
    print(f"Writing {len(df)} rows to CSV...")
    try:
        if Path(output_file_path).exists():
            try:
                with open(output_file_path, 'a') as test_file:
                    pass
            except PermissionError:
                print(f"Warning: Output file '{output_file_path}' may be read-only.")
                print("Suggestions:")
                print("- Close applications using the file (e.g., Excel).")
                print("- Pause OneDrive sync temporarily.")
                print("- Delete or rename the existing file.")
                print("- Check file permissions (Properties > Security).")
                print("- Use an alternative output path (e.g., 'C:\\Temp').")

        with open(output_file_path, 'w', newline='', encoding='utf-8') as csv_file:
            writer = csv.writer(csv_file, quoting=csv.QUOTE_MINIMAL, escapechar='\\')
            writer.writerow([
                'Timestamp', 'Interface', 'CAN_ID', 'Payload', 'CAN_ID_Inter_Arrival',
                'CAN_ID_Window_Count', 'Payload_Entropy', 'Norm_Inter_Arrival',
                'Norm_Window_Count', 'Norm_Payload_Entropy', 'Norm_Payload_Decimal',
                'Suspension_Indicator', 'Label'
            ])
            for idx, row in tqdm(df.iterrows(), total=len(df), desc="Writing CSV rows"):
                writer.writerow([
                    row['Timestamp'],
                    f'"{row["Interface"]}"',
                    f'"{row["CAN_ID"]}"',
                    f'"{row["Payload"]}"',
                    f"{row['CAN_ID_Inter_Arrival']:.5f}",
                    int(row['CAN_ID_Window_Count']),
                    f"{row['Payload_Entropy']:.5f}",
                    f"{row['Norm_Inter_Arrival']:.5f}",
                    f"{row['Norm_Window_Count']:.5f}",
                    f"{row['Norm_Payload_Entropy']:.5f}",
                    f"{row['Norm_Payload_Decimal']:.10f}",
                    f"{row['Suspension_Indicator']:.5f}",
                    int(row['Label'])
                ])
        print(f"✅ Conversion complete! CSV saved to: {output_file_path}")
        if os.access(output_file_path, os.W_OK):
            print("File is writable.")
        else:
            print("Warning: File is read-only. Check permissions or OneDrive sync.")
    except PermissionError as e:
        print(f"PermissionError: Cannot write to '{output_file_path}'. {e}")
        print("Suggestions:")
        print("- Ensure the output file is not open in another application.")
        print("- Check write permissions for the directory.")
        print("- Run the script as Administrator.")
        print("- Pause OneDrive sync.")
        print("- Use a different output path (e.g., 'C:\\Temp\\can_bus_combined.csv').")
        sys.exit(1)
    except Exception as e:
        print(f"Error writing output file: {e}")
        sys.exit(1)

    # Print summary
    print("\nLabel Counts:")
    print(df['Label'].value_counts())
    print("\nColumns:")
    print(df.columns.tolist())
    print("\nSample Rows:")
    print(df.head())
    print("\nSuspension Attack Messages (first 5):")
    print(df[df['Label'] == 3][['Timestamp', 'CAN_ID', 'CAN_ID_Inter_Arrival', 'Suspension_Indicator', 'Label']].head())
    print("\nFuzzing Attack Messages (first 5):")
    print(df[df['Label'] == 2][['Timestamp', 'CAN_ID', 'Payload_Entropy', 'Label']].head())
    print("\nDoS Attack Messages (first 5):")
    print(df[df['Label'] == 1][['Timestamp', 'CAN_ID', 'CAN_ID_Window_Count', 'Label']].head())

## Step 3: Execute Processing

Run the processing function with the specified input and output paths.

In [6]:
# Define input and output paths
input_file_path = r"C:\Users\pc\OneDrive\Bureau\VS_code_Projects\MLproject_Predictive_Maintenance_for_Vehicles_Using_CAN_Bus_Data\dataSet\raw\full_data_capture.log"
output_file_path = r"C:\Users\pc\OneDrive\Bureau\VS_code_Projects\MLproject_Predictive_Maintenance_for_Vehicles_Using_CAN_Bus_Data\dataSet\processed\generated.csv"

# Run processing
convert_can_log_to_csv(input_file_path, output_file_path)

Reading input file with 386567 lines...


Parsing lines: 100%|██████████| 386567/386567 [00:02<00:00, 180864.28it/s]


Loaded 386567 messages into DataFrame.
Debug: Total CAN_ID=2C6 messages: 13752
Injected DoS attack: 40000 messages.
Added 1622 synthetic fuzzing messages.
Injected Fuzzing attack: 2222 messages.
Debug: Deleted 1000 CAN_ID=2C6 messages in Suspension period.
After Suspension attack: 413121 messages remain.
Computed CAN_ID_Inter_Arrival.
Computing CAN_ID_Window_Count for 413121 messages...


Processing CAN_IDs: 100%|██████████| 56/56 [00:03<00:00, 16.48it/s]


Debug: NaN in CAN_ID_Window_Count: 0
Computed CAN_ID_Window_Count.
Computed Payload_Entropy.
Computed Payload_Decimal.
Computed Norm_Payload_Decimal.
Computed Norm_Payload_Entropy.
Computing Suspension_Indicator for 413121 messages...


Processing messages: 100%|██████████| 413121/413121 [09:06<00:00, 756.27it/s]


Computed Suspension_Indicator.
Debug: CAN_ID=2C6 messages after suspension_end (1508687506.0): 2147
Debug: First 5 CAN_ID=2C6 messages post-Suspension:
           Timestamp  CAN_ID_Inter_Arrival  Suspension_Indicator  \
311157  1.508688e+09             20.022096              0.035714   
311183  1.508688e+09              0.019225              0.035714   
311211  1.508688e+09              0.019824              0.035714   
311240  1.508688e+09              0.019994              0.035714   
311267  1.508688e+09              0.020039              0.035714   

        Payload_Entropy  
311157         0.650022  
311183         0.650022  
311211         0.650022  
311240         0.650022  
311267         0.650022  
Computed normal statistics.
Normalizing features for 413121 messages...
Computed normalized features.
Added 75 synthetic suspension messages.
Labeled 2222 Suspension messages.

Feature Distributions by Label:
Label 0 (Normal):
       CAN_ID_Inter_Arrival  CAN_ID_Window_Count  Payloa

Writing CSV rows: 100%|██████████| 413196/413196 [00:57<00:00, 7245.94it/s]


✅ Conversion complete! CSV saved to: C:\Users\pc\OneDrive\Bureau\VS_code_Projects\MLproject_Predictive_Maintenance_for_Vehicles_Using_CAN_Bus_Data\dataSet\processed\generated.csv
File is writable.

Label Counts:
Label
0    370916
1     39983
2      2222
3        75
Name: count, dtype: int64

Columns:
['Timestamp', 'Interface', 'CAN_ID', 'Payload', 'CAN_ID_Inter_Arrival', 'CAN_ID_Window_Count', 'Payload_Entropy', 'Payload_Decimal', 'Norm_Payload_Decimal', 'Norm_Payload_Entropy', 'Suspension_Indicator', 'Norm_Inter_Arrival', 'Norm_Window_Count', 'Label']

Sample Rows:
      Timestamp Interface CAN_ID           Payload  CAN_ID_Inter_Arrival  \
0  1.508687e+09    slcan0    12E  C680027FD0FFFF00                  0.01   
1  1.508687e+09    slcan0    090          1A000000                  0.01   
2  1.508687e+09    slcan0    0C6  7512800A8008BAAC                  0.01   
3  1.508687e+09    slcan0    242    0000FFEFFE000D                  0.01   
4  1.508687e+09    slcan0    29C  00000000FFFFF