In [99]:
#### Preamble ####
# Purpose: Cleans, engineers, and saves the cleaned data 
# Author: Jiazhou(Justin) Bi
# Date: 15 Nov 2024
# Contact: justin.bi@mail.utoronto.ca
# License: None
# Pre-requisites: see requirements.txt
# Any other information needed? None

# Loading the data

In [100]:
import pandas as pd

In [101]:
# Read the raw 1-minute, 1-hour and 1-day datasets into DataFrames
raw_paths = (
    '../data/01-raw_data/raw_data_1m.parquet',
    '../data/01-raw_data/raw_data_1h.parquet',
    '../data/01-raw_data/raw_data_1d.parquet',
)
df_1m, df_1h, df_1d = (pd.read_parquet(path) for path in raw_paths)

# Checking for Missing Values

In [102]:
# Number of null entries in each column of the 1-minute dataset
df_1m.isnull().sum()

timestamp    0
open         0
high         0
low          0
close        0
volume       0
dtype: int64

In [103]:
# Number of null entries in each column of the 1-hour dataset
df_1h.isnull().sum()

timestamp    0
open         0
high         0
low          0
close        0
volume       0
dtype: int64

In [104]:
# Number of null entries in each column of the 1-day dataset
df_1d.isnull().sum()

timestamp    0
open         0
high         0
low          0
close        0
volume       0
dtype: int64

There are no missing values found in these datasets.

# Data Types

This section examines the datatypes of each column and ensures they are appropriate for the analysis.

In [105]:
# Show the dtype of every column in the 1-minute dataset
df_1m.dtypes

timestamp    datetime64[ns]
open                float64
high                float64
low                 float64
close               float64
volume              float64
dtype: object

In [106]:
# Show the dtype of every column in the 1-hour dataset
df_1h.dtypes

timestamp    datetime64[ns]
open                float64
high                float64
low                 float64
close               float64
volume              float64
dtype: object

In [107]:
# Show the dtype of every column in the 1-day dataset
df_1d.dtypes

timestamp    datetime64[ns]
open                float64
high                float64
low                 float64
close               float64
volume              float64
dtype: object

# Data Validation

This section validates that the datasets are consecutive. That is, for the 1-minute dataset, no minute should be skipped. The same logic applies to the 1-hour and 1-day datasets.

## Validating if any minute is missing

In [108]:
# Build the complete, gap-free sequence of minutes between the first and last
# timestamp of the 1-minute dataset.
# 'min' is the minute alias pandas itself reports for this index; the older 'T'
# spelling is deprecated and raises a FutureWarning on recent pandas versions.
full_minute_range = pd.date_range(
    start=df_1m['timestamp'].min(),
    end=df_1m['timestamp'].max(),
    freq='min',
)

  full_minute_range = pd.date_range(start=df_1m['timestamp'].min(), end=df_1m['timestamp'].max(), freq='1T')


In [109]:
# Index the frame by timestamp and align it to the full minute range.
# Any minute that is absent from the data becomes a row of NaN values.
# Done as one chained expression (no inplace mutation) so that df_1m is left
# untouched if the reindex raises, e.g. on duplicate timestamps.
df_1m = df_1m.set_index('timestamp').reindex(full_minute_range)

In [110]:
# df_1m has already been reindexed to full_minute_range, so taking the difference
# between that range and df_1m.index can never report anything. The minutes that
# were really absent are the ones the reindex had to fill with an all-NaN row.
# NOTE(review): relies on the raw data containing no all-NaN rows of its own
# (the missing-value check earlier in the notebook reported none).
missing_timestamps = df_1m.index[df_1m.isna().all(axis=1).to_numpy()]
print("Missing timestamps:", missing_timestamps)

Missing timestamps: DatetimeIndex([], dtype='datetime64[ns]', freq='min')


In [111]:
# Lay the missing timestamps out as a table with the dataset's columns;
# the unnamed index comes back from reset_index() as 'index', hence the rename
missing_rows = (
    pd.DataFrame(index=missing_timestamps, columns=df_1m.columns)
    .reset_index()
    .rename(columns={'index': 'timestamp'})
)
print("Missing rows:")
print(missing_rows)

Missing rows:
Empty DataFrame
Columns: [timestamp, open, high, low, close, volume]
Index: []


In [112]:
# After the reindex, df_1m.index is full_minute_range by construction, so
# comparing the two always passes. A minute was genuinely missing from the raw
# data if the reindex had to insert an all-NaN row for it, so test for those.
# This also avoids building two Python sets over every minute in the range.
# NOTE(review): relies on the raw data containing no all-NaN rows of its own.
has_gaps = df_1m.isna().all(axis=1).any()
if not has_gaps:
    print("PASS: All timestamps match.")
else:
    print("FAIL: There are mismatched timestamps.")

PASS: All timestamps match.


## Validating if any hour is missing from the 1-hour dataset

In [113]:
# Build the complete hourly range spanned by the 1-hour dataset
full_hour_range = pd.date_range(
    start=df_1h['timestamp'].min(),
    end=df_1h['timestamp'].max(),
    freq='1h',
)

# Record the gaps BEFORE reindexing: once the frame is aligned to
# full_hour_range its index equals that range and the difference is always empty.
missing_timestamps_hour = full_hour_range.difference(pd.DatetimeIndex(df_1h['timestamp']))

