# 1) Data Fetching

## 1.1) Installing & Importing Necessary Packages

In [1]:
import pandas as pd
import polars as pl
import numpy as np
import re, glob, requests, os
import datetime as dt
from datetime import datetime, timedelta
from io import StringIO
from urllib.parse import urlparse
from typing import List, Callable, Union, Dict, Tuple


## 1.2) Fetching MT5 Data

To fetch a market's OHLCV data from your broker in Meta Trader 5, you should do the following:

* Open MT5
* Go to `View > Symbols`
* Select your target symbol from the list of symbols
* Select the `Bars` section
* Choose your desired datetime range and timeframe
* Click on `Request` to get the data
* Click on `Export Bars` to download the CSV file

> Note that you can also fetch a market's tick data from the `Ticks` section.

> You should also note that you won't always be able to fetch the data you intend for not all brokers give a market's tick or intraday data's access to a user for a long datetime range.

## 1.3) Uploading Data in Google Drive

After exporting the CSV file from MT5, you should:

* Upload it to your Google Drive
* Manage its accessibility settings so that everyone can access it
* Take its access URL and use it in the following section

## 1.4) Downloading from Google Drive

In [2]:
def download_data(data_url):
    """
    Downloads the data file from a given Google Drive access URL.

    Args:
        data_url: The URL of the file to download.

    Returns:
        The downloaded data file content.
    """

    parsed_url = urlparse(data_url)
    file_id = parsed_url.path.split('/')[3]

    download_url = f"https://drive.google.com/uc?id={file_id}&export=download"

    try:
        response = requests.get(download_url)
        response.raise_for_status()

        return response.content
    except requests.exceptions.RequestException as e:
        print(f"Error downloading file: {e}")

        return None


In [166]:
symbol_name = "BTCUSD" #@param ["XAUUSD", "US30", "US100", "EURUSD", "GBPUSD", "USDJPY", "BTCUSD"]

if symbol_name == "XAUUSD":
    data_url = "https://drive.google.com/file/d/19Mpn_v62wgH9j9mIe8rLwM0xhTnpKMHg/view?usp=drive_link"
elif symbol_name == "US30":
    data_url = "https://drive.google.com/file/d/1T4l-UZvsY8uDjZc-oa2p1XDbtpyJDdJC/view?usp=drive_link"
elif symbol_name == "US100":
    data_url = "https://drive.google.com/file/d/11JUZysogbvJNR5vEO9KbaTFw5uLXu7_7/view?usp=drive_link"
elif symbol_name == "EURUSD":
    data_url = "https://drive.google.com/file/d/1bRN-_E4OMx4v13gJo0jmND0tHE0pe7JM/view?usp=drive_link"
elif symbol_name == "GBPUSD":
    data_url = "https://drive.google.com/file/d/1mCoU-v9KbqcdKwtTR2rgzdXEhzPrjk6R/view?usp=drive_link"
elif symbol_name == "USDJPY":
    data_url = "https://drive.google.com/file/d/1bpuCsaqVAVwyaQqAH2hCxFV9TxlbWa5d/view?usp=drive_link"
elif symbol_name == "BTCUSD":
    data_url = "https://drive.google.com/file/d/1sL8yc7vXaSLp559PG30XnEsSGFl3MMap/view?usp=drive_link"

data_content = download_data(data_url)

if data_content:
    # Save the downloaded data content
    with open("downloaded_data.csv", "wb") as f:
        f.write(data_content)
    print("Data file downloaded successfully!")
else:
    print("Failed to download data file.")


Data file downloaded successfully!


# 2) Data Preprocessing

In this section, I do the following:

* Reshaping the csv data from an unusable format to a proper one
* Dropping null-contained rows
* Checking for data consistency and integrity
* Checking for outliers and removing them
* Removing duplicates
* Removing dates containing time gaps

> All the required explanations to understand the process are available in function docstrings and code comments.

## 2.1) Data Reshaping

Here I reshape the downloaded CSV file into a proper format and save it as a `.parquet` file.

In [167]:
df = pd.read_csv('downloaded_data.csv')
print(df.shape)
df.head()


(690748, 1)


