In [1]:
!conda activate DSU-FIN
import polars as pl
import os
import numpy as np
import gc
from sklearn.preprocessing import StandardScaler
# Pin the working directory to the project folder so the relative paths used in
# later cells resolve from a known location. The check makes re-running this
# cell a no-op once the kernel is already there.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
if os.getcwd() != 'e:\\python_projects\\notebooks\\Deep learning\\FIN':
    os.chdir('e:\\python_projects\\notebooks\\Deep learning\\FIN')

In [None]:
# Folder holding the raw input files, built from the current working directory
# (Windows-style separators, trailing separator included).
data_root = os.getcwd() + '\\OD-TINA\\data\\'
files = []

# Collect the full path of every file found anywhere below data_root.
for dirpath, _dirnames, filenames in os.walk(data_root):
    files.extend(os.path.join(dirpath, name) for name in filenames)

# os.walk yields directory entries in arbitrary, filesystem-dependent order.
# The per-file frames built from this list are concatenated in list order
# further down, so sort the paths to make that order reproducible across runs
# and machines.
# NOTE(review): lexicographic path order equals chronological order only if the
# file naming scheme guarantees it — confirm against the data set.
files.sort()

# Only the first file's schema is inspected.
# NOTE(review): assumes every file shares this schema — confirm.
schema = pl.scan_parquet(files[0]).collect_schema()
cat_cols = []
feature_columns = []

for k, v in schema.items():
    # Feature columns are recognised purely by name.
    if 'FEATURE' in k:
        feature_columns.append(k)
    # Exact dtype comparison: a substring test on the dtype's repr would also
    # match nested types such as List(String), which cannot be cast to an Enum
    # later on. Note this check is independent of the FEATURE check above, so
    # string columns without 'FEATURE' in their name are categorical as well.
    if v == pl.String:
        cat_cols.append(k)

# Load only the string columns of all files to build the category vocabularies.
categories_pipeline = pl.scan_parquet(files).select(cat_cols)

cat_cols_p = categories_pipeline.collect(streaming=True)

# Sorted list of distinct values per categorical column, used as the fixed
# category list of the Enum dtype. Nulls are dropped first: sorted() cannot
# compare None with str, and a null is not a valid Enum category (null entries
# simply stay null after the cast).
cat_val_dict = {}
for colname in cat_cols:
    cat_val_dict[colname] = sorted(
        cat_cols_p[colname].drop_nulls().unique().to_list()
    )

# Numeric features = FEATURE columns that are not string-typed.
num_columns = [col for col in feature_columns if col not in cat_cols]

# Per-file results, accumulated in the order of `files`.
dfs = []      # numeric features resampled to `sample_period` means
cat_dfs = []  # categorical columns at their original resolution
sample_period = '15s'

for f in files:
    ldf = (
        pl.scan_parquet(f)
        # Convert the epoch column BEFORE the float downcast. Should 'unix' be
        # stored as Float64 (its dtype is not visible here), casting it to
        # Float32 first would round present-day epoch seconds to a ~128 s grid
        # and corrupt every timestamp. With an integer 'unix' the order makes
        # no difference.
        .with_columns(pl.from_epoch(pl.col('unix'), time_unit='s'))
        # Halve the footprint of the remaining float columns.
        .cast({pl.Float64: pl.Float32})
        # Give every categorical column the same fixed Enum category list in
        # all files.
        .with_columns(
            *[
                pl.col(col).cast(pl.Enum(categories=cats))
                for col, cats in cat_val_dict.items()
            ]
        )
        # Move the time column to the front, order rows by time (needed by
        # group_by_dynamic below) and give it its final name.
        .select([pl.col('unix'), pl.all().exclude('unix')])
        .sort('unix')
        .rename({'unix': 'Timestamp'})
    )
    # Categorical columns are kept un-aggregated here; they are resampled
    # after all files have been concatenated.
    ldf_cat = ldf.select(['Timestamp'] + cat_cols)

    # Mean of every numeric feature per `sample_period` window.
    # NOTE(review): windows are computed per file, so a window that straddles
    # two files yields two rows with the same Timestamp (one partial mean
    # each) in the concatenated result.
    ldf = ldf.group_by_dynamic(
        index_column='Timestamp',
        every=sample_period,
    ).agg(pl.col(num_columns).mean().name.prefix("mean_"))

    df = ldf.collect(streaming=True)
    cat_df = ldf_cat.collect(streaming=True)
    cat_dfs.append(cat_df)
    dfs.append(df)
    

# --- numeric features -------------------------------------------------------
# The per-file frames are sorted internally, but the concatenation follows the
# order of the file list. Sort by time so the parquet file is chronological;
# maintain_order keeps the result deterministic when timestamps repeat.
tina_resampled = pl.concat(dfs).sort('Timestamp', maintain_order=True)
tina_resampled.write_parquet('tina_numeric_resampled.parquet')
# Release the numeric frames before building the one-hot encoded frame.
del tina_resampled, dfs
gc.collect()

# --- categorical features ---------------------------------------------------
# group_by_dynamic requires its index column to be sorted; the concatenation
# alone does not guarantee that, so sort explicitly first.
tina_categorical = pl.concat(cat_dfs).sort('Timestamp')
# One-hot encode every column except the leading Timestamp.
tina_categorical = tina_categorical.to_dummies(tina_categorical.columns[1:])

# A dummy column is 1 for a window if its category occurred at least once
# within that window (max over 0/1 indicators).
tina_categorical = tina_categorical.group_by_dynamic(
    index_column='Timestamp',
    every=sample_period,
).agg(pl.col(tina_categorical.columns[1:]).max())
tina_categorical.write_parquet('tina_categorical_resampled.parquet')
del tina_categorical, cat_dfs
gc.collect()

In [20]:
# Re-open both resampled outputs lazily; any Float64 column is narrowed to
# Float32 when the query is eventually collected.
tina_full_num_lazy = (
    pl.scan_parquet('tina_numeric_resampled.parquet')
    .cast({pl.Float64: pl.Float32})
)
tina_full_cat_lazy = (
    pl.scan_parquet('tina_categorical_resampled.parquet')
    .cast({pl.Float64: pl.Float32})
)

# Row count of the numeric table, obtained without materialising its columns.
data_size = (
    tina_full_num_lazy
    .select(pl.len())
    .collect()
    .item()
)

In [22]:
# Positional 60 / 20 / 20 split of the numeric table; the test split takes
# whatever remains so the three sizes always add up to data_size.
train_size = int(data_size * .6)
val_size = int(data_size * .2)
test_size = data_size - train_size - val_size

# Each split is sliced first and null-filled afterwards, so filled values never
# cross a split boundary.
# write_parquet returns None, so the collected frame is bound to the name
# first and written in a separate statement; otherwise tina_train / tina_val /
# tina_test would all end up as None.
# NOTE(review): a backward fill takes values from later rows and leaves nulls
# at the end of a split untouched — confirm this is intended.
tina_train = (
    tina_full_num_lazy
    .slice(0, train_size)
    .fill_null(strategy='backward')
    .collect()
)
tina_train.write_parquet('tina_train_15s.parquet')

tina_val = (
    tina_full_num_lazy
    .slice(train_size, val_size)
    .fill_null(strategy='backward')
    .collect()
)
tina_val.write_parquet('tina_val_15s.parquet')

# No length given: the slice runs to the end of the table (test_size rows).
tina_test = (
    tina_full_num_lazy
    .slice(train_size + val_size)
    .fill_null(strategy='backward')
    .collect()
)
tina_test.write_parquet('tina_test_15s.parquet')
