In [1]:
from config import settings
from torchdata.datapipes import iter as it
from matplotlib import pyplot as plt
import numpy as np
import torch
from src.data_preprocessing.utils import (
    is_path_excluded,
    extract_turbine_name,
    add_interval_start_dask,
    extract_fs,
    filter_based_on_len,
    group_data,
    collate_data,
    convert_dict_to_tuples,
    )
import dask.dataframe as dd
from functools import partial
from src.data_preprocessing import preprocessing_function as pf
FS = 31.25  # sampling rate handed to the processing steps below (presumably in Hz — TODO confirm)

# List every '*.parquet.gzip' file below the raw-data root (recursively).
dp_filename = it.FileLister(root=settings.path.raw, recursive=True, masks='*.parquet.gzip')

# Path filtering on the words 'Trash' and 'wierddata'.
# NOTE(review): `.filter` keeps the elements for which the function returns True, so this
# relies on `is_path_excluded` returning True for paths that should be kept — confirm.
func_filter = partial(is_path_excluded, exclude_words='Trash')
dp_filtered = dp_filename.filter(func_filter)
func_filter = partial(is_path_excluded, exclude_words='wierddata')
dp_filtered = dp_filtered.filter(func_filter)


# Fork the filtered paths into two branches that are consumed in lockstep by the Zipper below.
dp_forked = dp_filtered.fork(num_instances=2)

# Branch 0: turbine name derived from each path.
dp_turbine_name = dp_forked[0].map(extract_turbine_name)

# Branch 1: lazily read each file with dask, add the interval start, then materialise it.
dp_file = dp_forked[1].map(dd.read_parquet)
dp_file = dp_file.map(add_interval_start_dask)
dp_file = dp_file.map(lambda x: x.compute())

# Length predicate; FS*60*60 is the number of samples in one hour at rate FS.
filter_based_on_len_partial = partial(filter_based_on_len, required_len=FS*60*60)

# Pair each file with its turbine name FIRST and only then apply the length filter.
# Filtering `dp_file` before zipping would drop elements from one branch only, so every
# file after the first rejected one would be zipped with the wrong turbine name.
# `input_col=0` applies the predicate to the file (element 0 of the pair) and keeps or
# drops the whole (file, turbine_name) pair.
dp_file_all = it.Zipper(dp_file, dp_turbine_name)
dp_file_all = dp_file_all.filter(filter_based_on_len_partial, input_col=0)
dp_file_all = dp_file_all.map(group_data)

# Batch 50 grouped elements together and collate each batch.
dp = dp_file_all.batch(50)
dp = dp.map(collate_data)

# Transforms applied to each signal; their outputs are keyed by transform name
# (see `processing_module.module_dict` usage further down).
processing_module = pf.ParallelModule([
    pf.Welch(n_fft=1024),
    pf.RMS(),
    pf.RollingAverage(window_size=int(FS/2)),
    pf.Range(),
    pf.Mean(),
])
def process_data(data):
    """Replace the raw ``'signal'`` entry of ``data`` with the processed features.

    The signal is flattened to 2-D (all leading axes merged, last axis kept),
    passed through the module-level ``processing_module``, and every output is
    converted back to NumPy with the original leading axes restored.

    Args:
        data: dict holding a ``'signal'`` entry (assumed to be a NumPy array,
            since it is passed to ``torch.from_numpy`` — TODO confirm).

    Returns:
        The same ``data`` dict, updated in place: one new entry per key returned
        by ``processing_module`` and the ``'signal'`` entry removed.
    """
    shape = data['signal'].shape
    flat_signal = data['signal'].reshape(-1, shape[-1])
    features = processing_module(torch.from_numpy(flat_signal))
    leading_axes = shape[:-1]
    for name in features.keys():
        # Restore the leading axes; the trailing axis takes whatever length the
        # transform produced.
        features[name] = features[name].numpy().reshape(*leading_axes, -1)
    data.update(features)
    del data['signal']
    return data

def reshape_data(data):
    """Merge the first two axes of every array in ``data`` into a single axis.

    Args:
        data: dict mapping names to arrays that have at least two axes.

    Returns:
        A new dict with the same keys; each array of shape ``(a, b, ...)`` is
        reshaped to ``(a * b, ...)``.
    """
    merged = {}
    for key, value in data.items():
        dims = value.shape
        merged[key] = value.reshape(dims[0] * dims[1], *dims[2:])
    return merged