Unnamed: 0,<DATE>\t<TIME>\t<OPEN>\t<HIGH>\t<LOW>\t<CLOSE>\t<TICKVOL>\t<VOL>\t<SPREAD>
0,2017.04.11\t01:00:00\t1199.03\t1200.06\t1198.9...
1,2017.04.11\t01:05:00\t1199.40\t1201.05\t1198.2...
2,2017.04.11\t01:10:00\t1198.81\t1199.00\t1198.4...
3,2017.04.11\t01:15:00\t1198.98\t1199.42\t1198.6...
4,2017.04.11\t01:20:00\t1199.42\t1200.12\t1199.2...


You can see that the CSV data has an improper shape `(n, 1)`, emerging the necessity to reshape it.

In [73]:
def transform_csv(csv_data, time_offset):
    """
    Transforms the extracted CSV file - which has a single column -
    to a DataFrame with the specified columns. Also adds _date column
    and adjusts time back by one hour.

    Args:
        csv_data (str): The CSV data as a string

    Returns:
        pandas.DataFrame: The transformed DataFrame
    """
    # Read the CSV data into a DataFrame
    df = pd.read_csv(StringIO(csv_data), header=None)

    # Split the single column into the required columns
    # and cut out the first row as it contains the column names
    data = df[0].iloc[1:].str.split('\t', expand=True)

    # Combine the date and time columns into a single '_time' column
    df['_time'] = pd.to_datetime(
        data[0] + ' ' + data[1], format='%Y.%m.%d %H:%M:%S'
    )

    # Subtract time_offset from _time column
    df['_time'] = df['_time'] - time_offset

    # Extract date from _time column
    df['_date'] = df['_time'].dt.date

    # Assign the correct column names and data types
    df = pd.DataFrame({
        '_time': df['_time'],
        '_date': df['_date'],
        'open': data[2].astype(float),
        'high': data[3].astype(float),
        'low': data[4].astype(float),
        'close': data[5].astype(float),
        'tick_volume': data[6].astype('uint64'),
        'spread': data[8].astype('int32')
    })

    return df.iloc[1:]


In [168]:
with open('downloaded_data.csv', 'r') as file:
    csv_data = file.read()

time_offset = pd.Timedelta(minutes=60)
transformed_df = transform_csv(csv_data, time_offset)
pl_df = pl.from_pandas(transformed_df)


In [169]:
pl_df.shape

(690748, 8)

In [170]:
pl.Config.set_tbl_rows(-1)
pl_df.head(10)


_time,_date,open,high,low,close,tick_volume,spread
datetime[ns],date,f64,f64,f64,f64,f64,f64
2017-04-11 00:00:00,2017-04-11,1199.03,1200.06,1198.99,1199.73,286.0,0.0
2017-04-11 00:05:00,2017-04-11,1199.4,1201.05,1198.26,1198.82,898.0,0.0
2017-04-11 00:10:00,2017-04-11,1198.81,1199.0,1198.48,1198.99,131.0,0.0
2017-04-11 00:15:00,2017-04-11,1198.98,1199.42,1198.68,1199.36,103.0,0.0
2017-04-11 00:20:00,2017-04-11,1199.42,1200.12,1199.23,1199.55,424.0,0.0
2017-04-11 00:25:00,2017-04-11,1199.53,1199.69,1197.84,1197.85,246.0,0.0
2017-04-11 00:30:00,2017-04-11,1197.86,1198.49,1197.85,1198.04,236.0,0.0
2017-04-11 00:35:00,2017-04-11,1198.01,1200.68,1197.96,1200.6,450.0,0.0
2017-04-11 00:40:00,2017-04-11,1200.61,1202.01,1200.11,1201.83,411.0,0.0
2017-04-11 00:45:00,2017-04-11,1201.84,1203.53,1201.76,1203.43,352.0,0.0


In [171]:
pl_df[300150:300200]

