# CONSTANTS

In [1]:
import CONSTANTS as c

# Number of one-hour steps taken before / after each sample time stamp.
BACKWARD_WINDOW_LENGTH, FORWARD_WINDOW_LENGTH = c.BACKWARD_WINDOW_LENGTH, c.FORWARD_WINDOW_LENGTH

# Symbols whose minute-level OHLC exports are loaded below.
EXCHANGE_RATES = c.EXCHANGE_RATES

# Bounds of the analysed period (used to filter prices and to build the time grid).
FROM_TIMESTAMP, TO_TIMESTAMP = c.FROM_TIMESTAMP, c.TO_TIMESTAMP

# LIBRARY

In [2]:
import MetaTrader5 as mt5

import pandas as pd

import numpy as np

import pyspark

from pyspark.sql import SparkSession

# SOURCE DATA

In [3]:
# Establish the connection to the MetaTrader 5 terminal.
# Raise instead of calling quit(): in a notebook quit() asks the kernel to shut
# down, which discards all state and hides the reason for the failure.
if not mt5.initialize():
    raise RuntimeError("initialize() failed, error code = {}".format(mt5.last_error()))

# symbols_get() returns None on error; an empty tuple would also make
# tplSymbols[0] fail, so both cases are reported explicitly.
tplSymbols = mt5.symbols_get()
if not tplSymbols:
    raise RuntimeError("symbols_get() returned no symbols, error code = {}".format(mt5.last_error()))

# One row per symbol; column names are taken from the fields of the first symbol record.
dfSymbols = pd.DataFrame(tplSymbols, columns = tplSymbols[0]._asdict().keys())

In [4]:
# Load the minute-level OHLC export of every exchange rate and stack them into
# one long frame. Frames are collected in a list and concatenated once:
# concatenating inside the loop re-copies all previously loaded rows on every
# iteration.
aOhlcFrames = []
for sExchangeRate in EXCHANGE_RATES:

    df = pd.read_csv(r'Data\{}_M1_202010010001_202210312359.csv'.format(sExchangeRate), delimiter = '\t')

    # The export stores date and time in separate text columns; merge them into one timestamp.
    df['PRICE_TIME_STAMP'] = pd.to_datetime(df['<DATE>'] + df['<TIME>'], format='%Y.%m.%d%H:%M:%S')
    df = df.drop(['<DATE>', '<TIME>'], axis = 1)

    # Tag every row with its symbol so the frames can be told apart after stacking.
    df['EXCHANGE_RATE'] = sExchangeRate

    # Keep only the rows inside the analysed period (bounds inclusive).
    df = df.query('@FROM_TIMESTAMP<= PRICE_TIME_STAMP <= @TO_TIMESTAMP ')
    aOhlcFrames.append(df)

# pd.concat raises on an empty list, so fall back to an empty frame when no
# exchange rate is configured.
dfOhlc = pd.concat(aOhlcFrames) if aOhlcFrames else pd.DataFrame()

# ANALYZE

