In [2]:
# ========================================
# ESTRAZIONE CHECKSUM DA MESSAGGIO 
# ========================================

import pandas as pd
import cantools
from python.psa_checksum import psa_checksum

print("üöÄ INIZIO ESTRAZIONE CHECKSUM\n")

# Configurazione
MESSAGE_ID = 0x3AD
BUS = 2
#LOG_FILE = 'logs/aa0ad8ba95ff270c|00000017--62c22bb216.csv'
#LOG_FILE = 'logs/6a7075a4fdd765ee|0000004e--1f612006dd.csv'
LOG_FILE = 'logs/sunny1.csv'
DBC_FILE = 'dbc/psa_aee2010_r3.dbc'

üöÄ INIZIO ESTRAZIONE CHECKSUM



In [3]:
# 1. CARICA DBC
print(f"üìÇ Carico DBC: {DBC_FILE}...")
try:
    dbc = cantools.database.load_file(DBC_FILE, strict=False)
    print(f"‚úÖ DBC caricato\n")
except FileNotFoundError:
    print(f"‚ùå File {DBC_FILE} non trovato!")
    exit()


üìÇ Carico DBC: dbc/psa_aee2010_r3.dbc...
‚úÖ DBC caricato



In [4]:
# 2. MOSTRA DEFINIZIONE MESSAGGIO
print("=" * 80)
print(f"DEFINIZIONE MESSAGGIO 0x{MESSAGE_ID:03X}")
print("=" * 80)

try:
    msg = dbc.get_message_by_frame_id(MESSAGE_ID)
    print(f"Nome: {msg.name}")
    print(f"ID: 0x{MESSAGE_ID:03X} ({MESSAGE_ID})")
    print(f"Lunghezza: {msg.length} bytes")
    print(f"Segnali: {len(msg.signals)}")
    print()
    
    # Mostra tutti i segnali E trova TUTTI i checksum
    print("üìã SEGNALI NEL MESSAGGIO:")
    print("-" * 80)
    checksum_signals = []  # Lista di TUTTI i checksum
    
    for sig in msg.signals:
        print(f"  {sig.name:30s} | Start: {sig.start:3d} | Length: {sig.length:2d} bit | Scale: {sig.scale} | Offset: {sig.offset}")
        
        # Cerca TUTTI i checksum
        if 'CHECKSUM' in sig.name.upper() or 'CRC' in sig.name.upper():
            checksum_signals.append(sig)
            print(f"     ‚ö° QUESTO √à UN CHECKSUM!")
    
    print()
    
    if checksum_signals:
        print(f"‚úÖ Trovati {len(checksum_signals)} checksum:")
        for cs in checksum_signals:
            print(f"   - {cs.name}: bit {cs.start}, lunghezza {cs.length} bit")
    else:
        print("‚ö†Ô∏è  Nessun segnale con nome CHECKSUM o CRC trovato")
        exit()
        
except Exception as e:
    print(f"‚ùå Errore nel leggere il messaggio: {e}")
    exit()

DEFINIZIONE MESSAGGIO 0x3AD
Nome: Dyn_EasyMove
ID: 0x3AD (941)
Lunghezza: 8 bytes
Segnali: 2

üìã SEGNALI NEL MESSAGGIO:
--------------------------------------------------------------------------------
‚ùå Errore nel leggere il messaggio: 'Signal' object has no attribute 'start_bit'


In [None]:
# 3. CARICA CSV
print(f"\nüìÇ Carico CSV: {LOG_FILE}...")
df = pd.read_csv(LOG_FILE)
print(f"‚úÖ Caricato: {len(df)} righe\n")


In [None]:
# 4. FILTRA MESSAGGI
print (BUS)
print(f"üîç Filtro messaggi 0x{MESSAGE_ID:03X} su bus {BUS}...")
df_filtered = df[(df['bus'] == BUS) & (df['addr'] == f'0x{MESSAGE_ID:x}')].copy()
print(f"‚úÖ Trovati {len(df_filtered)} messaggi\n")