_time,_date,open,high,low,close,tick_volume,spread
datetime[ns],date,f64,f64,f64,f64,f64,f64
2021-03-08 10:20:00,2021-03-08,49601.86,49701.45,49417.31,49532.81,633.0,2827.0
2021-03-08 10:25:00,2021-03-08,49532.81,49650.51,49532.81,49629.51,595.0,1147.0
2021-03-08 10:30:00,2021-03-08,49629.51,49639.01,49388.8,49502.34,796.0,1856.0
2021-03-08 10:35:00,2021-03-08,49503.34,49673.25,49465.84,49629.74,705.0,2171.0
2021-03-08 10:40:00,2021-03-08,49629.74,49879.14,49620.74,49856.65,958.0,741.0
2021-03-08 10:45:00,2021-03-08,49856.64,49865.64,49669.43,49699.43,855.0,2971.0
2021-03-08 10:50:00,2021-03-08,49698.93,49834.37,49697.43,49828.37,677.0,1925.0
2021-03-08 10:55:00,2021-03-08,49828.37,49917.57,49745.4,49892.57,619.0,1669.0
2021-03-08 11:00:00,2021-03-08,49889.07,49916.07,49809.87,49847.29,574.0,1735.0
2021-03-08 11:05:00,2021-03-08,49862.29,49922.7,49730.16,49771.61,389.0,1217.0


