In [3]:
from asammdf import MDF, Signal
from asammdf.blocks.types import DbcFileType, BusType, StrPath
from pathlib import Path
import requests
from datetime import datetime, timedelta
from time import sleep
import json
from collections.abc import Iterable
from collections import defaultdict
import cantools
import numpy as np

# Root folder of the CANEdge log export.
# NOTE(review): this is a machine-specific absolute path (it looks like a WSL mount of a
# OneDrive folder) — consider moving it to a configurable constant.
d65_onedrive_files = Path(
    r"/mnt/c/Users/CARAL/Epiroc/Rig Crew - Private - General/5. Testing/4. Lafarge Field Trial/D65 Mine Site Testing Software Documentation/CANEdge"
)
_path = d65_onedrive_files / "upper"
# Sort the recursive glob: rglob() yields entries in filesystem order, and later cells index
# into this list (files[0], files[:3]), so an unsorted list is not reproducible between runs.
files = sorted(_path.rglob('*.[mM][fF]4'))
# Report the directory that was actually searched (it is not the current directory).
print(f'Found {len(files)} MF4 files in {_path} and its subdirectories.')

Found 337 MF4 files in the current directory and subdirectories.


In [4]:
def get_dbc_files(directory: Path | str) -> list[StrPath]:
    """
    Recursively collect the DBC files found under ``directory``.

    The ``.dbc`` extension is matched in any letter case. ``directory`` may be given
    as a ``Path`` or as a string; the matches are returned as ``Path`` objects in the
    order produced by ``Path.rglob``.
    """
    root = directory if isinstance(directory, Path) else Path(directory)
    return [dbc_path for dbc_path in root.rglob("*.[dD][bB][cC]")]

def get_dbc_dict(directory: Path | str) -> dict[BusType, Iterable[DbcFileType]]:
    """
    Build the ``database_files`` mapping for asammdf's ``extract_bus_logging()``.

    Every DBC file found by ``get_dbc_files(directory)`` is registered under the
    ``"CAN"`` bus type as a ``(file, 0)`` pair. The second element is the bus channel
    the database applies to (asammdf documents ``0`` as "any channel").
    """
    can_databases = []
    for dbc_path in get_dbc_files(directory):
        can_databases.append((dbc_path, 0))
    return {"CAN": can_databases}

def get_channel_data(signal: Signal) -> tuple[str, int, str]:
    """
    Extract the CAN message name, the CAN ID and a metric-safe name from a decoded signal.

    The message name and CAN ID are recovered from the keys of ``signal.display_names``.
    The keys are recognised by their content instead of by their position, so the result
    does not depend on the order (or number) of display names:

    * the CAN ID comes from the first key containing ``ID=<hex>``
      (e.g. ``"CAN1.CAN_DataFrame.ID=0x18EF8A00 EXT=True.AC_MotorSpeed"``);
    * the message name is the component right before the signal name in the first other
      dotted key (e.g. ``"CAN1.MotorCommand.AC_MotorSpeed"`` or ``"MotorCommand.AC_MotorSpeed"``).

    Args:
        signal: Object exposing ``name`` (str) and ``display_names`` (mapping keyed by name).

    Returns:
        ``(message_name, can_id, metric_name)`` where ``metric_name`` is ``signal.name``
        with spaces replaced by underscores.

    Raises:
        ValueError: If no display name provides the message name or the CAN ID, or if the
            ID text is not valid hexadecimal.
    """
    signal_suffix = "." + signal.name
    message: str | None = None
    can_id: int | None = None

    for display_name in signal.display_names:
        _, marker, tail = display_name.partition("ID=")
        if marker:
            if can_id is None:
                # tail looks like "0x18EF8A00 EXT=True.<signal>"; keep only the hex token.
                # int(..., 16) accepts the optional "0x" prefix.
                id_token = tail.split(" ", 1)[0].split(".", 1)[0]
                can_id = int(id_token, 16)
            continue

        if message is None and "." in display_name and display_name != signal.name:
            # Strip the trailing ".<signal name>" first so that dots inside the signal
            # name cannot be mistaken for the message/signal separator.
            if display_name.endswith(signal_suffix):
                qualifier = display_name[: -len(signal_suffix)]
            else:
                qualifier = display_name.rsplit(".", 1)[0]
            # The qualifier is "<message>" or "<bus>.<message>"; keep the last component.
            message = qualifier.rsplit(".", 1)[-1] or None

    if message is None or can_id is None:
        raise ValueError(
            f"Cannot derive message name / CAN ID for signal {signal.name!r} "
            f"from display names {list(signal.display_names)!r}"
        )

    return message, can_id, signal.name.replace(" ", "_")