if len(df_filtered) == 0:
    print("‚ùå Nessun messaggio trovato!")
    exit()

In [None]:
# 5. DECODIFICA E ESTRAI CHECKSUM
print("üîÑ Decodifica messaggi ed estrazione checksum...\n")

decoded_messages = []
for idx, row in df_filtered.iterrows():
    try:
        time = row['time']
        data_hex = row['data']
        
        # Prepara bytes
        if data_hex.startswith('0x'):
            data_hex = data_hex[2:]
        if len(data_hex) % 2:
            data_hex = '0' + data_hex
        data_bytes = bytes.fromhex(data_hex)
        
        # Decodifica
        decoded = dbc.decode_message(MESSAGE_ID, data_bytes)
        
        # Aggiungi info base
        decoded['time'] = time
        decoded['raw_hex'] = data_hex
        decoded['raw_bytes'] = ' '.join([data_hex[i:i+2].upper() for i in range(0, len(data_hex), 2)])
        
        # Verifica TUTTI i checksum
        all_checksums_ok = True
        
        for cs_sig in checksum_signals:
            # Calcola checksum - CREA COPIA FRESCA OGNI VOLTA
            data_array = bytearray(data_bytes)  # Copia fresca per ogni checksum!
            calculated = psa_checksum(MESSAGE_ID, cs_sig, data_array)
    
            # Estrai checksum dal messaggio
            extracted = decoded.get(cs_sig.name, None)
            
            # Converti NamedSignalValue in int
            if hasattr(extracted, 'value'):
                extracted = int(extracted.value)
            else:
                extracted = int(extracted) if extracted is not None else 0
                
            # Verifica
            is_ok = (calculated == extracted)
            all_checksums_ok = all_checksums_ok and is_ok
            
            # Salva con nome univoco per ogni checksum
            decoded[f'{cs_sig.name}_extracted'] = extracted
            decoded[f'{cs_sig.name}_calculated'] = calculated
            decoded[f'{cs_sig.name}_ok'] = is_ok
        
        # Flag globale: OK solo se TUTTI sono OK
        decoded['all_checksums_ok'] = all_checksums_ok
        
        decoded_messages.append(decoded)
        
    except Exception as e:
        print(f"‚ö†Ô∏è  Errore decodifica a t={time:.3f}s: {e}")

df_decoded = pd.DataFrame(decoded_messages)
print(f"‚úÖ Decodificati {len(df_decoded)} messaggi\n")

In [None]:
# 6. MOSTRA RISULTATI
print("=" * 80)
print("MESSAGGI CON CHECKSUM")
print("=" * 80)
print()

N = 20
print(f"üìã Primi {N} messaggi:\n")

for idx, row in df_decoded.head(N).iterrows():
    time = row['time']
    raw_bytes = row['raw_bytes']
    all_ok = row['all_checksums_ok']
    
    # Status globale
    status = "‚úì OK" if all_ok else "‚úó FAIL"
    
    print(f"[{time:8.3f}s] {status:6s} | {raw_bytes}")
    
    # Mostra ogni checksum su riga separata
    for cs in checksum_signals:
        extr_val = row[f'{cs.name}_extracted']
        calc_val = row[f'{cs.name}_calculated']
        
        # Converti in int gestendo NamedSignalValue
        try:
            extr = int(float(extr_val))  # <-- float() poi int()
        except:
            extr = int(extr_val.value) if hasattr(extr_val, 'value') else int(extr_val)
            
        try:
            calc = int(float(calc_val))  # <-- float() poi int()
        except:
            calc = int(calc_val.value) if hasattr(calc_val, 'value') else int(calc_val)
        
        ok_symbol = "‚úì" if row[f'{cs.name}_ok'] else "‚úó"
        print(f"   {cs.name:30s}: Estratto=0x{extr:X}  Calcolato=0x{calc:X}  [{ok_symbol}]")
    
    # Altri segnali
    exclude_cols = ['time', 'raw_hex', 'raw_bytes', 'all_checksums_ok']
    for cs in checksum_signals:
        exclude_cols.extend([cs.name, f'{cs.name}_extracted', f'{cs.name}_calculated', f'{cs.name}_ok'])
    
    other_signals = []
    for col in df_decoded.columns:
        if col not in exclude_cols:
            val = row[col]
            if isinstance(val, float):
                other_signals.append(f"{col}={val:.2f}")
            else:
                other_signals.append(f"{col}={val}")
    
    if other_signals:
        print(f"   Segnali: {', '.join(other_signals[:4])}")
    print()