# Align the frame to the full range; absent hours become all-NaN rows
df_1h = df_1h.set_index('timestamp').reindex(full_hour_range)
print("Missing timestamps:", missing_timestamps_hour)

Missing timestamps: DatetimeIndex([], dtype='datetime64[ns]', freq='h')


In [114]:
# df_1h has already been reindexed to full_hour_range, so comparing its index
# with that range always passes. The hours that were really absent are the rows
# the reindex had to fill entirely with NaN.
# NOTE(review): relies on the raw data containing no all-NaN rows of its own.
gap_mask_hour = df_1h.isna().all(axis=1)

# The unnamed index comes back from reset_index() as 'index', hence the rename
missing_rows_hour = (
    df_1h.loc[gap_mask_hour]
    .reset_index()
    .rename(columns={'index': 'timestamp'})
)
print("Missing rows:")
print(missing_rows_hour)

if not gap_mask_hour.any():
    print("PASS: All timestamps match.")
else:
    print("FAIL: There are mismatched timestamps.")

Missing rows:
Empty DataFrame
Columns: [timestamp, open, high, low, close, volume]
Index: []
PASS: All timestamps match.


## Validating if any day is missing from the 1-day dataset

In [115]:
# Build the complete daily range spanned by the 1-day dataset.
# 'D' is the canonical day alias (it is what pandas reports in the index repr).
full_day_range = pd.date_range(
    start=df_1d['timestamp'].min(),
    end=df_1d['timestamp'].max(),
    freq='D',
)

# Record the gaps BEFORE reindexing: once the frame is aligned to full_day_range
# its index equals that range, so a difference taken afterwards is always empty.
missing_timestamps_day = full_day_range.difference(pd.DatetimeIndex(df_1d['timestamp']))

# Align the frame to the full range; absent days become all-NaN rows
df_1d = df_1d.set_index('timestamp').reindex(full_day_range)
print("Missing timestamps:", missing_timestamps_day)

# The unnamed index comes back from reset_index() as 'index', hence the rename
missing_rows_day = (
    pd.DataFrame(index=missing_timestamps_day, columns=df_1d.columns)
    .reset_index()
    .rename(columns={'index': 'timestamp'})
)
print("Missing rows:")
print(missing_rows_day)

# Decide on the gaps found above rather than on the post-reindex index,
# which matches full_day_range by construction.
if missing_timestamps_day.empty:
    print("PASS: All timestamps match.")
else:
    print("FAIL: There are mismatched timestamps.")

Missing timestamps: DatetimeIndex([], dtype='datetime64[ns]', freq='D')
Missing rows:
Empty DataFrame
Columns: [timestamp, open, high, low, close, volume]
Index: []
PASS: All timestamps match.


# Adding A Column For Price Change Direction

This subsection adds a new column called `direction` to each dataset. If the closing price is higher than the previous closing price, the price has gone up and the row is marked 1 (appreciation). If the closing price is lower than the previous closing price, the price has gone down and the row is marked -1 (depreciation). If the price is unchanged, the row is marked 0 (no movement).

In [None]:
# Calculating the direction: sign of the change in close versus the previous row
# (1 = close rose, -1 = close fell, 0 = unchanged).
# Vectorised comparisons replace the per-row lambda. As with the lambda, a NaN
# difference compares False both ways and is therefore labelled 0.
close_change = df_1m['close'].diff()
df_1m['direction'] = close_change.gt(0).astype('int64') - close_change.lt(0).astype('int64')

# Move the index back into a column, then drop the first row: it has no previous
# close, so it has no direction.
# NOTE(review): if the index is unnamed at this point (as it is after reindexing
# against a bare date_range), reset_index() names the new column 'index' rather
# than 'timestamp' - confirm that is what downstream readers expect.
df_1m = df_1m.reset_index().iloc[1:].reset_index(drop=True)

In [119]:
def _with_direction(frame):
    """Return a copy of ``frame`` with a ``direction`` column and the first row removed.

    ``direction`` is the sign of the change in ``close`` versus the previous row:
    1 if it rose, -1 if it fell, 0 otherwise (a NaN difference is also labelled 0).
    The index is moved back into a column, and the first row is dropped because it
    has no previous close to compare against.
    """
    close_change = frame['close'].diff()
    labelled = frame.assign(
        direction=close_change.gt(0).astype('int64') - close_change.lt(0).astype('int64')
    )
    # NOTE(review): an unnamed index comes back from reset_index() as a column
    # called 'index' rather than 'timestamp' - confirm downstream readers expect that.
    return labelled.reset_index().iloc[1:].reset_index(drop=True)


# Label the 1-hour and 1-day datasets with one shared helper instead of
# repeating the same statements for each frame
df_1h = _with_direction(df_1h)
df_1d = _with_direction(df_1d)

# Saving the DataFrame as a parquet file

In [120]:
# Write each cleaned frame to the analysis-data folder, leaving out the positional index
for cleaned_frame, output_path in (
    (df_1m, '../data/02-analysis_data/cleaned_data_1m.parquet'),
    (df_1h, '../data/02-analysis_data/cleaned_data_1h.parquet'),
    (df_1d, '../data/02-analysis_data/cleaned_data_1d.parquet'),
):
    cleaned_frame.to_parquet(output_path, index=False)