In [8]:
# Collect the DBC files found under ../files/dbc/upper and display them.
# NOTE(review): despite its name, dbc_dict holds the list returned by get_dbc_files(),
# not the mapping produced by get_dbc_dict() — confirm which one was intended.
dbc_dict = get_dbc_files(Path('../files/dbc/upper'))
dbc_dict

[PosixPath('../files/dbc/upper/d65_brightloops.dbc'),
 PosixPath('../files/dbc/upper/D65_CH5_CM.dbc'),
 PosixPath('../files/dbc/upper/D65_CH6_EVCC.dbc')]

In [14]:
# Group the first three log files by the local calendar date on which they were recorded.
# NOTE(review): `processing` (the value passed as `progress=`) is not defined in any cell
# shown in this notebook — it must already exist in the kernel before this cell runs.

# Re-created on every run so that re-executing the cell does not append the same files twice.
file_dates = defaultdict(list)
for f in files[:3]:
    mdf = MDF(f, process_bus_logging=False, progress=processing)
    print(f" => Processing {f} complete", flush=True)
    # astimezone() without arguments converts to the system local time zone using the UTC
    # offset that applied at the recording time, not the offset in effect right now (the two
    # differ across a daylight-saving change and can shift the date near midnight).
    local_date = mdf.start_time.astimezone().date()
    local_date_str = local_date.strftime('%Y_%m_%d')
    file_dates[local_date_str].append(mdf)

 => Processing ../files/upper/data/00001085/00000004.MF4 complete
 => Processing ../files/upper/data/00001085/00000005.MF4 complete
 => Processing ../files/upper/data/00001085/00000001.MF4 complete


In [6]:
# Merge the MDF objects of the first recorded date into one measurement, then decode the raw
# bus frames with the DBC files. concatenate() is a static method, so it is called on the
# class instead of on a throwaway empty MDF() instance.
concatenated = MDF.concatenate(list(file_dates.values())[0], process_bus_logging=False)
# NOTE(review): the other cells read DBC files from '../files/dbc/upper'; this call only
# finds databases if '../files/upper' also contains .dbc files — confirm the path.
decoded = concatenated.extract_bus_logging(database_files=get_dbc_dict('../files/upper'))

In [65]:
# Open the first log file with default options and search it for 'AC_MotorSpeed'; the
# recorded output lists the matching channel names and display names.
mdf = MDF(files[0])
mdf.search('AC_MotorSpeed')

['CAN1.WP120_AC_EMP__MotorCommand.AC_MotorSpeed',
 'WP120_AC_EMP__MotorCommand.AC_MotorSpeed',
 'CAN1.CAN_DataFrame.ID=0x18EF8A00 EXT=True.AC_MotorSpeed',
 'AC_MotorSpeed',
 'CAN1.WP120_AC_EMP__MotorStatus2.AC_MotorSpeed',
 'WP120_AC_EMP__MotorStatus2.AC_MotorSpeed',
 'CAN1.CAN_DataFrame.ID=0x18FF238A EXT=True.AC_MotorSpeed']

In [74]:
# Pick the channel at position 23 of the file's channel iteration order.
# NOTE(review): 23 is a magic index, and list() materialises every channel just to keep one.
sig = list(mdf.iter_channels())[23]