In [8]:
# 7. STATISTICHE CHECKSUM
print(f"\n{'=' * 80}")
print("üìä STATISTICHE CHECKSUM")
print("=" * 80)

all_ok_count = df_decoded['all_checksums_ok'].sum()
all_fail_count = len(df_decoded) - all_ok_count

print(f"Totale messaggi:       {len(df_decoded)}")
print(f"Tutti checksum OK:     {all_ok_count} ({all_ok_count/len(df_decoded)*100:.1f}%)")
print(f"Almeno 1 FAIL:         {all_fail_count} ({all_fail_count/len(df_decoded)*100:.1f}%)")
print()

# Statistiche per ogni checksum
for cs in checksum_signals:
    print(f"--- {cs.name} ---")
    ok_count = df_decoded[f'{cs.name}_ok'].sum()
    fail_count = len(df_decoded) - ok_count
    print(f"  OK:   {ok_count} ({ok_count/len(df_decoded)*100:.1f}%)")
    print(f"  FAIL: {fail_count} ({fail_count/len(df_decoded)*100:.1f}%)")
    print()

if all_fail_count > 0:
    print(f"‚ö†Ô∏è  Esempi di messaggi con checksum errati (primi 5):")
    fails = df_decoded[~df_decoded['all_checksums_ok']].head(5)
    for idx, row in fails.iterrows():
        print(f"\n   t={row['time']:.3f}s: {row['raw_bytes']}")
        for cs in checksum_signals:
            if not row[f'{cs.name}_ok']:
                extr_val = row[f'{cs.name}_extracted']
                calc_val = row[f'{cs.name}_calculated']
                # Prova diversi modi per convertire
                try:
                    extr = int(float(extr_val))
                    calc = int(float(calc_val))
                except:
                    extr = extr_val
                    calc = calc_val
                print(f"      {cs.name}: Estratto={extr}, Calcolato={calc}")
    print()


üìä STATISTICHE CHECKSUM
Totale messaggi:       18893
Tutti checksum OK:     18893 (100.0%)
Almeno 1 FAIL:         0 (0.0%)

--- CHECKSUM ---
  OK:   18893 (100.0%)
  FAIL: 0 (0.0%)



In [9]:
# 8. SALVA CSV
print(f"\n{'=' * 80}")
print("üíæ SALVATAGGIO CSV")
print("=" * 80)

# Prepara colonne dinamicamente
export_cols = ['time', 'raw_bytes']

# Aggiungi tutte le colonne checksum
for cs in checksum_signals:
    export_cols.extend([f'{cs.name}_extracted', f'{cs.name}_calculated', f'{cs.name}_ok'])

export_cols.append('all_checksums_ok')

# Aggiungi altri segnali
exclude = export_cols + ['raw_hex'] + [cs.name for cs in checksum_signals]
other_cols = [col for col in df_decoded.columns if col not in exclude]

final_cols = export_cols + other_cols
df_export = df_decoded[final_cols].copy()

# Rinomina colonne per chiarezza
rename_dict = {}
for cs in checksum_signals:
    rename_dict[f'{cs.name}_extracted'] = f'{cs.name}'
    rename_dict[f'{cs.name}_calculated'] = f'{cs.name}_recalculated'

df_export.rename(columns=rename_dict, inplace=True)

output_file = f'output/message_0x{MESSAGE_ID:03X}_with_checksum.csv'
df_export.to_csv(output_file, index=False)

