In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
import dask.dataframe as dd
from tsfresh import extract_relevant_features
from tsfresh.utilities.dataframe_functions import roll_time_series, make_forecasting_frame
from sklearn.preprocessing import StandardScaler

mpl.rcParams['figure.figsize'] = (10, 8)
mpl.rcParams['axes.grid'] = False

%load_ext line_profiler

In [None]:
# Load the EURUSD 1-minute ask/bid candles, merge them on their timestamp,
# clean them and standardise every value column.
# Result: `normalized_dataset`, a pandas DataFrame with columns
# ['index', <scaled value columns...>, 'id'] that the tsfresh cells consume.
eurusd_ask_file_path = r'/root/workspace/Autoformer/dataset/forex/EURUSD_Candlestick_1_M_ASK_07.07.2020-07.07.2023.csv'
eurusd_bid_file_path = r'/root/workspace/Autoformer/dataset/forex/EURUSD_Candlestick_1_M_BID_07.07.2020-07.07.2023.csv'

# Read CSV files into (lazy) Dask DataFrames
eurusd_ask_dataset = dd.read_csv(eurusd_ask_file_path)
eurusd_bid_dataset = dd.read_csv(eurusd_bid_file_path)

# Parse the 'Gmt time' strings (day.month.year hour:minute:second.fraction) into datetimes
eurusd_ask_dataset['Gmt time'] = dd.to_datetime(eurusd_ask_dataset['Gmt time'], format='%d.%m.%Y %H:%M:%S.%f')
eurusd_bid_dataset['Gmt time'] = dd.to_datetime(eurusd_bid_dataset['Gmt time'], format='%d.%m.%Y %H:%M:%S.%f')

# Prefix every value column with its side so ask and bid columns do not clash after the merge
ask_rename_dict = {col: f'eurusd_ask_{col}' for col in eurusd_ask_dataset.columns if col != 'Gmt time'}
eurusd_ask_dataset = eurusd_ask_dataset.rename(columns=ask_rename_dict)

bid_rename_dict = {col: f'eurusd_bid_{col}' for col in eurusd_bid_dataset.columns if col != 'Gmt time'}
eurusd_bid_dataset = eurusd_bid_dataset.rename(columns=bid_rename_dict)

# Keep only the timestamps that are present in both files
eurusd_dataset = dd.merge(eurusd_ask_dataset, eurusd_bid_dataset, on='Gmt time', how='inner')

# Keep Monday-Friday rows only (weekday 0-4)
eurusd_dataset = eurusd_dataset[eurusd_dataset['Gmt time'].dt.weekday < 5]

# Move 'Gmt time' into the index. Dask sorts the data by the new index, so from
# here on the rows are in chronological order.
eurusd_dataset = eurusd_dataset.set_index('Gmt time')

# Linearly interpolate missing values. This runs *after* the sort so that a gap is
# filled from rows that are adjacent in time; row order straight after the merge is
# not guaranteed. Interpolation is still per partition, so a gap at the very start
# of a partition is left as NaN.
eurusd_dataset = eurusd_dataset.map_partitions(lambda df: df.interpolate(method='linear'))

# Materialise once: the persisted Dask frame stays available for later cells and the
# pandas copy feeds scikit-learn, which needs the data in memory anyway.
eurusd_dataset = eurusd_dataset.persist()
eurusd_frame = eurusd_dataset.compute()

# Standardise every column to zero mean / unit variance
scaler = StandardScaler()
normalized_data = scaler.fit_transform(eurusd_frame)

# Build the result directly in pandas. The previous Dask round trip
# (from_pandas -> reset_index -> compute) restarted the row labels at 0 in every
# partition, leaving duplicated labels in the final frame.
# reset_index() exposes the chronological row position as the 'index' column that the
# rolling cell sorts by; the timestamps themselves remain on `eurusd_dataset`.
normalized_dataset = pd.DataFrame(normalized_data, columns=eurusd_frame.columns).reset_index()

# tsfresh needs an id column; the whole file is one single time series
normalized_dataset['id'] = 0

# Display the final dataset (pandas truncates the printout for large frames)
print(normalized_dataset)

In [None]:
# Cut the single series (id == 0) into overlapping windows, ordered by the
# 'index' column of normalized_dataset. The windowing options are passed
# through to tsfresh unchanged; rows are processed in chunks of 50000.
df_rolled = roll_time_series(
    normalized_dataset,
    column_id="id",
    column_sort="index",
    max_timeshift=20,
    min_timeshift=5,
    rolling_direction=1,
    chunksize=50000,
)

In [None]:
# Build the forecasting frame first: extract_relevant_features requires the target
# vector `y`, and make_forecasting_frame is the call that produces it.
# The close series is keyed by the 'index' column (the same column the rolling cell
# sorts by) so every time label is unique.
close_series = normalized_dataset.set_index("index")["eurusd_ask_Close"]
df_shift, y = make_forecasting_frame(close_series, kind="price", max_timeshift=20, rolling_direction=1)

# Extract the features of each window in df_shift and keep those that are relevant
# for predicting `y`. df_shift is in long format with the columns
# 'id', 'time', 'value' and 'kind', hence the column_* arguments below.
# NOTE(review): df_rolled is not used here because tsfresh requires the ids of the
# time series container to match y's index exactly, and y is keyed by the window ids
# of df_shift. Extracting from df_rolled would need a target vector built on
# df_rolled's own ids.
df_features = extract_relevant_features(
    df_shift,
    y,
    column_id="id",
    column_sort="time",
    column_kind="kind",
    column_value="value",
)

In [None]:
# Number of columns in the forecasting frame (replaces a manual counting loop).
num = len(df_shift.columns)

print(num)

In [None]:
# Write the forecasting frame to disk next to the raw ask/bid CSV files.
df_shift.to_csv(
    path_or_buf='/root/workspace/Autoformer/dataset/forex/Preprocessed_EURUSD_Candlestick_1_M_ASKBID_07.07.2020-07.07.2023.csv'
)