# Nissan 370Z CAN Bus Analyzer

This notebook provides interactive analysis and visualization for Nissan 370Z CAN bus data.

## Features:
- Load and parse CAN log files
- Decode common parameters based on known CAN IDs
- Generate visualizations of vehicle data
- Analyze performance metrics

## Setup

In [1]:
# Import required libraries
import sys
import re
import os
import json
import csv
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from collections import defaultdict, Counter
from pathlib import Path
from datetime import datetime

# Apply one consistent look to every figure produced below
plt.style.use('ggplot')
sns.set_theme(style="darkgrid")

# Put the working directory on the import path, then load the decoder classes
sys.path.append('./')
try:
    from can_decoder_cli import CANMessageParser, CANFileReader
except ImportError as import_error:
    print(f"Error importing CAN decoder modules: {import_error}")
    print("Make sure can_decoder_cli.py is in the current directory")
else:
    print("CAN decoder modules imported successfully")

CAN decoder modules imported successfully


## Load and Parse CAN Data

First, let's load a CAN log file for analysis.

In [2]:
# File selection with dropdown
import ipywidgets as widgets
from IPython.display import display

# Find log files (check both arduino_logs and logs directories).
# os.listdir() returns entries in arbitrary order, so each listing is sorted
# to keep the dropdown order the same from run to run.
log_files = []
for dir_path in ['arduino_logs', 'logs']:
    # isdir (not exists) so a plain file with one of these names is skipped
    if not os.path.isdir(dir_path):
        continue
    log_files.extend(
        os.path.join(dir_path, f)
        for f in sorted(os.listdir(dir_path))
        if f.endswith(('.txt', '.log'))
    )

if not log_files:
    print("No log files found in arduino_logs/ or logs/ directories")
    # Single blank entry so the dropdown still has an option to show
    log_files = [""]

# Dropdown listing the discovered log files
file_dropdown = widgets.Dropdown(
    description='Log File:',
    options=log_files,
    layout=widgets.Layout(width='500px'),
    style={'description_width': 'initial'},
)

display(file_dropdown)

