In [1]:
import polars as pl
import os

# Root of the partitioned parquet dataset. Only sub-directories named
# `partition_id=...` that contain a `part-0.parquet` file are converted.
input_folder = '/mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet'
# Destination directory for the per-partition CSV exports.
output_folder = '/mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/csv_partitions'

os.makedirs(output_folder, exist_ok=True)

# Join keys plus the nine responder columns that get a lagged copy.
lag_cols_original = ["date_id", "symbol_id"] + [f"responder_{idx}" for idx in range(9)]
# responder_N -> responder_N_lag_1
lag_cols_rename = {f"responder_{idx}": f"responder_{idx}_lag_1" for idx in range(9)}

# os.listdir() returns entries in arbitrary order; sorting keeps the
# processing order (and therefore the log) reproducible between runs.
for partition in sorted(os.listdir(input_folder)):
    partition_path = os.path.join(input_folder, partition)

    # Guard: anything that is not a `partition_id=...` directory is skipped.
    if not (os.path.isdir(partition_path) and partition.startswith('partition_id=')):
        print(f'Skipping unknown directory: {partition}')
        continue

    parquet_file = os.path.join(partition_path, 'part-0.parquet')
    if not os.path.exists(parquet_file):
        print(f'File {parquet_file} does not exist.')
        continue

    # Best-effort per partition: a failure is reported and the loop moves on
    # to the next partition instead of aborting the whole conversion.
    try:
        print(f'Loading file: {parquet_file}')

        df = pl.read_parquet(parquet_file)

        # Build the lag table: shift date_id forward by one so that a row for
        # (date_id, symbol_id) joins to the responders of date_id - 1, keeping
        # the last row (in frame order) of each (date_id, symbol_id) group.
        # NOTE(review): lags are derived from this partition file only, so
        # rows whose previous date_id is stored in a different partition get
        # null lag values from the left join - confirm this is acceptable.
        df_lags = (
            df
            .select(lag_cols_original)
            .rename(lag_cols_rename)
            .with_columns(
                (pl.col("date_id") + 1).alias("date_id")
            )
            .group_by(["date_id", "symbol_id"], maintain_order=True)
            .agg([pl.col(name).last().alias(name) for name in lag_cols_rename.values()])
        )

        df_joined = df.join(df_lags, on=["date_id", "symbol_id"], how="left")

        # Positional split by row order: first 80% TRAIN, next 10% VALIDATE,
        # remainder TEST.
        total_rows = df_joined.height
        validation_split_index = int(total_rows * 0.8)
        test_split_index = int(total_rows * 0.9)

        # Build the label column natively in polars rather than materialising
        # a Python list with one element per row.
        split_column = pl.concat(
            [
                pl.repeat("TRAIN", validation_split_index, eager=True),
                pl.repeat("VALIDATE", test_split_index - validation_split_index, eager=True),
                pl.repeat("TEST", total_rows - test_split_index, eager=True),
            ]
        ).alias("split")

        df_joined = df_joined.with_columns(split_column)

        csv_file = os.path.join(output_folder, f'dataset_{partition}.csv')
        df_joined.write_csv(csv_file)
        print(f'Saved with splits: {csv_file}')

    except Exception as e:
        print(f'Error processing {parquet_file}: {e}')

print('Conversion of partition_id with lags and manual split completed.')


Loading file: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet/partition_id=0/part-0.parquet
Saved with splits: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/csv_partitions/dataset_partition_id=0.csv
Loading file: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet/partition_id=1/part-0.parquet
Saved with splits: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/csv_partitions/dataset_partition_id=1.csv
Loading file: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet/partition_id=2/part-0.parquet
Saved with splits: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/csv_partitions/dataset_partition_id=2.csv
Loading file: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet/partition_id=3/part-0.parquet
Saved with splits: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/csv_partitions/dataset_partition_id=3.csv
Loading file: /mnt/h/Studia/magisterskie/1 sem/ProjektSemestralny/train.parquet/partition_id=4/part-0.parque