In [1]:
import json
import logging
import os
from dataclasses import asdict, dataclass
from typing import List, Optional, Sequence, Union
from uuid import uuid4 as uuid

import dask.array as da
import mne
import pyarrow as pa
import pyarrow.parquet as pq
from dask import delayed
from netCDF4 import Dataset
from prefect import flow, task
from prefect_dask import DaskTaskRunner

from services.delta_lake import Duck_Lake
from services.utils.directory_utils import get_tmpdir
from services.utils.timing import TimingContext

# Give the root logger a default handler, then let INFO-and-above records through.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

# Single Duck_Lake handle reused by the cells below.
ducklake = Duck_Lake()

In [11]:
# Input recording and output directory, both resolved under the directory named
# by the CONTAINER_FILE_STORAGE_PATH environment variable.
my_edf_file_path, my_parquet_output_dir = (
    os.path.join(os.environ["CONTAINER_FILE_STORAGE_PATH"], leaf)
    for leaf in ("test33_HypoactiveHeidi_05_DAY1_PROCESSED.edf", "test")
)

# Non-EEG channel names.
misc_channels = [
    "pitch", "roll", "heading",
    "GyrZ", "MagZ", "Tag_On",
    "Depth", "MagX", "MagY",
]

@dataclass
class SignalSchema:
    """One channel read from an EDF file, with fields named like the parquet columns.

    ``read_signal`` fills every field either with the bare value or with a
    one-element list wrapping that value, depending on its ``mode`` argument,
    which is why each annotation below is a union of the two shapes.
    """

    # Channel name that was requested from the EDF file.
    signal_name: Union[str, List[str]]
    # Sampling frequency taken from ``raw.info["sfreq"]``.
    frequency: Union[float, List[float]]
    # Recording start, built by ``read_signal`` as a pyarrow timestamp scalar.
    # Quoted so the annotation is not evaluated when the class is created.
    start_time: Union["pa.Scalar", List["pa.Scalar"]]
    # Samples of the channel (first row of the array returned by ``get_data()``).
    data: Union[Sequence[float], List[Sequence[float]]]

@task
def read_signal(
    edf_file_path,
    signal_name,
    mode: str = "SINGLE"
):
    """Read one channel from an EDF file and package it as a ``SignalSchema``.

    Args:
        edf_file_path: Path of the EDF file to open.
        signal_name: Name of the channel to load; only this channel is
            requested from the reader.
        mode: Shape of the returned fields.
            ``"SINGLE"`` (default) wraps every field in a one-element list, so
            the result can be fed to ``pa.table`` as a one-row table.
            ``"TABLE"`` returns the bare values instead.

    Returns:
        SignalSchema with the channel name, the sampling frequency, the
        recording start as a ``timestamp('us', tz="UTC")`` pyarrow scalar
        (a null scalar when the file carries no measurement date) and the
        channel samples.

    Raises:
        ValueError: If ``mode`` is neither ``"SINGLE"`` nor ``"TABLE"``.
    """
    # Fail fast: previously an unknown mode fell through and returned None,
    # which only surfaced later as an obscure error in the caller.
    if mode not in ("SINGLE", "TABLE"):
        raise ValueError(
            f"Unsupported mode {mode!r}; expected 'SINGLE' or 'TABLE'"
        )

    raw = mne.io.read_raw_edf(edf_file_path, include=[signal_name], preload=False)
    samples = raw.pick(signal_name).get_data()[0]
    frequency = raw.info["sfreq"]

    # Hand the datetime itself to pyarrow. `meas_date.timestamp()` is a float
    # number of *seconds*, which is not a valid value for a microsecond
    # timestamp (it is either rejected or taken as raw microsecond ticks).
    # MNE documents `meas_date` as a UTC-aware datetime or None; None becomes
    # a null scalar instead of raising AttributeError.
    start_time = pa.scalar(raw.info["meas_date"], type=pa.timestamp('us', tz="UTC"))

    if mode == "SINGLE":
        return SignalSchema(
            signal_name=[signal_name],
            frequency=[frequency],
            start_time=[start_time],
            data=[samples],
        )

    # mode == "TABLE"
    return SignalSchema(
        signal_name=signal_name,
        frequency=frequency,
        start_time=start_time,
        data=samples,
    )
        


@flow
def process_edf(
    edf_file_path: str,
    schema: pa.Schema,
    signal_names: Optional[List[str]] = None,
):
    """Export channels of an EDF file to parquet and append them to the Delta table.

    For every requested channel the flow reads the signal with ``read_signal``,
    writes it as ``<signal_name>.parquet`` into the directory returned by
    ``get_tmpdir()``, and finally passes all written files to
    ``ducklake.write_parquet_to_delta`` in ``append`` mode, partitioned by
    ``signal_name``.

    Args:
        edf_file_path: Path of the EDF file to read.
        schema: Arrow schema applied to each per-signal table and forwarded to
            the Delta write.
        signal_names: Channels to export. Defaults to ``["pitch"]``, the single
            channel this flow exported before the parameter existed.
    """
    if signal_names is None:
        signal_names = ["pitch"]

    with TimingContext("EDF Read"):
        parquet_files = []
        tmpdir = get_tmpdir()
        for signal_name in signal_names:
            signal = read_signal(edf_file_path, signal_name)
            # vars() exposes the dataclass fields without the recursive deep
            # copy that dataclasses.asdict() performs on the sample array.
            table = pa.table(vars(signal), schema=schema)
            fname = os.path.join(tmpdir, f'{signal_name}.parquet')
            # `import pyarrow` alone does not load the parquet submodule, so
            # use the explicitly imported `pq` rather than `pa.parquet`.
            pq.write_table(table, fname)
            parquet_files.append(fname)

        if not parquet_files:
            # Nothing was exported (empty `signal_names`); skip the Delta write.
            logging.warning("process_edf: no signals requested, nothing written")
            return

        ducklake.write_parquet_to_delta(
            parquet_files=parquet_files,
            schema=schema,
            mode="append",
            partition_by=['signal_name'],
            name="test",
            description="test"
        )


# Column layout handed to process_edf: name, sampling frequency, UTC start
# timestamp, and the samples as a list column.
schema = pa.schema(
    [
        ("signal_name", pa.string()),
        ("frequency", pa.float64()),
        ("start_time", pa.timestamp('us', tz="UTC")),
        ("data", pa.list_(pa.float64())),
    ]
)

# Run the flow on the configured recording inside a TimingContext labelled "Main".
with TimingContext("Main"):
    process_edf(my_edf_file_path, schema)


  warn(
  warn(


Extracting EDF parameters from /data/files/test33_HypoactiveHeidi_05_DAY1_PROCESSED.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...


  raw = mne.io.read_raw_edf(edf_file_path, preload=False)
  value = np.nanmax([_prefilter_float(x) for x in values])
  raw = mne.io.read_raw_edf(edf_file_path, preload=False)
  value = np.nanmin([_prefilter_float(x) for x in values])


Extracting EDF parameters from /data/files/test33_HypoactiveHeidi_05_DAY1_PROCESSED.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...


In [9]:
# Show which Delta Lake location is about to be queried.
print(os.environ["CONTAINER_DELTA_LAKE_PATH"])

# Fetch the `data` column of one row from the Delta table; the trailing
# expression is the cell's displayed result.
first_row_sql = f'''
    SELECT data 
    FROM delta_scan('{os.environ["CONTAINER_DELTA_LAKE_PATH"]}') LIMIT 1
    '''
ducklake.conn.execute(first_row_sql).pl()

/data/delta-lake


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

data
list[f64]
"[-0.000031, -0.000033, … 0.000175]"