Dropdown(description='Log File:', layout=Layout(width='500px'), options=('arduino_logs\\370Z-log-extended.txt'…

In [3]:
# Function to load and parse CAN data
def load_can_data(file_path):
    """Load a CAN log file and return its messages as a pandas DataFrame.

    Args:
        file_path: Path of the log file to read.

    Returns:
        A DataFrame sorted by ``timestamp`` with the columns ``timestamp``,
        ``can_id``, ``can_id_hex``, ``source``, ``data_bytes``, one column for
        every additional (decoded) key found in the parsed messages, and
        ``time`` (``timestamp`` minus the earliest ``timestamp``).
        Returns None when the path is empty or missing, or when the reader
        yields no messages.
    """
    if not file_path or not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return None

    print(f"Loading CAN data from {file_path}...")
    parser = CANMessageParser()
    reader = CANFileReader(file_path, parser)
    messages = reader.read_all()

    if not messages:
        print("No valid CAN messages found in the file.")
        return None

    # Keys that are either copied explicitly below or deliberately left out of
    # the DataFrame; every other key is treated as a decoded field.
    reserved_keys = {
        'timestamp', 'can_id', 'can_id_hex', 'msg_type', 'data', 'raw',
        'source', 'frequency', 'data_bytes',
    }

    # Convert to DataFrame rows for easier analysis
    df_rows = []
    for msg in messages:
        row = {
            'timestamp': msg['timestamp'],
            'can_id': msg['can_id'],
            'can_id_hex': msg['can_id_hex'],
            'source': msg.get('source', 'Unknown'),
            'data_bytes': msg['data']
        }
        # Add decoded fields
        row.update({key: value for key, value in msg.items()
                    if key not in reserved_keys})
        df_rows.append(row)

    df = pd.DataFrame(df_rows)
    print(f"Loaded {len(df)} CAN messages")

    # Sort by timestamp. mergesort is stable, so messages that share a
    # timestamp keep the order in which they appear in the log.
    df = df.sort_values('timestamp', kind='mergesort')

    # Re-base time so the earliest message is at 0 (messages is non-empty
    # here, so the frame always has at least one row)
    df['time'] = df['timestamp'] - df['timestamp'].min()

    return df

# Button that triggers loading of the file chosen in the dropdown
load_button = widgets.Button(
    tooltip='Click to load the selected file',
    button_style='primary',
    description='Load Selected File',
)

# Output area that captures everything printed or displayed while loading
output = widgets.Output()

# Module-level holder for the most recently loaded DataFrame
can_data = None


def on_load_clicked(b):
    """Load the file selected in the dropdown and show a preview of it."""
    global can_data
    with output:
        output.clear_output()
        can_data = load_can_data(file_dropdown.value)
        if can_data is None:
            return
        print(f"Data preview:")
        display(can_data.head())


load_button.on_click(on_load_clicked)

display(load_button)
display(output)

Button(button_style='primary', description='Load Selected File', style=ButtonStyle(), tooltip='Click to load t…

Output()

## CAN Message Statistics

Let's examine some basic statistics about the CAN messages.

In [None]:
def analyze_can_messages(df):
    """Print summary statistics for a CAN DataFrame and plot the ID distribution.

    Reports the total message count, time span and average message rate, then
    shows a table and a bar chart for the ten most frequent CAN IDs.

    Args:
        df: DataFrame with ``can_id_hex``, ``source`` and ``time`` columns.
            ``None`` or an empty frame is reported and ignored.

    Returns:
        None.
    """
    if df is None or len(df) == 0:
        print("No data available for analysis.")
        return

    # Count messages by CAN ID; only the ten most frequent are reported
    id_counts = df['can_id_hex'].value_counts()
    top_counts = id_counts.head(10)

    # Message frequency
    time_span = df['time'].max()
    print(f"Total CAN messages: {len(df)}")
    print(f"Time span: {time_span:.2f} seconds")
    if time_span > 0:
        print(f"Average message rate: {len(df)/time_span:.2f} messages/second")
    else:
        # A single message (or identical timestamps) has no measurable rate;
        # dividing by the zero span would only produce inf/nan.
        print("Average message rate: N/A (zero time span)")
    print("\nMost frequent CAN IDs:")

    # Create a table of ID statistics
    id_stats = []
    for can_id_hex, count in top_counts.items():
        # Get source/function for this ID
        id_df = df[df['can_id_hex'] == can_id_hex]
        source = id_df['source'].iloc[0]

        # Frequency is the inverse of the mean gap between consecutive messages
        if len(id_df) > 1:
            avg_interval = id_df['time'].diff().dropna().mean()
            frequency = 1 / avg_interval if avg_interval > 0 else float('inf')
        else:
            frequency = 0

        id_stats.append({
            'CAN ID': can_id_hex,
            'Source': source,
            'Count': count,
            'Percentage': 100 * count / len(df),
            'Frequency (Hz)': frequency
        })

    stats_df = pd.DataFrame(id_stats)
    display(stats_df)

    # Plot message distribution, labelling bars with the hex ID strings
    plt.figure(figsize=(12, 6))
    plt.bar(top_counts.index.tolist(), top_counts.values)

    plt.title('CAN Message Distribution by ID')
    plt.xlabel('CAN ID (hex)')
    plt.ylabel('Number of Messages')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# Button that runs the message statistics analysis
analyze_button = widgets.Button(
    tooltip='Click to analyze CAN message statistics',
    button_style='info',
    description='Analyze CAN Messages',
)

# Output area that receives the statistics table and plot
analysis_output = widgets.Output()


def on_analyze_clicked(b):
    """Clear earlier results and analyze the currently loaded data."""
    with analysis_output:
        analysis_output.clear_output()
        analyze_can_messages(can_data)


analyze_button.on_click(on_analyze_clicked)

display(analyze_button)
display(analysis_output)

Button(button_style='info', description='Analyze CAN Messages', style=ButtonStyle(), tooltip='Click to analyze…

Output()

## Vehicle Performance Metrics

Now let's visualize key performance metrics over time.

In [5]:
def _select_metric_rows(df, can_id, column):
    """Return a copy of the rows for ``can_id``, or None if ``column`` is not in ``df``."""
    if column not in df.columns:
        return None
    return df[df['can_id'] == can_id].copy()


def _draw_metric(ax, frame, column, line_format, title, ylabel, placeholder,
                 ylabel_kwargs=None):
    """Plot ``column`` against ``time`` on ``ax``, or write ``placeholder`` if there are no rows.

    Returns True when a line was drawn, False when the placeholder was used.
    """
    if frame is None or len(frame) == 0:
        ax.text(0.5, 0.5, placeholder, ha='center', va='center')
        return False
    ax.plot(frame['time'], frame[column], line_format)
    ax.set_title(title)
    ax.set_xlabel('Time (s)')
    ax.set_ylabel(ylabel, **(ylabel_kwargs or {}))
    ax.grid(True)
    return True


def plot_vehicle_metrics(df):
    """Plot key vehicle metrics over time on a 3x2 grid of axes.

    Panels: engine RPM and throttle (CAN ID 0x180), speed in km/h with an mph
    twin axis (0x280), steering angle (0x002), engine temperature (0x551) and
    acceleration derived from the speed samples. A panel whose decoded column
    is missing, or that has no rows, shows a placeholder text instead.

    Args:
        df: DataFrame with ``can_id`` and ``time`` columns plus any of the
            decoded columns ``rpm``, ``speed_kph``, ``throttle_pct``,
            ``steering_angle`` and ``engine_temp``. ``None`` or an empty frame
            is reported and ignored.

    Returns:
        None.
    """
    if df is None or len(df) == 0:
        print("No data available for plotting.")
        return

    # Rows for each metric; None when the decoded column is absent
    rpm_df = _select_metric_rows(df, 0x180, 'rpm')
    speed_df = _select_metric_rows(df, 0x280, 'speed_kph')
    throttle_df = _select_metric_rows(df, 0x180, 'throttle_pct')
    steering_df = _select_metric_rows(df, 0x002, 'steering_angle')
    temp_df = _select_metric_rows(df, 0x551, 'engine_temp')

    # Create multi-plot figure
    fig, axes = plt.subplots(nrows=3, ncols=2, figsize=(14, 12))
    fig.suptitle('Vehicle Performance Metrics', fontsize=16)

    # Engine RPM
    _draw_metric(axes[0, 0], rpm_df, 'rpm', 'r-',
                 'Engine RPM', 'RPM', 'No RPM data available')

    # Speed: km/h on the left axis, the same samples in mph on a twin axis
    ax_speed = axes[0, 1]
    if _draw_metric(ax_speed, speed_df, 'speed_kph', 'b-',
                    'Vehicle Speed', 'Speed (km/h)', 'No speed data available',
                    ylabel_kwargs={'color': 'b'}):
        ax_mph = ax_speed.twinx()
        ax_mph.plot(speed_df['time'], speed_df['speed_kph'] * 0.621371, 'g-')
        ax_mph.set_ylabel('Speed (mph)', color='g')

    # Throttle position, pinned to the 0-100 % range
    ax_throttle = axes[1, 0]
    if _draw_metric(ax_throttle, throttle_df, 'throttle_pct', 'g-',
                    'Throttle Position', 'Throttle (%)',
                    'No throttle data available'):
        ax_throttle.set_ylim([0, 100])

    # Steering angle
    _draw_metric(axes[1, 1], steering_df, 'steering_angle', 'c-',
                 'Steering Angle', 'Angle (degrees)',
                 'No steering data available')

    # Engine temperature
    _draw_metric(axes[2, 0], temp_df, 'engine_temp', 'r-',
                 'Engine Temperature', 'Temperature (°C)',
                 'No temperature data available')

    # Acceleration derived from consecutive speed samples
    ax_accel = axes[2, 1]
    if speed_df is not None and len(speed_df) > 1:
        speed_ms = speed_df['speed_kph'] * (1000/3600)  # Convert km/h to m/s
        acceleration = speed_ms.diff() / speed_df['time'].diff()
        # Two samples with the same time divide by zero and yield +/-inf;
        # blank those points so they cannot distort the curve or its smoothing.
        acceleration[~np.isfinite(acceleration)] = np.nan

        # Apply a simple centered moving average when there are enough samples
        window_size = 5
        if len(speed_df) > window_size:
            acceleration = acceleration.rolling(window=window_size, center=True).mean()
        ax_accel.plot(speed_df['time'], acceleration, 'm-')

        ax_accel.set_title('Acceleration')
        ax_accel.set_xlabel('Time (s)')
        ax_accel.set_ylabel('Acceleration (m/s²)')
        ax_accel.grid(True)
    else:
        ax_accel.text(0.5, 0.5, 'No acceleration data available', ha='center', va='center')

    # Leave room for the figure title above the grid
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

# Button that draws the vehicle metric plots
plot_metrics_button = widgets.Button(
    tooltip='Click to plot vehicle performance metrics',
    button_style='success',
    description='Plot Vehicle Metrics',
)

# Output area that receives the figure
metrics_output = widgets.Output()


def on_plot_metrics_clicked(b):
    """Clear earlier plots and redraw the metrics for the loaded data."""
    with metrics_output:
        metrics_output.clear_output()
        plot_vehicle_metrics(can_data)


plot_metrics_button.on_click(on_plot_metrics_clicked)

display(plot_metrics_button)
display(metrics_output)

Button(button_style='success', description='Plot Vehicle Metrics', style=ButtonStyle(), tooltip='Click to plot…

Output()

## Message Data Analysis

Let's examine the data for specific CAN IDs in more detail.

In [None]:
# Create dropdown for selecting a CAN ID
def update_can_id_dropdown():
    """Build the option labels for the CAN ID dropdown from the loaded data.

    Returns a list of ``"<hex id> (<count> msgs)"`` labels in the order given
    by ``value_counts()``, or a single "No data available" entry when nothing
    has been loaded yet.
    """
    if can_data is None or len(can_data) == 0:
        return ["No data available"]

    # One label per distinct CAN ID, annotated with its message count
    labels = []
    for id_hex, count in can_data['can_id_hex'].value_counts().items():
        labels.append(f"{id_hex} ({count} msgs)")
    return labels

# Dropdown of CAN IDs; populated from whatever data is loaded at creation time
id_dropdown = widgets.Dropdown(
    description='CAN ID:',
    options=update_can_id_dropdown(),
    layout=widgets.Layout(width='400px'),
    style={'description_width': 'initial'},
)

# Function to extract the CAN ID from dropdown selection
def get_selected_can_id(selection):
    """Return the hex CAN ID at the start of a dropdown label.

    Args:
        selection: A label such as ``"0x180 (42 msgs)"``, the
            "No data available" placeholder, or ``None``.

    Returns:
        The text before the first space (e.g. ``"0x180"``), or None when the
        selection is missing, blank, or the "No data" placeholder.
    """
    # Treat None and whitespace-only selections the same as "nothing selected"
    label = selection.strip() if selection else ""
    if not label or label.startswith("No data"):
        return None
    # Extract the hex CAN ID from the selection
    return label.split(' ', 1)[0]

# Button that analyzes the CAN ID currently selected in the dropdown
analyze_id_button = widgets.Button(
    tooltip='Click to analyze the selected CAN ID',
    button_style='warning',
    description='Analyze Selected ID',
)

# Output area that receives the per-ID analysis
id_analysis_output = widgets.Output()

def analyze_specific_id(can_id_hex):
    """Analyze the messages of one CAN ID from the loaded ``can_data``.

    Prints the message count and timing, shows per-byte statistics, plots the
    data bytes that change (at most five) and their correlation matrix, shows
    and plots the decoded values that exist for this ID, and finally prints
    the first five raw messages.

    Args:
        can_id_hex: Value to match against the ``can_id_hex`` column, or None.

    Returns:
        None.
    """
    if can_data is None or len(can_data) == 0 or can_id_hex is None:
        print("No data available for analysis.")
        return

    # Filter data for the selected CAN ID
    id_df = can_data[can_data['can_id_hex'] == can_id_hex].copy()

    if len(id_df) == 0:
        print(f"No messages found for CAN ID {can_id_hex}")
        return

    # Display basic info
    source = id_df['source'].iloc[0]
    print(f"Analysis for CAN ID {can_id_hex} ({source})")
    print(f"Total messages: {len(id_df)}")

    # Frequency is the inverse of the mean gap between consecutive messages
    if len(id_df) > 1:
        time_diffs = id_df['time'].diff().dropna()
        avg_interval = time_diffs.mean()
        frequency = 1 / avg_interval if avg_interval > 0 else float('inf')
        print(f"Average interval: {avg_interval * 1000:.2f} ms")
        print(f"Frequency: {frequency:.2f} Hz")

    # Show data byte distribution
    print("\nData byte statistics:")

    # Materialize each payload as a list so any sequence of byte values can be
    # copied into the array; shorter payloads stay zero-padded on the right.
    payloads = [list(data_bytes) for data_bytes in id_df['data_bytes']]
    max_len = max(len(payload) for payload in payloads)

    data_array = np.zeros((len(payloads), max_len), dtype=np.int32)
    for row_idx, payload in enumerate(payloads):
        data_array[row_idx, :len(payload)] = payload

    # Calculate statistics for each byte position
    byte_stats = []
    for position in range(max_len):
        byte_values = data_array[:, position]
        byte_stats.append({
            'Position': position,
            'Min': np.min(byte_values),
            'Max': np.max(byte_values),
            'Mean': np.mean(byte_values),
            'StdDev': np.std(byte_values),
            'Unique Values': len(np.unique(byte_values)),
            'Most Common': Counter(byte_values).most_common(1)[0][0]
        })

    display(pd.DataFrame(byte_stats))

    # Plot data byte values over time (for bytes that change)
    changing_bytes = [i for i in range(max_len) if np.std(data_array[:, i]) > 0]

    if changing_bytes:
        fig, ax = plt.subplots(figsize=(12, 6))
        for i in changing_bytes[:5]:  # Limit to 5 bytes for clarity
            ax.plot(id_df['time'], data_array[:, i], 'o-', label=f'Byte {i}')

        ax.set_title(f'Data Byte Values Over Time for CAN ID {can_id_hex}')
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Byte Value')
        ax.grid(True)
        ax.legend()
        plt.show()

    # Plot correlation between bytes (if there are multiple changing bytes)
    if len(changing_bytes) > 1:
        print("\nCorrelation between changing bytes:")
        corr_matrix = np.corrcoef(data_array[:, changing_bytes], rowvar=False)

        # Plot correlation matrix
        fig, ax = plt.subplots(figsize=(8, 6))
        im = ax.imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)

        # Add labels
        byte_labels = [f'Byte {i}' for i in changing_bytes]
        ax.set_xticks(range(len(changing_bytes)))
        ax.set_yticks(range(len(changing_bytes)))
        ax.set_xticklabels(byte_labels)
        ax.set_yticklabels(byte_labels)

        # Rotate x labels
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right', rotation_mode='anchor')

        # Add colorbar
        cbar = fig.colorbar(im, ax=ax)
        cbar.set_label('Correlation')

        ax.set_title(f'Correlation Matrix for CAN ID {can_id_hex} Data Bytes')
        fig.tight_layout()
        plt.show()

    # Decoded columns: everything that is not a bookkeeping column. The frame
    # also carries the decoded columns of every other CAN ID, which are all
    # NaN for these rows, so only columns with at least one value are kept.
    bookkeeping_cols = {'timestamp', 'can_id', 'can_id_hex', 'source', 'data_bytes',
                        'time', 'frequency', 'msg_type', 'data'}
    decoded_cols = [col for col in id_df.columns
                    if col not in bookkeeping_cols and id_df[col].notna().any()]

    if decoded_cols:
        print("\nDecoded values:")
        display(id_df[['time'] + decoded_cols].head(10))

        # Only numeric (non-boolean) columns can be drawn as a time series
        numeric_cols = [col for col in decoded_cols
                        if pd.api.types.is_numeric_dtype(id_df[col])
                        and not pd.api.types.is_bool_dtype(id_df[col])]

        # Create the figure only when there is something to draw, so no empty
        # figure is left behind
        if numeric_cols:
            fig, ax = plt.subplots(figsize=(12, 6))
            for col in numeric_cols:
                ax.plot(id_df['time'], id_df[col], 'o-', label=col)

            ax.set_title(f'Decoded Values Over Time for CAN ID {can_id_hex}')
            ax.set_xlabel('Time (s)')
            ax.set_ylabel('Value')
            ax.grid(True)
            ax.legend()
            plt.show()

    # Sample raw messages
    print("\nSample messages:")
    for _, row in id_df.head(5).iterrows():
        data_str = ' '.join([f'{b:02X}' for b in row['data_bytes']])
        print(f"{row['time']:.3f}s: {row['can_id_hex']} - {data_str}")

def on_analyze_id_clicked(b):
    """Clear earlier results and analyze the CAN ID chosen in the dropdown."""
    with id_analysis_output:
        id_analysis_output.clear_output()
        analyze_specific_id(get_selected_can_id(id_dropdown.value))


analyze_id_button.on_click(on_analyze_id_clicked)

# Button that rebuilds the CAN ID list (needed after a new file is loaded)
refresh_button = widgets.Button(
    tooltip='Refresh the list of CAN IDs after loading data',
    description='Refresh ID List',
)


def on_refresh_clicked(b):
    """Replace the dropdown options with the IDs of the loaded data."""
    refreshed_options = update_can_id_dropdown()
    id_dropdown.options = refreshed_options


refresh_button.on_click(on_refresh_clicked)

# Dropdown and refresh button side by side, then the analyze button and output
selector_row = widgets.HBox([id_dropdown, refresh_button])
display(selector_row)
display(analyze_id_button)
display(id_analysis_output)

HBox(children=(Dropdown(description='CAN ID:', layout=Layout(width='400px'), options=('No data available',), s…



Output()

## Drive Cycle Analysis

Let's analyze a driving session to extract performance metrics.

In [None]:
def analyze_drive_cycle(df):
    """Analyze a drive cycle and print its performance metrics.

    Uses the speed rows (CAN ID 0x280, ``speed_kph`` column) to report
    duration, maximum and average speed, and maximum acceleration and
    deceleration, plus RPM statistics from CAN ID 0x180 when an ``rpm`` column
    exists. It then looks for "0-60 mph" and "0-100 km/h" runs and, if any is
    found, plots the speed profile with those runs highlighted.

    Args:
        df: DataFrame with ``can_id`` and ``time`` columns and, optionally,
            ``speed_kph`` and ``rpm``. ``None`` or an empty frame is reported
            and ignored, as is a frame with fewer than 10 speed rows.

    Returns:
        None.
    """
    if df is None or len(df) == 0:
        print("No data available for analysis.")
        return

    # Get speed data if available
    speed_df = df[df['can_id'] == 0x280].copy() if 'speed_kph' in df.columns else None
    rpm_df = df[df['can_id'] == 0x180].copy() if 'rpm' in df.columns else None

    if speed_df is None or len(speed_df) < 10:
        print("Insufficient speed data for drive cycle analysis.")
        return

    # Calculate metrics
    duration = speed_df['time'].max() - speed_df['time'].min()
    max_speed = speed_df['speed_kph'].max()
    avg_speed = speed_df['speed_kph'].mean()

    # Convert km/h to m/s for calculations
    speed_df['speed_ms'] = speed_df['speed_kph'] * (1000/3600)

    # Calculate acceleration
    speed_df['acceleration'] = speed_df['speed_ms'].diff() / speed_df['time'].diff()

    # Keep only rows with a usable acceleration. Filtering on this one column
    # matters: the frame also carries the decoded columns of other CAN IDs,
    # which are NaN on speed rows, so a blanket dropna() would discard every
    # row. isfinite also removes the +/-inf produced by duplicate timestamps.
    speed_df = speed_df[np.isfinite(speed_df['acceleration'])]

    max_accel = speed_df['acceleration'].max()
    max_decel = speed_df['acceleration'].min()

    # Additional metrics from RPM data if available. They are formatted here
    # because a numeric format spec cannot be applied to the "N/A" text.
    if rpm_df is not None and len(rpm_df) > 0:
        max_rpm_text = f"{rpm_df['rpm'].max()}"
        avg_rpm_text = f"{rpm_df['rpm'].mean():.1f}"
    else:
        max_rpm_text = "N/A"
        avg_rpm_text = "N/A"

    # Identify acceleration events (0-60 mph, 0-100 km/h)
    accel_events = []

    # Time-ordered samples shared by every acceleration search below
    speed_data = speed_df.sort_values('time')

    def find_acceleration_time(start_speed, target_speed):
        """Find the first clean run from <= start_speed to >= target_speed.

        Returns (elapsed, start_time, end_time), or (None, None, None) when no
        candidate start leads to an uninterrupted run.
        """
        # Every sample at or below start_speed is a candidate starting point
        potential_starts = speed_data[speed_data['speed_kph'] <= start_speed].index

        for start_idx in potential_starts:
            # Look for a subsequent point where we reach target speed
            start_time = speed_data.loc[start_idx, 'time']
            after_start = speed_data[speed_data['time'] > start_time]

            if len(after_start) == 0:
                continue

            # Find the first point where we reach or exceed target speed
            target_reached = after_start[after_start['speed_kph'] >= target_speed]
            if len(target_reached) == 0:
                continue

            target_idx = target_reached.index[0]
            target_time = speed_data.loc[target_idx, 'time']
            elapsed_time = target_time - start_time

            # Verify this is a continuous acceleration (no significant pauses)
            continuous = True
            between_points = speed_data[(speed_data['time'] > start_time) & (speed_data['time'] < target_time)]

            if len(between_points) > 0:
                # Check for any significant deceleration
                if between_points['acceleration'].min() < -1.0:  # Significant braking
                    continuous = False

                # Check for any long pauses at constant speed
                speed_diffs = between_points['speed_kph'].diff().abs()
                if (speed_diffs < 0.1).sum() > 5:  # Several points with minimal speed change
                    continuous = False

            if continuous:
                return elapsed_time, start_time, target_time

        return None, None, None

    # Calculate 0-60 mph time
    zero_to_60_mph = 96.56  # 60 mph expressed in km/h
    time_to_60, start_60, end_60 = find_acceleration_time(5, zero_to_60_mph)  # Allow a small start speed to account for creeping

    if time_to_60 is not None:
        accel_events.append({
            'Event': '0-60 mph',
            'Time': time_to_60,
            'Start': start_60,
            'End': end_60
        })

    # Calculate 0-100 km/h time
    time_to_100, start_100, end_100 = find_acceleration_time(5, 100)  # Allow a small start speed

    if time_to_100 is not None:
        accel_events.append({
            'Event': '0-100 km/h',
            'Time': time_to_100,
            'Start': start_100,
            'End': end_100
        })

    # Present the results
    print(f"Drive Cycle Analysis:\n")
    print(f"Duration: {duration:.1f} seconds ({duration/60:.1f} minutes)")
    print(f"Max Speed: {max_speed:.1f} km/h ({max_speed * 0.621371:.1f} mph)")
    print(f"Average Speed: {avg_speed:.1f} km/h ({avg_speed * 0.621371:.1f} mph)")
    print(f"Max RPM: {max_rpm_text}")
    print(f"Average RPM: {avg_rpm_text}")
    print(f"Max Acceleration: {max_accel:.2f} m/s² ({max_accel * 0.101972:.2f} g)")
    print(f"Max Deceleration: {max_decel:.2f} m/s² ({max_decel * 0.101972:.2f} g)")

    if accel_events:
        # Show acceleration events
        print("\nAcceleration Events:")
        for event in accel_events:
            print(f"  {event['Event']}: {event['Time']:.2f} seconds")

        # Plot the speed profile with each event highlighted as a shaded span
        plt.figure(figsize=(12, 6))
        plt.plot(speed_df['time'], speed_df['speed_kph'], 'b-', label='Speed (km/h)')

        for event in accel_events:
            plt.axvspan(event['Start'], event['End'], alpha=0.2, color='green', label=f"{event['Event']} ({event['Time']:.2f}s)")

        plt.title('Drive Cycle Speed Profile with Acceleration Events')
        plt.xlabel('Time (s)')
        plt.ylabel('Speed (km/h)')
        plt.grid(True)
        plt.legend()
        plt.tight_layout()
        plt.show()

# Button that runs the drive cycle analysis
analyze_drive_button = widgets.Button(
    tooltip='Click to analyze drive cycle performance metrics',
    button_style='danger',
    description='Analyze Drive Cycle',
)

# Output area that receives the drive cycle report and plot
drive_output = widgets.Output()


def on_analyze_drive_clicked(b):
    """Clear earlier results and analyze the drive cycle of the loaded data."""
    with drive_output:
        drive_output.clear_output()
        analyze_drive_cycle(can_data)


analyze_drive_button.on_click(on_analyze_drive_clicked)

display(analyze_drive_button)
display(drive_output)

Button(button_style='danger', description='Analyze Drive Cycle', style=ButtonStyle(), tooltip='Click to analyz…

Output()

## Save Analysis Results

Save the decoded data and analysis results for future reference.

In [8]:
def _with_hex_payload(df):
    """Return a copy of ``df`` with ``data_bytes`` replaced by a ``data_hex`` text column."""
    export_df = df.copy()
    if 'data_bytes' in export_df.columns:
        # Byte lists are not CSV/Excel friendly; store them as "01 FF ..." text
        export_df['data_hex'] = export_df['data_bytes'].apply(
            lambda payload: ' '.join([f'{b:02X}' for b in payload]))
        export_df = export_df.drop('data_bytes', axis=1)
    return export_df


def export_data(df, format_type):
    """Export the decoded CAN data to a timestamped file in ``logs/``.

    Args:
        df: DataFrame to export. ``None`` or an empty frame is reported and
            ignored.
        format_type: ``'csv'``, ``'excel'`` or ``'json'``. Any other value is
            reported and nothing is written.

    Returns:
        None.
    """
    if df is None or len(df) == 0:
        print("No data available for export.")
        return

    # Reject unknown formats explicitly instead of silently doing nothing
    if format_type not in ('csv', 'excel', 'json'):
        print(f"Unsupported export format: {format_type}")
        return

    # Create logs directory if it doesn't exist
    os.makedirs('logs', exist_ok=True)

    # Generate timestamp for filenames
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    if format_type == 'csv':
        # Export to CSV
        filename = f"logs/can_data_export_{timestamp}.csv"
        export_df = _with_hex_payload(df)
        export_df.to_csv(filename, index=False)
        print(f"Exported {len(export_df)} records to {filename}")

    elif format_type == 'excel':
        # The import is only an availability check for the Excel writer engine
        try:
            import openpyxl
        except ImportError:
            print("Error: openpyxl package required for Excel export. Install with 'pip install openpyxl'")
            return

        filename = f"logs/can_data_export_{timestamp}.xlsx"
        export_df = _with_hex_payload(df)

        with pd.ExcelWriter(filename) as writer:
            # Write all data to first sheet
            export_df.to_excel(writer, sheet_name='All_Data', index=False)

            # Create sheets for top 10 most frequent CAN IDs
            top_ids = export_df['can_id_hex'].value_counts().head(10).index
            for can_id_hex in top_ids:
                id_df = export_df[export_df['can_id_hex'] == can_id_hex]
                sheet_name = can_id_hex.replace('0x', 'ID_')
                id_df.to_excel(writer, sheet_name=sheet_name, index=False)

        print(f"Exported {len(export_df)} records to {filename} with sheets for top 10 CAN IDs")

    else:
        # Export to JSON
        filename = f"logs/can_data_export_{timestamp}.json"

        # Convert data_bytes to plain int lists so they serialize cleanly
        export_df = df.copy()
        if 'data_bytes' in export_df.columns:
            export_df['data_bytes'] = export_df['data_bytes'].apply(lambda x: [int(b) for b in x])

        export_df.to_json(filename, orient='records')
        print(f"Exported {len(export_df)} records to {filename}")

# Radio buttons for choosing the export format
export_format = widgets.RadioButtons(
    description='Export Format:',
    options=['csv', 'excel', 'json'],
    disabled=False,
)

# Button that writes the loaded data in the chosen format
export_button = widgets.Button(
    tooltip='Export the decoded CAN data to the selected format',
    button_style='info',
    description='Export Data',
)

# Output area that receives the export status messages
export_output = widgets.Output()


def on_export_clicked(b):
    """Clear earlier messages and export the loaded data in the chosen format."""
    with export_output:
        export_output.clear_output()
        chosen_format = export_format.value
        export_data(can_data, chosen_format)


export_button.on_click(on_export_clicked)

display(export_format)
display(export_button)
display(export_output)

RadioButtons(description='Export Format:', options=('csv', 'excel', 'json'), value='csv')

Button(button_style='info', description='Export Data', style=ButtonStyle(), tooltip='Export the decoded CAN da…

Output()