print(f"‚úÖ Salvato: {output_file}")
print(f"   Colonne: {', '.join(df_export.columns)}")
print(f"   Righe: {len(df_export)}")


üíæ SALVATAGGIO CSV
‚úÖ Salvato: output/message_0x305_with_checksum.csv
   Colonne: time, raw_bytes, CHECKSUM, CHECKSUM_recalculated, CHECKSUM_ok, all_checksums_ok, ANGLE, RATE, RATE_SIGN, COUNTER, RATE_ALT
   Righe: 18893


In [10]:
# 9. SALVA CSV CON SOLO CHECKSUM ERRATI
if all_fail_count > 0:
    print(f"\n{'=' * 80}")
    print("‚ùå CHECKSUM ERRATI")
    print("=" * 80)
    
    # Filtra solo i record con checksum diverso
    df_fails = df_decoded[~df_decoded['all_checksums_ok']].copy()
    
    # Usa le stesse colonne del CSV principale
    df_fails_export = df_fails[final_cols].copy()
    df_fails_export.rename(columns=rename_dict, inplace=True)
    
    # Salva CSV
    fails_file = f'output/message_0x{MESSAGE_ID:03X}_checksum_FAILS.csv'
    df_fails_export.to_csv(fails_file, index=False)
    
    print(f"‚úÖ Salvato CSV con checksum errati: {fails_file}")
    print(f"   Righe: {len(df_fails_export)}")
    print()
    
    # Mostra record errati (max 20)
    print(f"üìã RECORD CON CHECKSUM DIVERSO (primi 20 di {len(df_fails)}):")
    print("-" * 100)
    
    for idx, row in df_fails.head(20).iterrows():
        print(f"\nTime: {row['time']:.3f}s | {row['raw_bytes']}")
        for cs in checksum_signals:
            ok = row[f'{cs.name}_ok']
            if not ok:  # Mostra solo quelli errati
                extr = row[f'{cs.name}_extracted']
                calc = row[f'{cs.name}_calculated']
                print(f"  {cs.name:30s}: Estratto={extr}  Calcolato={calc}  [‚úó]")
else:
    print(f"\n‚úÖ Tutti i checksum sono corretti! Nessun errore trovato.")


‚úÖ Tutti i checksum sono corretti! Nessun errore trovato.


In [11]:
# 9. SAVE CSV WITH ONLY FAILED CHECKSUMS
if all_fail_count > 0:
    print(f"\n{'=' * 80}")
    print("‚ùå CHECKSUM ERRATI")
    print("=" * 80)
    
    # Filter only rows with checksum mismatch
    df_fails = df_decoded[~df_decoded['all_checksums_ok']].copy()
    
    # Use same columns as main CSV
    df_fails_export = df_fails[final_cols].copy()
    df_fails_export.rename(columns=rename_dict, inplace=True)
    
    # Save CSV
    fails_file = f'output/message_0x{MESSAGE_ID:03X}_checksum_FAILS.csv'
    df_fails_export.to_csv(fails_file, index=False)
    
    print(f"‚úÖ CSV with checksum errors saved: {fails_file}")
    print(f"   Rows: {len(df_fails_export)}")
    print()
    
    # Show only the first 5 failed records
    print("üìã FIRST 5 RECORDS WITH CHECKSUM MISMATCH:")
    print("-" * 120)

    for idx, row in df_fails.head(5).iterrows():
        print(f"\nTime: {row['time']:.3f}s | {row['raw_bytes']}")
        for cs in checksum_signals:
            extr = row[f'{cs.name}_extracted']
            calc = row[f'{cs.name}_calculated']
            ok = row[f'{cs.name}_ok']
            status = "‚úì" if ok else "‚úó"
            print(f"  {cs.name:30s}: Extracted=0x{extr:X}, Calculated=0x{calc:X} [{status}]")
else:
    print(f"\n‚úÖ All checksums are correct! No errors found.")

print("\nüéâ COMPLETATO!")


‚úÖ All checksums are correct! No errors found.

üéâ COMPLETATO!