def unravel_sensor_name(data):
    """Split each transform's output into one entry per sensor.

    For every transform name in ``processing_module.module_dict`` and every
    sensor listed in the first row of ``data['sensor_name']``, the slice
    ``data[transform][..., i, :]`` is stored under ``'<transform>_<sensor>'``.

    Args:
        data: dict with one entry per transform plus ``'sensor_name'``,
            ``'timestamp'`` and ``'turbine_name'``.

    Returns:
        A new dict with the per-sensor entries followed by ``'timestamp'`` and
        ``'turbine_name'`` copied over unchanged.
    """
    sensors = data['sensor_name'][0]
    unravelled = {
        f'{transform}_{sensor}': data[transform][..., idx, :]
        for transform in processing_module.module_dict.keys()
        for idx, sensor in enumerate(sensors)
    }
    for passthrough in ('timestamp', 'turbine_name'):
        unravelled[passthrough] = data[passthrough]
    return unravelled
        
def array_to_bytes(data, columns:list):
    """Turn the entries of ``data`` into plain Python lists, in place.

    Args:
        data: dict mapping column names to NumPy arrays.
        columns: names of the columns whose rows are serialised to bytes.

    Returns:
        The same ``data`` dict. Each column named in ``columns`` becomes a list
        holding the raw float32 bytes of every row; every other column is
        flattened and converted with ``tolist()``.
    """
    for name in columns:
        as_float32 = data[name].astype(np.float32)
        data[name] = [row.tobytes() for row in as_float32]
    byte_columns = set(columns)
    remaining = [name for name in data.keys() if name not in byte_columns]
    for name in remaining:
        data[name] = data[name].flatten().tolist()
    return data

# Columns that `array_to_bytes` serialises to raw float32 bytes: one per
# (transform, sensor) combination listed here.
columns = [
    f'{transform}_{sensor}'
    for transform in ('Welch', 'RollingAverage')
    for sensor in ('X', 'Y', 'Z')
]
array_to_bytes_partial = partial(array_to_bytes, columns=columns)


# Per-batch steps, applied in this order.
dp = (
    dp.map(process_data)
    .map(reshape_data)
    .map(unravel_sensor_name)
    .map(array_to_bytes_partial)
)



################################################################################
The 'datapipes', 'dataloader2' modules are deprecated and will be removed in a
future torchdata release! Please see https://github.com/pytorch/data/issues/1196
to learn more and leave feedback.
################################################################################

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [14]:
import sqlite3 
from pathlib import Path
from src.data_preprocessing import queries 

# Make sure the output directory exists, then connect to the SQLite database file in it.
dir_database = Path(settings.path.processed)
dir_database.mkdir(parents=True, exist_ok=True)
path_database = dir_database.joinpath('norther.db')
conn = sqlite3.connect(path_database)
c = conn.cursor()

# Run the table-creation statements for the processed-data and metadata tables.
for ddl_statement in (queries.CREATE_PROCCESSED_DATA_TABLE, queries.CREATE_METADATA_TABLE):
    c.execute(ddl_statement)


<sqlite3.Cursor at 0x783f4239bc40>

In [15]:
# Insert every processed batch inside a single transaction. Using the connection
# as a context manager commits once the loop finishes and rolls back if any
# batch raises, so a failed run cannot leave partial inserts pending for a later
# `conn.commit()`. The connection itself stays open for the following cells.
with conn:
    for batch in dp:
        # Convert the batch dict into row tuples in the column order the INSERT expects.
        record = convert_dict_to_tuples(batch, queries.ORDERED_COLUMNS_PROCESSED)
        c.executemany(queries.INSERT_PROCESSED_DATA, record)
        print('Record inserted')


Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record inserted
Record i

In [16]:
# Serialise the Welch frequency axis (for sampling rate FS) as raw float32 bytes.
welch_transform = processing_module.module_dict['Welch']
frequency_axis = np.asarray(
    welch_transform.get_frequency_axis(FS).numpy(), dtype=np.float32
).tobytes()

# One-row metadata record; every value is wrapped in a single-element list.
metadata = dict(
    frequency_axis=[frequency_axis],
    sample_rate=[FS],
    window_size=['10min'],
    processing_method=[str(processing_module)],
)
# Column order follows the insertion order of the dict above.
record = convert_dict_to_tuples(metadata, list(metadata))

# Run the metadata table-creation statement, insert the row, then commit and
# close the connection.
conn.execute(queries.CREATE_METADATA_TABLE)
conn.executemany(queries.INSERT_METADATA, record)
conn.commit()
conn.close()