In [1]:
import json, gzip, glob
import pandas as pd
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum, auto
from typing import List

In [2]:
# Directory that is searched for the sensor log ('*.tsv.gz') here and for the
# Klipper capture ('*_klipper.json.gz') further down in the notebook.
NAME = Path('../../data/cable_clips/')

# First gzip-compressed sensor log found in that directory. If several files
# match, whichever one glob() yields first is used.
SENSOR_TSV_FILE = next(NAME.glob('*.tsv.gz'), None)
if SENSOR_TSV_FILE is None:
    # A bare next() on an empty glob would surface as an unexplained
    # StopIteration; say what was being looked for and where instead.
    raise FileNotFoundError(f"No '*.tsv.gz' sensor log found in {NAME.resolve()}")

# Load sensor data

In [5]:
class SensorType(StrEnum):
    ACCELEROMETER = 'A'
    MAGNETOMETER = 'M'
    GYROSCOPE = 'G'
    GAS = 'C',
    HEATER = 'H',
    EXTRUDER = 'E',
    MOTION = 'M'

@dataclass
class IMUSensorData:
    """A timestamped three-component reading from one of the IMU sensors.

    The sensor-log loader in this notebook creates these for rows tagged as
    accelerometer, magnetometer or gyroscope, filling ``x``, ``y`` and ``z``
    from the three value columns of the row.
    """
    sensor_type: SensorType  # which sensor produced the reading
    datetime: datetime  # parsed from the row's ISO-format timestamp column
    # Axis components; units are not established by this notebook - TODO confirm.
    x: float
    y: float
    z: float

@dataclass
class GasSensorData:
    """A timestamped scalar reading from the gas sensor.

    The sensor-log loader in this notebook creates these for rows tagged
    ``SensorType.GAS``, converting the row's single value column with
    ``float(...)``.
    """
    sensor_type: SensorType  # which sensor produced the reading
    datetime: datetime  # parsed from the row's ISO-format timestamp column
    # Annotated as float to match what the loader actually stores; dataclasses
    # neither validate nor coerce field types, so this does not alter behaviour.
    value: float
def _load_sensor_log(tsv_path) -> List[IMUSensorData | GasSensorData]:
    """Parse a gzip-compressed, tab-separated sensor log into typed records.

    Every non-blank row is read as
    ``<ISO timestamp> TAB <sensor code> TAB <value columns...>``.
    Accelerometer, magnetometer and gyroscope rows contribute three float
    columns (x, y, z); gas rows contribute a single float column.

    Args:
        tsv_path: Location of the ``.tsv.gz`` file to read.

    Returns:
        The parsed records in file order.

    Raises:
        ValueError: If a timestamp or number cannot be parsed, if the sensor
            code is not a ``SensorType`` value, or if the code is valid but
            this loader has no record type for it.
        IndexError: If a row has fewer columns than its sensor type needs.
    """
    imu_types = {
        SensorType.ACCELEROMETER,
        SensorType.MAGNETOMETER,
        SensorType.GYROSCOPE,
    }
    records: List[IMUSensorData | GasSensorData] = []

    with gzip.open(tsv_path, 'rb') as tsv_file:
        for line_no, raw_line in enumerate(tsv_file, start=1):
            if not raw_line.strip():
                # Tolerate blank rows (e.g. an empty last line) instead of
                # failing on the timestamp parse.
                continue

            fields = raw_line.decode('ascii').rstrip('\r\n').split('\t')
            timestamp = datetime.fromisoformat(fields[0])
            sensor_type = SensorType(fields[1])

            if sensor_type in imu_types:
                records.append(
                    IMUSensorData(
                        sensor_type=sensor_type,
                        datetime=timestamp,
                        x=float(fields[2]),
                        y=float(fields[3]),
                        z=float(fields[4]),
                    )
                )
            elif sensor_type == SensorType.GAS:
                records.append(
                    GasSensorData(
                        sensor_type=sensor_type,
                        datetime=timestamp,
                        value=float(fields[2]),
                    )
                )
            else:
                # A known SensorType that the sensor log is not expected to
                # contain; report which one and where to ease debugging.
                raise ValueError(
                    f"Invalid sensor type {sensor_type!r} on line {line_no} of {tsv_path}"
                )

    return records


# Parsing happens inside a function so that per-row temporaries do not linger
# in the notebook's global namespace.
sensor_data: List[IMUSensorData | GasSensorData] = _load_sensor_log(SENSOR_TSV_FILE)