In [78]:
def remove_weekends(df, weekends_day=["Saturday", "Sunday"], convert_tz=True):
    """
    the input df _time must be in UTC

    """
    if convert_tz:
        raise ValueError("!!! neet to change this part")
        df["_time"] = df["_time"].dt.tz_localize("UTC").dt.tz_convert("Europe/Istanbul")

    df.sort_values("_time", inplace=True)
    df["date"] = df["_time"].dt.normalize()
    df["week_day_name"] = df["date"].dt.day_name()
    df = df.loc[~df["week_day_name"].isin(weekends_day)]
    df = df.drop(columns=["date", "week_day_name"])
    df.sort_values("_time", inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df


In [45]:
def generate_true_time_df_pandas(df):
    start_date = df.iloc[0, 0]
    end_date = df.iloc[-1, 0]
    true_time_df = pd.DataFrame(
        pd.date_range(start_date, end_date, freq="5min"), columns=["_time"]
    )
    true_time_df = remove_weekends(
        true_time_df, weekends_day=["Saturday", "Sunday"], convert_tz=False
    )

    return true_time_df


In [46]:
def ffill_df_to_true_time_steps(df):
    true_time_df = generate_true_time_df_pandas(df)
    df = true_time_df.merge(df, on=["_time"], how="left")
    df.sort_values("_time", inplace=True)
    df.reset_index(drop=True)
    df_filled = df.ffill()

    # Create a column to indicate if any cell in the row was forward-filled
    df["was_ffilled"] = (df != df_filled).any(axis=1)
    # Update the DataFrame with forward-filled values
    df.update(df_filled)

    return df


In [172]:
df = ffill_df_to_true_time_steps(pl_df.to_pandas())
n_nulls = df.isnull().sum().sum()
pl_df = pl.from_pandas(df)
pl_df = pl_df.with_columns(pl_df['_date'].dt.date().alias('_date'))

# ? check for null and inf:
assert n_nulls == 0, f"DataFrame contains null values after ffill. {n_nulls}"

In [173]:
pl_df.shape

(580019, 9)

In [174]:
# Removing duplicate timestamps, keeping only the first occurrence
pl_df = pl_df.unique(subset=["_time"], keep="first")

In [175]:
pl_df.shape

(580019, 9)

In [176]:
pl_df = pl_df.sort("_time")
pl_df.head(10)

_time,_date,open,high,low,close,tick_volume,spread,was_ffilled
datetime[ns],date,f64,f64,f64,f64,f64,f64,bool
2017-04-11 00:00:00,2017-04-11,1199.03,1200.06,1198.99,1199.73,286.0,0.0,False
2017-04-11 00:05:00,2017-04-11,1199.4,1201.05,1198.26,1198.82,898.0,0.0,False
2017-04-11 00:10:00,2017-04-11,1198.81,1199.0,1198.48,1198.99,131.0,0.0,False
2017-04-11 00:15:00,2017-04-11,1198.98,1199.42,1198.68,1199.36,103.0,0.0,False
2017-04-11 00:20:00,2017-04-11,1199.42,1200.12,1199.23,1199.55,424.0,0.0,False
2017-04-11 00:25:00,2017-04-11,1199.53,1199.69,1197.84,1197.85,246.0,0.0,False
2017-04-11 00:30:00,2017-04-11,1197.86,1198.49,1197.85,1198.04,236.0,0.0,False
2017-04-11 00:35:00,2017-04-11,1198.01,1200.68,1197.96,1200.6,450.0,0.0,False
2017-04-11 00:40:00,2017-04-11,1200.61,1202.01,1200.11,1201.83,411.0,0.0,False
2017-04-11 00:45:00,2017-04-11,1201.84,1203.53,1201.76,1203.43,352.0,0.0,False


In [177]:
pl_df[300350:300400]

_time,_date,open,high,low,close,tick_volume,spread,was_ffilled
datetime[ns],date,f64,f64,f64,f64,f64,f64,bool
2021-04-08 21:10:00,2021-04-08,57867.25,57878.99,57790.56,57822.1,336.0,1480.0,False
2021-04-08 21:15:00,2021-04-08,57821.85,57828.1,57697.07,57704.57,476.0,1480.0,False
2021-04-08 21:20:00,2021-04-08,57704.57,57820.64,57693.57,57790.6,341.0,1480.0,False
2021-04-08 21:25:00,2021-04-08,57790.6,57810.85,57758.78,57780.6,253.0,1480.0,False
2021-04-08 21:30:00,2021-04-08,57780.6,57877.85,57780.6,57849.4,340.0,1480.0,False
2021-04-08 21:35:00,2021-04-08,57849.27,57864.18,57778.35,57814.85,400.0,1480.0,False
2021-04-08 21:40:00,2021-04-08,57814.85,57857.6,57765.41,57769.37,314.0,1480.0,False
2021-04-08 21:45:00,2021-04-08,57769.37,57771.37,57701.39,57721.35,212.0,1480.0,False
2021-04-08 21:50:00,2021-04-08,57719.04,57751.79,57635.29,57641.79,250.0,1480.0,False
2021-04-08 21:55:00,2021-04-08,57641.79,57734.85,57569.73,57723.85,425.0,1480.0,False


In [178]:
pl_df.filter(pl.col("was_ffilled") == True).shape

(9672, 9)

In [179]:
# ? check for big time gap
time_diffs = df["_time"].diff().iloc[1:]
assert time_diffs.between(
    pd.Timedelta("5min"),
    pd.Timedelta(days=7),
).all(), "Gaps detected in timestamps"

## 2.2) Dropping Nulls

In [180]:
column_names = pl_df.columns

print("Before dropping nulls:\n")
for col_name in column_names:
    num_of_nulls = pl_df.filter(pl.col(col_name).is_null().any()).shape[0]
    print(f"Number of {col_name} rows with null values: {num_of_nulls}")


Before dropping nulls:

Number of _time rows with null values: 0
Number of _date rows with null values: 0
Number of open rows with null values: 0
Number of high rows with null values: 0
Number of low rows with null values: 0
Number of close rows with null values: 0
Number of tick_volume rows with null values: 0
Number of spread rows with null values: 0
Number of was_ffilled rows with null values: 0


In [None]:
for col_name in column_names:
    pl_df = pl_df.filter(~pl.col(col_name).is_null())


In [None]:
print("After dropping nulls:\n")
for col_name in column_names:
    num_of_nulls = pl_df.filter(pl.col(col_name).is_null().any()).shape[0]
    print(f"Number of {col_name} rows with null values: {num_of_nulls}")


After dropping nulls:

Number of _time rows with null values: 0
Number of _date rows with null values: 0
Number of M5_OPEN rows with null values: 0
Number of M5_HIGH rows with null values: 0
Number of M5_LOW rows with null values: 0
Number of M5_CLOSE rows with null values: 0
Number of M5_VOLUME rows with null values: 0
Number of spread rows with null values: 0


In [181]:
# removing rows from the first day if it doesn't start at midnight
first_row_time = pl_df.row(0, named=True)["_time"]
if dt.time(0, 0) < first_row_time.time():
    pl_df = pl_df.filter(
        pl.col("_time").dt.date() > first_row_time.date()
    )  # Delete the first day that started from the middle of the day

## 2.3) Checking & Maintaining Data Integrity

Here I make sure that the data integrity is maintained; e.g. all prices (alongside `spread`) should be greater than or equal to zero, or all `open` (or `close`) prices should be between their corresponding `low` and `high` prices in all rows.

In [182]:
assert (pl_df["open"] <= pl_df["high"]).all(), "Open higher than high"
assert (pl_df["open"] >= pl_df["low"]).all(), "Open lower than low"
assert (pl_df["close"] <= pl_df["high"]).all(), "Close higher than high"
assert (pl_df["close"] >= pl_df["low"]).all(), "Close lower than low"
assert (pl_df["high"] >= pl_df["low"]).all(), "High lower than low"
assert (pl_df["open"] >= 0).all(), "Negative open price"
assert (pl_df["high"] >= 0).all(), "Negative high price"
assert (pl_df["low"] >= 0).all(), "Negative low price"
assert (pl_df["close"] >= 0).all(), "Negative close price"
assert (pl_df["tick_volume"] >= 0).all(), "Negative volume"
assert (pl_df["spread"] >= 0).all(), "Negative spread"


If any of the above integrity regulations are not held, run the following cell:

In [None]:
pl_df = pl_df.filter(
    (pl.col("open") <= pl.col("high"))
    & (pl.col("open") >= pl.col("low"))
    & (pl.col("close") <= pl.col("high"))
    & (pl.col("close") >= pl.col("low"))
    & (pl.col("high") >= pl.col("low"))
    & (pl.col("open") >= 0)
    & (pl.col("high") >= 0)
    & (pl.col("low") >= 0)
    & (pl.col("close") >= 0)
    & (pl.col("tick_volume") >= 0)
    & (pl.col("spread") >= 0)
)


## 2.4) Checking & Removing Return Outliers

`Return` is defined as the difference between the close price of a given candle and the close price of its previous candle. A return much bigger than the expecting range of normal returns can naturally be interpreted as an outlier. Thus, handling outliers in the context of returns is vital.

In [183]:
returns = pl_df["close"].pct_change().slice(1)
assert returns.is_between(-0.2, 0.2).all(), "pct_change outlier returns detected"


If the above outlier detection line illustrated an error, run the following cell:

In [None]:
while True:
    pl_df = pl_df.with_columns(
        pl.col("close")
        .pct_change()
        .alias("returns")
    ).filter(
        pl.col("returns").is_null()
        | (
            (pl.col("returns") >= -0.2)
            & (pl.col("returns") <= 0.2)
        )
    ).drop("returns")

    # Break if no outliers remain
    returns = pl_df["close"].pct_change().slice(1)
    if returns.is_between(-0.2, 0.2).all():
        break

df = ffill_df_to_true_time_steps(pl_df.to_pandas())
n_nulls = df.isnull().sum().sum()
pl_df = pl.from_pandas(df)
pl_df = pl_df.with_columns(pl_df['_date'].dt.date().alias('_date'))

# ? check for null and inf:
assert n_nulls == 0, f"DataFrame contains null values after ffill. {n_nulls}"

  df_filled = df.ffill()


In [184]:
pl_df.shape

(580019, 9)

In [185]:
pl_df.head(10)

_time,_date,open,high,low,close,tick_volume,spread,was_ffilled
datetime[ns],date,f64,f64,f64,f64,f64,f64,bool
2017-04-11 00:00:00,2017-04-11,1199.03,1200.06,1198.99,1199.73,286.0,0.0,False
2017-04-11 00:05:00,2017-04-11,1199.4,1201.05,1198.26,1198.82,898.0,0.0,False
2017-04-11 00:10:00,2017-04-11,1198.81,1199.0,1198.48,1198.99,131.0,0.0,False
2017-04-11 00:15:00,2017-04-11,1198.98,1199.42,1198.68,1199.36,103.0,0.0,False
2017-04-11 00:20:00,2017-04-11,1199.42,1200.12,1199.23,1199.55,424.0,0.0,False
2017-04-11 00:25:00,2017-04-11,1199.53,1199.69,1197.84,1197.85,246.0,0.0,False
2017-04-11 00:30:00,2017-04-11,1197.86,1198.49,1197.85,1198.04,236.0,0.0,False
2017-04-11 00:35:00,2017-04-11,1198.01,1200.68,1197.96,1200.6,450.0,0.0,False
2017-04-11 00:40:00,2017-04-11,1200.61,1202.01,1200.11,1201.83,411.0,0.0,False
2017-04-11 00:45:00,2017-04-11,1201.84,1203.53,1201.76,1203.43,352.0,0.0,False


In [186]:
pl_df = pl_df.filter(
    ~(pl.col("_time").dt.time().is_between(
        dt.time(23, 0, 0),  # Start time: 23:00:00
        dt.time(23, 55, 0)  # End time: 23:55:00
    ))
)

In [187]:
pl_df.shape

(555863, 9)

In [188]:
pl_df = pl_df.with_columns(pl_df['_time'].dt.date().alias('_date'))

In [189]:
pl_df[555250:555650]

_time,_date,open,high,low,close,tick_volume,spread,was_ffilled
datetime[ns],date,f64,f64,f64,f64,f64,f64,bool
2024-12-25 17:50:00,2024-12-25,98255.4,98394.96,98198.24,98350.56,1423.0,1126.0,False
2024-12-25 17:55:00,2024-12-25,98351.06,98388.56,98309.9,98317.33,1241.0,1888.0,False
2024-12-25 18:00:00,2024-12-25,98325.83,98426.2,98239.38,98251.33,1477.0,1901.0,False
2024-12-25 18:05:00,2024-12-25,98250.46,98300.32,98131.3,98141.03,1712.0,175.0,False
2024-12-25 18:10:00,2024-12-25,98141.03,98177.21,98081.99,98158.34,1342.0,1867.0,False
2024-12-25 18:15:00,2024-12-25,98158.34,98306.04,98158.34,98265.1,1542.0,1894.0,False
2024-12-25 18:20:00,2024-12-25,98261.1,98359.42,98217.13,98351.19,1366.0,1856.0,False
2024-12-25 18:25:00,2024-12-25,98351.44,98482.46,98317.81,98402.27,1391.0,1868.0,False
2024-12-25 18:30:00,2024-12-25,98402.27,98436.27,98321.33,98365.42,1238.0,2000.0,False
2024-12-25 18:35:00,2024-12-25,98361.44,98409.86,98321.9,98338.67,868.0,2000.0,False


In [190]:
# Identify dates where all `was_ffilled` values are True
dates_to_exclude = (
    pl_df.group_by("_date")
    .agg(pl.col("was_ffilled").all().alias("all_ffilled"))
    .filter(pl.col("all_ffilled"))
    .select("_date")
)

# Filter out rows with these dates
pl_df = pl_df.filter(~pl.col("_date").is_in(dates_to_exclude))


In [191]:
pl_df = pl_df.sort("_time")

In [192]:
pl_df.shape

(553655, 9)

In [193]:
pl_df[552150:552350]

_time,_date,open,high,low,close,tick_volume,spread,was_ffilled
datetime[ns],date,f64,f64,f64,f64,f64,f64,bool
2024-12-20 12:30:00,2024-12-20,93446.55,93491.48,92914.26,92965.54,1558.0,897.0,False
2024-12-20 12:35:00,2024-12-20,92965.54,93409.99,92525.87,93060.31,1494.0,0.0,False
2024-12-20 12:40:00,2024-12-20,93057.25,93271.4,92858.26,92948.95,1563.0,301.0,False
2024-12-20 12:45:00,2024-12-20,92953.46,93215.45,92582.96,93128.3,1692.0,722.0,False
2024-12-20 12:50:00,2024-12-20,93131.37,93233.99,92429.62,92895.1,1631.0,0.0,False
2024-12-20 12:55:00,2024-12-20,92895.23,93164.65,92562.28,92576.27,1653.0,468.0,False
2024-12-20 13:00:00,2024-12-20,92546.2,92884.46,92283.25,92397.76,1686.0,0.0,False
2024-12-20 13:05:00,2024-12-20,92439.5,92696.77,92188.06,92229.76,1714.0,598.0,False
2024-12-20 13:10:00,2024-12-20,92226.39,92547.51,92124.27,92360.75,1788.0,0.0,False
2024-12-20 13:15:00,2024-12-20,92379.15,93013.73,92078.55,92861.08,1763.0,0.0,False


In [194]:
pl_df.filter(pl.col("was_ffilled") == True).shape

(1248, 9)

In [195]:
pl_df.write_parquet(f"{symbol_name}_stage_one.parquet")