In [5]:
def GET_DATASETS_1(dfPreprocessed, dfTimes):
    """Build the input/output arrays from minute-level price data.

    For every exchange rate and every one-hour step of the backward
    (``-BACKWARD_WINDOW_LENGTH .. -1``) and forward
    (``0 .. FORWARD_WINDOW_LENGTH - 1``) windows, the relative difference
    ``(<HIGH> at window minute - <CLOSE> at sample time) / <CLOSE> at sample time``
    is computed through a Spark SQL self-join of the time grid with the prices.
    Only sample time stamps that are complete in every step of every exchange
    rate are kept.

    Parameters
    ----------
    dfPreprocessed : pandas.DataFrame
        Price rows with at least the columns ``EXCHANGE_RATE``,
        ``PRICE_TIME_STAMP``, ``<CLOSE>`` and ``<HIGH>``.
    dfTimes : pandas.DataFrame
        Time grid with a ``TIME_STAMP`` column. It is used both as the list of
        sample times and as the list of minutes inside each window. The frame
        is not modified.

    Returns
    -------
    X : numpy.ndarray
        Shape ``(samples, BACKWARD_WINDOW_LENGTH, 60, exchange rates)``.
    Y : numpy.ndarray
        Shape ``(samples, FORWARD_WINDOW_LENGTH, 60, exchange rates)``.
    X_TIME : numpy.ndarray
        Shape ``(samples, 6)`` with the columns MINUTE, HOUR, DAY_OF_WEEK,
        DAY_OF_MONTH, MONTH and YEAR of each sample time stamp.
    """
    # Rows required per (sample, step) window; also the feature-axis length of X and Y.
    MINUTES_PER_STEP = 60

    #Create PySpark SparkSession
    oSparkSess = SparkSession.builder \
        .master("local[1]") \
        .appName("SparkByExamples.com") \
        .config("spark.executor.memory", "70g") \
        .config("spark.driver.memory", "50g") \
        .config("spark.memory.offHeap.enabled",True) \
        .config("spark.memory.offHeap.size","16g")   \
        .getOrCreate()

    oSparkSess.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    def dfGetPriceAnalysis(df, iFrom, iTo, dfTimes):
        """Return one row per (TIME_STAMP, minute inside [iFrom, iTo) hours) with its DIFF.

        Sample time stamps with a missing current close, a missing historical
        high, or fewer than ``MINUTES_PER_STEP`` rows in the window are removed.
        """
        df = df[['PRICE_TIME_STAMP','<CLOSE>', '<HIGH>']]

        sdf= oSparkSess.createDataFrame(df)
        sdf.createOrReplaceTempView("sdf")

        # assign() works on a copy, so the caller's dfTimes does not gain the
        # two window columns as a side effect.
        dfWindows = dfTimes.assign(
            FROM_TIME_STAMP = dfTimes['TIME_STAMP'] + pd.DateOffset(hours=iFrom),
            TO_TIME_STAMP = dfTimes['TIME_STAMP'] + pd.DateOffset(hours=iTo),
        )

        sdfTimes= oSparkSess.createDataFrame(dfWindows)
        sdfTimes.createOrReplaceTempView("sdfTimes")

        dfToReturn =  oSparkSess.sql("""
            SELECT *
            FROM
            (
                SELECT 
                t.*,
                df.`<CLOSE>` AS CURRENT_CLOSE,
                df2.`<HIGH>` AS HISTORY_HIGH,
                (df2.`<HIGH>`-df.`<CLOSE>`)/(df.`<CLOSE>`) AS DIFF
                FROM
                (
                    SELECT df1.*, df2.TIME_STAMP AS HISTORY_TIME_STAMP FROM sdfTimes df1
                    INNER JOIN sdfTimes df2
                    ON df2.TIME_STAMP >= df1.FROM_TIME_STAMP and df2.TIME_STAMP < df1.TO_TIME_STAMP
                ) t
                LEFT JOIN sdf df
                ON t.TIME_STAMP = df.PRICE_TIME_STAMP
                LEFT JOIN sdf df2
                ON t.HISTORY_TIME_STAMP = df2.PRICE_TIME_STAMP
            ) df
            ORDER BY df.TIME_STAMP, df.HISTORY_TIME_STAMP
        """).toPandas()

        # Drop every sample time stamp that has at least one row without a
        # current or historical price (the LEFT JOINs found no price row).
        bMissingPrice = dfToReturn['CURRENT_CLOSE'].isna() | dfToReturn['HISTORY_HIGH'].isna()
        aTimeStampsToDrop = dfToReturn.loc[bMissingPrice, 'TIME_STAMP'].unique()
        dfToReturn = dfToReturn[~dfToReturn['TIME_STAMP'].isin(aTimeStampsToDrop)].reset_index(drop = True)

        # Drop the sample time stamps whose window holds fewer than the required number of rows.
        serRowCounts = dfToReturn.groupby('TIME_STAMP')['HISTORY_TIME_STAMP'].count()
        aTimeStampsToDrop = serRowCounts[serRowCounts < MINUTES_PER_STEP].index
        dfToReturn = dfToReturn[~dfToReturn['TIME_STAMP'].isin(aTimeStampsToDrop)].reset_index(drop = True)

        # Offset of each row from the start of its window, in whole minutes (seconds / 60).
        dfToReturn['MINUTE_DIFF'] = ((dfToReturn['HISTORY_TIME_STAMP']-dfToReturn['FROM_TIME_STAMP']).dt.seconds/60).astype(int)
        return dfToReturn

    def dicBuildWindows(dfSingleRate, rngSteps):
        """Run the price analysis for every one-hour step in rngSteps, keyed by the step's start hour."""
        dicByStep = {}
        for iFrom in rngSteps:
            print(iFrom)
            dicByStep[iFrom] = dfGetPriceAnalysis(dfSingleRate, iFrom, iFrom + 1, dfTimes)
        return dicByStep

    aExchangeRates = list(dfPreprocessed['EXCHANGE_RATE'].unique())

    # dicDatasets[kind][exchange rate][step start hour] -> analysis frame
    dicDatasets = {'INPUT': {}, 'OUTPUT': {}}

    for sExchangeRate in aExchangeRates:
        df_single_exc = dfPreprocessed.query('EXCHANGE_RATE == @sExchangeRate')
        df_single_exc = df_single_exc[['PRICE_TIME_STAMP','<CLOSE>', '<HIGH>']].fillna(0)

        dicDatasets['INPUT'][sExchangeRate] = dicBuildWindows(df_single_exc, range(-BACKWARD_WINDOW_LENGTH, 0))
        dicDatasets['OUTPUT'][sExchangeRate] = dicBuildWindows(df_single_exc, range(0, FORWARD_WINDOW_LENGTH))

    # Identify the time stamps present in every frame. None marks "nothing seen
    # yet"; testing for an empty array instead would restart the intersection
    # whenever it legitimately becomes empty.
    aCommonTimeStamps = None
    for dicByRate in dicDatasets.values():
        for dicByStep in dicByRate.values():
            for df in dicByStep.values():
                aUnique = df['TIME_STAMP'].unique()
                if aCommonTimeStamps is None:
                    aCommonTimeStamps = aUnique
                else:
                    aCommonTimeStamps = np.intersect1d(aCommonTimeStamps, aUnique)
    if aCommonTimeStamps is None:
        aCommonTimeStamps = []

    # Drop from every frame the rows whose time stamp is not common to all frames.
    for dicByRate in dicDatasets.values():
        for dicByStep in dicByRate.values():
            for iStep, df in dicByStep.items():
                dicByStep[iStep] = df[df['TIME_STAMP'].isin(aCommonTimeStamps)].reset_index(drop = True)

    def dfCompileDic(p_dic, tplFormat,sKey):
        """Stack the frames of p_dic[sKey] into an array of shape tplFormat.

        Axis 1 follows the order of the steps and axis 3 the order of the
        exchange rates in the dictionaries; each frame is pivoted to
        TIME_STAMP rows x MINUTE_DIFF columns.
        """
        aToReturn =  np.zeros(tplFormat)
        for ixExcRate, dicByStep in enumerate(p_dic[sKey].values()):
            for ixTimeStep, df in enumerate(dicByStep.values()):
                dfPivot = pd.pivot_table(
                    data = df[['DIFF','MINUTE_DIFF', 'TIME_STAMP']],
                    values='DIFF', index='TIME_STAMP',
                    columns='MINUTE_DIFF',
                    aggfunc='sum'  # string form; passing np.sum is deprecated in pandas
                ).sort_index(ascending=True)

                aToReturn[:, ixTimeStep,:,ixExcRate] = dfPivot.values
        return aToReturn

    def aGetTimeFeatures(aTimeStamps):
        """Return calendar features of the time stamps, sorted ascending, as a 2-D array."""
        df = pd.DataFrame(data = aTimeStamps, columns = ['TIME_STAMP'])
        df = df.sort_values(by = 'TIME_STAMP' , ascending=True)

        # Convert once and reuse the result for every feature.
        serTimes = pd.to_datetime(df['TIME_STAMP'],unit='s')
        for sColumn, sAttribute in (
            ('MINUTE', 'minute'),
            ('HOUR', 'hour'),
            ('DAY_OF_WEEK', 'day_of_week'),
            ('DAY_OF_MONTH', 'day'),
            ('MONTH', 'month'),
            ('YEAR', 'year'),
        ):
            df[sColumn] = getattr(serTimes.dt, sAttribute)

        return df.drop('TIME_STAMP', axis = 1).values

    iSampleCount = len(aCommonTimeStamps)
    iRateCount = len(aExchangeRates)

    # sample_size, time_steps, features, channel_size
    X = dfCompileDic(dicDatasets, (iSampleCount, BACKWARD_WINDOW_LENGTH , MINUTES_PER_STEP , iRateCount), 'INPUT')
    Y = dfCompileDic(dicDatasets, (iSampleCount, FORWARD_WINDOW_LENGTH , MINUTES_PER_STEP , iRateCount), 'OUTPUT')
    X_TIME= aGetTimeFeatures(aCommonTimeStamps) #sample size, features

    return X,Y,X_TIME


In [None]:
dfPrep = dfOhlc.copy()

# Candidate sample times: one per minute over the analysed period.
dfTimeStamps = pd.DataFrame(
    data = pd.date_range(
        start=FROM_TIMESTAMP, 
        end=TO_TIMESTAMP, freq = 'min'
    ),
    columns  = ['TIME_STAMP']
)

# Remove the weekend. pandas numbers weekdays Monday=0 ... Sunday=6, so
# Saturday and Sunday are 5 and 6 (7 never occurs).
dfTimeStamps = (
    dfTimeStamps
    .query('TIME_STAMP.dt.day_of_week not in (5,6)')
    .reset_index(drop = True)
)

X_ORIGINAL, Y_ORIGINAL, X_TIME_ORIGINAL = GET_DATASETS_1(dfPrep, dfTimeStamps)

-4


# SAVE

In [None]:
# Persist the three arrays as .npy files in the Temp folder.
for sTargetPath, aToSave in (
    (r'Temp\X_ORIGINAL.npy', X_ORIGINAL),
    (r'Temp\Y_ORIGINAL.npy', Y_ORIGINAL),
    (r'Temp\X_TIME_ORIGINAL.npy', X_TIME_ORIGINAL),
):
    np.save(sTargetPath, aToSave)