# Load printer data

In [19]:
# Locate the Klipper capture; its base name is reused below to open both
# '<base>.json.gz' and '<base>.index.gz'.
_klipper_archive = next(NAME.glob('*_klipper.json.gz'), None)
if _klipper_archive is None:
    # A bare next() on an empty glob would surface as an unexplained
    # StopIteration; say what was being looked for and where instead.
    raise FileNotFoundError(f"No '*_klipper.json.gz' capture found in {NAME.resolve()}")

# File name with the trailing '.json.gz' cut off (the glob pattern guarantees
# the matched name ends with an extension of that length).
DATA_NAME = _klipper_archive.name[:-len('.json.gz')]

In [21]:
# Pre-bind the name so it exists (as None) even if opening the archive fails.
index = None
with gzip.open(f'{NAME / DATA_NAME}.index.gz', 'rb') as index_file:
    # The archive holds JSON documents separated by ETX (0x03) bytes; empty
    # chunks between separators are dropped before decoding.
    index = [
        json.loads(chunk)
        for chunk in index_file.read().split(b'\x03')
        if len(chunk) > 0
    ]

In [22]:
# Pre-bind the name so it exists (as None) even if opening the archive fails.
data = None
with gzip.open(f'{NAME / DATA_NAME}.json.gz', 'rb') as capture_file:
    # The archive holds JSON documents separated by ETX (0x03) bytes; empty
    # chunks between separators are dropped before decoding.
    data = [
        json.loads(chunk)
        for chunk in capture_file.read().split(b'\x03')
        if len(chunk) > 0
    ]

In [24]:
def get_field(field_name: str, data):
    """Collect ``(eventtime, value)`` pairs for one status field.

    Only entries whose ``'q'`` key equals ``'status'`` are considered, and of
    those only the ones whose ``params['status']`` contains ``field_name``.

    Args:
        field_name: Key to look up inside each entry's ``params['status']``.
        data: Iterable of decoded entries.

    Returns:
        A list of ``(params['eventtime'], params['status'][field_name])``
        tuples, in the order the matching entries appear in ``data``.
    """
    samples = []
    for entry in data:
        # Skip anything that is not a status message.
        if 'q' in entry and entry['q'] == 'status':
            status = entry['params']['status']
            if field_name in status:
                samples.append((entry['params']['eventtime'], status[field_name]))
    return samples

In [None]:
@dataclass
class TemperatureSensorData:
    """A timestamped temperature sample tagged with its sensor type.

    NOTE(review): nothing in this part of the notebook constructs this record,
    so its data source is not established here - presumably the Klipper
    status entries; confirm.
    """
    sensor_type: SensorType  # which source the sample belongs to
    datetime: datetime  # time of the sample
    temperature: float  # units not shown in this section - TODO confirm

@dataclass
class TargetData:
    """A timestamped target value tagged with its sensor type.

    NOTE(review): nothing in this part of the notebook constructs this record;
    presumably ``target`` is a setpoint for the tagged source - confirm what
    it represents and its units.
    """
    sensor_type: SensorType  # which source the target applies to
    datetime: datetime  # time at which the target was recorded
    target: float

@dataclass
class PowerSensorData:
    """A timestamped power sample tagged with its sensor type.

    NOTE(review): nothing in this part of the notebook constructs this record,
    so the scale of ``power`` (fraction, percent, watts, ...) is not
    established here - confirm.
    """
    sensor_type: SensorType  # which source the sample belongs to
    datetime: datetime  # time of the sample
    power: float

In [26]:
@dataclass
class MotionSensorData:
    """A timestamped x/y/z sample with an accompanying velocity value.

    NOTE(review): nothing in this part of the notebook constructs this record;
    presumably it holds reported printer motion - confirm the source, the
    coordinate units, and how ``velocity`` is defined.
    """
    sensor_type: SensorType  # which source the sample belongs to
    datetime: datetime  # time of the sample
    x: float
    y: float
    z: float
    velocity: float

@dataclass
class MotionTargetData:
    """A timestamped x/y/z target tagged with its sensor type.

    NOTE(review): nothing in this part of the notebook constructs this record;
    presumably it holds a commanded position - confirm the source and units.
    """
    sensor_type: SensorType  # which source the target applies to
    datetime: datetime  # time at which the target was recorded
    x: float
    y: float
    z: float