In [75]:
# Query the metrics server at localhost:8428 for the selected signal, over the same time
# window the signal covers in the MF4 file.
message, can_id, metric_name = get_channel_data(sig)
# Sample timestamps are added to the measurement start time as offsets in seconds.
start_ts = mdf.start_time + timedelta(seconds=sig.timestamps[0])
end_ts = mdf.start_time + timedelta(seconds=sig.timestamps[-1])

params = {
    # Series selector: metric named after the signal, filtered by its CAN message label.
    "match[]": f'{metric_name}{{message="{message}"}}',
    "start": str(start_ts.timestamp()),
    "end": str(end_ts.timestamp()),
    # NOTE(review): the recorded response holds more samples than the signal itself
    # (340989 vs 96392), so `step` does not appear to down-sample the export — confirm
    # whether this endpoint honours it.
    "step": "10s",
}
print(params)
resp = requests.get('http://localhost:8428/api/v1/export', params=params, timeout=10)
print(f'Status code: {resp.status_code}')
# Only a short prefix: the full body can hold hundreds of thousands of samples.
print(f'Response text: {resp.text[:100]}')

{'match[]': 'DCDC2_Set_index_byte{message="DCDC2_Setpoints"}', 'start': '1750518069.66005', 'end': '1750525045.029', 'step': '10s'}
Status code: 200
Response text: {"metric":{"__name__":"DCDC2_Set_index_byte","can_id":"18EFB101","message":"DCDC2_Setpoints"},"value


In [76]:
# The export endpoint answers with JSON lines (one JSON object per matching series), so only
# the first non-empty line is parsed; json.loads() on the whole body fails as soon as the
# selector matches more than one series.
_json = json.loads(next((line for line in resp.text.splitlines() if line.strip()), ""))
# The exported timestamps are scaled by 1e-3 (milliseconds -> seconds) and shown as naive
# local times, which is why they differ from the UTC values printed elsewhere.
print(f"Start at {datetime.fromtimestamp(1e-3*_json['timestamps'][0])} and end at {datetime.fromtimestamp(1e-3*_json['timestamps'][-1])}")

Start at 2025-06-21 08:01:09.660000 and end at 2025-06-21 09:57:25.029000


In [77]:
# Show only the head of the raw response: the full body contains every exported sample and
# would flood the notebook output.
resp.text.strip()[:500]

'{"metric":{"__name__":"DCDC2_Set_index_byte","can_id":"18EFB101","message":"DCDC2_Setpoints"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,1,0,0,1,0,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,2,2,0,0,0,0,0,0,0,0

In [78]:

# Express the time window already covered by the exported series as offsets (in seconds)
# from the MDF start time, i.e. on the same axis as sig.timestamps.
respstart_ts = datetime.fromtimestamp(1e-3*_json['timestamps'][0], tz=mdf.start_time.tzinfo)
respend_ts = datetime.fromtimestamp(1e-3*_json['timestamps'][-1], tz=mdf.start_time.tzinfo)

cutstart = (respstart_ts - mdf.start_time).total_seconds()
cutend = (respend_ts - mdf.start_time).total_seconds()

print(f'Sig has {len(sig.timestamps)} timestamps, starts at {start_ts} and ends at {end_ts}')
print(f'Response has {len(_json["timestamps"])} timestamps, starts at {respstart_ts} and ends at {respend_ts}')
print(f'Cutting from {cutstart} to {cutend} seconds')

# Keep only the samples outside the exported window, with a 1 ms guard band on each side.
# The cuts are open-ended: "older" is everything from the first sample up to just before
# the window, "newer" is everything from just after the window to the last sample. A cut
# whose start lies past the last sample is always empty, and a stop 1 ms before the last
# sample would drop that sample.
# include_ends=False keeps cut() from adding interpolated samples at the cut boundaries,
# so only samples that exist in the file remain.
older_data = sig.cut(stop=cutstart - 0.001, include_ends=False)
newer_data = sig.cut(start=cutend + 0.001, include_ends=False)
newsig = older_data.extend(newer_data)

if len(newsig.timestamps):
    print(f'Sig has {len(newsig.timestamps)} timestamps, starts at {newsig.timestamps[0]} and ends at {newsig.timestamps[-1]}')
else:
    print('No data left after cutting')


Sig has 96392 timestamps, starts at 2025-06-21 15:01:09.660050+00:00 and ends at 2025-06-21 16:57:25.029000+00:00
Response has 340989 timestamps, starts at 2025-06-21 15:01:09.660000+00:00 and ends at 2025-06-21 16:57:25.029000+00:00
Cutting from 1.66 to 6977.029 seconds
No data left after cutting


In [None]:
# Experiment with Signal.cut(): cut the signal to the window [-10, 200] and inspect the
# resulting timestamps.
# NOTE(review): this overwrites `newsig` from the previous cell. In the recorded output the
# last timestamp is exactly 200.0 while the other samples fall near x.97 — presumably cut()
# added an interpolated sample at the stop boundary; confirm against the asammdf docs.
newsig = sig.cut(-10,200)

newsig.timestamps

array([172.97305, 174.97235, 175.9734 , 176.9724 , 177.97275, 178.97305,
       179.9719 , 180.97235, 181.97135, 182.97155, 183.9706 , 184.9705 ,
       185.9704 , 186.9702 , 187.97025, 188.96945, 189.96925, 190.97   ,
       191.96915, 192.96855, 193.96865, 194.96875, 195.96775, 196.9681 ,
       197.9682 , 198.967  , 199.96725, 200.     ])

In [24]:
# Peek at the first three sample times of the selected signal.
sig.timestamps[:3]

array([172.97305, 174.97235, 175.9734 ])

In [None]:
def get_cantools_databases(files: str | Path | list[str | Path]) -> list[cantools.db.Database]:
    """
    Load and return a list of DBC databases from the configured paths.
    """
    db_paths: list[Path] = []
    if isinstance(files, str | Path):
        if Path(files).is_dir():
            db_paths.extend(list(Path(files).rglob("*.[dD][bB][cC]")))
        elif Path(files).is_file() and Path(files).suffix.lower() == '.dbc':
            db_paths.append(Path(files))
    elif isinstance(files, list):
        for db in files:
            if Path(db).is_dir():
                db_paths.extend(list(Path(db).rglob("*.[dD][bB][cC]")))
            else:
                db_paths.append(Path(db))

    databases: list[cantools.db.Database] = []
    for db in db_paths:
        if not db.exists():
            print(f"⚠️ DBC file {db} does not exist, skipping.")
            continue
        try:
            _db = cantools.db.load_file(db)
            if isinstance(_db, cantools.db.Database):
                databases.append(_db)
        except Exception as e:
            print(f"⚠️ Error loading DBC file {db}: {e}")
    return databases

In [12]:
# Load every DBC file found under ../files/dbc/upper with cantools and report how many
# databases were loaded.
dbc = get_cantools_databases('../files/dbc/upper')
print(f'Loaded {len(dbc)} DBC databases.')

Loaded 3 DBC databases.


In [16]:
# Re-open the first log file with process_bus_logging=False (this rebinds `mdf`).
# NOTE(review): presumably this keeps the raw CAN frame channels that the decoding cells
# below read — confirm against the asammdf documentation.
mdf = MDF(files[0], process_bus_logging=False)

In [17]:
# Keep only the channels that contain at least one sample.
channels = [ch for ch in mdf.iter_channels() if len(ch.timestamps)]

In [18]:
# Number of non-empty channels.
len(channels)

3

In [43]:
# Try to decode every raw CAN frame of every non-empty channel with each loaded database.
# NOTE(review): this prints one line per decoded frame, which can be a very large output.
for ch in channels:
    for _id, _bytes in zip(ch.samples['CAN_DataFrame.ID'], ch.samples['CAN_DataFrame.DataBytes']):
        # decode_message() needs a plain int and the raw payload as bytes. Converting each
        # row with bytes(row) yields the raw payload; ndarray.astype(bytes) would instead
        # turn every byte value into its own text string (7 -> b'7'), which never decodes.
        frame_id, payload = int(_id), bytes(_bytes)
        for db in dbc:
            try:
                # Named decoded_signals so the decoded MDF stored in `decoded` by an
                # earlier cell is not overwritten.
                decoded_signals = db.decode_message(frame_id, payload)
            except KeyError:
                # Expected: this database does not define the frame ID.
                continue
            except Exception as e:
                # Report real decoding problems instead of hiding them.
                print(f"Error decoding message with ID {frame_id:X}: {e}")
                continue
            print(f"Decoded message with ID {frame_id:X}: {decoded_signals}")

In [49]:
# Show the payload of the first frame in channels[1] as a bytes object.
bytes(channels[1].samples['CAN_DataFrame.DataBytes'][0])

b'\x00\x00\x00\x00\x03\x07\xdc^'

In [57]:
# Collect the frames of channels[1] whose CAN ID equals 0x1002.
# NOTE(review): this iterates sample by sample in Python; if `samples` is a structured NumPy
# array, a boolean mask on the ID field would likely be much faster — confirm before changing.
filtered_samples = [smp for smp in channels[1].samples if int(smp['CAN_DataFrame.ID']) == 0x1002]

In [60]:
# Decode the first ten frames of channels[1] with each loaded database and print the decoded
# signals one per line (zip() stops at the shorter, ten-element ID slice).
for _id, _bytes in zip(channels[1].samples['CAN_DataFrame.ID'][:10], channels[1].samples['CAN_DataFrame.DataBytes']):
    for db in dbc:
        try:
            # Named decoded_signals so the decoded MDF stored in `decoded` by an earlier
            # cell is not overwritten.
            decoded_signals = db.decode_message(int(_id), bytes(_bytes))
        except KeyError:
            # Expected: this database does not define the frame ID.
            continue
        except Exception as e:
            # Report real decoding problems instead of hiding them.
            print(f"Error decoding message with ID {_id:X}: {e}")
            continue
        print(f"Decoded message with ID {_id:X}")
        for key, value in decoded_signals.items():
            print(f"  {key}: {value}")

Decoded message with ID 18FF2182
  contactor_voltage: 0
  link_voltage: 0
  isolation_measurement_status: SNA
  plug_lock_permission: Not_allowed
  plug_unlock_permission: Not_allowed
  charge_permission: Not_allowed
  contactor_status: SNA
  stop_charge: Pressed
  charge_selection: PrimaryNode
  renegotiate: Not_requested
  pwm_charge_mode_request: PwmChargeModeRequestC
  cnt: 13
  crc: 94
Decoded message with ID 18FF1780
  isolation_measurement_request: Requested
  contactor_request: ForceOpen
  charge_selection_ack: SNA
  charging_status: StopCharging
  mobilization_status: Vehicle_mobile
  state_charge_from_vehicle: VALID_OK
  cnt: 9
  crc: 187
Decoded message with ID 18FF1180
  ptc_1_raw: 1545
  ptc_1_resistance: 1082
  ptc_1_temperature: 21
  ptc_1_self_diagnostic_status: OK
Decoded message with ID 18FF2182
  contactor_voltage: 0
  link_voltage: 0
  isolation_measurement_status: SNA
  plug_lock_permission: Not_allowed
  plug_unlock_permission: Not_allowed
  charge_permission: Not

In [38]:
# Look up the message definition for frame ID 0x1002 in the first loaded database.
# NOTE(review): dbc[0] depends on the order in which the DBC files were discovered.
dbc[0].get_message_by_frame_id(0x1002)

message('BL1_Node2Status', 0x1002, True, 8, {None: 'Overal HVHV Status & Diagnostic'})