In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm


In [5]:
path = './../Data/'
usdjpy = './../Data/USDJPY_M15_202010070430_202410140915.csv'
os.listdir(path)

['GBPUSD_M15_202010071130_202410140915.csv',
 'NZDUSD_M15_202010010000_202410140915.csv',
 'USDJPY_M15_202010070430_202410140915.csv',
 'AUDNZD_M15_202010010000_202410140915.csv',
 'NZDJPY_M15_202010010000_202410140915.csv',
 'EURGBP_M15_202010010000_202410140915.csv',
 'AUDUSD_M15_202010010000_202410140915.csv',
 'GBPJPY_M15_202010010000_202410140915.csv',
 'EURUSD_M15_202010070145_202410140915.csv',
 'EURJPY_M15_202010010000_202410140915.csv',
 'CADJPY_M15_202010010000_202410140915.csv',
 'USDSGD_M15_202010010000_202410140915.csv']

In [45]:
def preprocess_data(df: pd.DataFrame, atr_length=14)->pd.DataFrame:
    df['<BODY_LENGTH>'] = abs(df['<CLOSE>'] - df['<OPEN>']) 
    df['<ATR>'] = df['<BODY_LENGTH>'].rolling(atr_length).mean()
    df.dropna(inplace=True)
    return df

def load_data(path: str)->pd.DataFrame:
    df = pd.read_csv(usdjpy, delimiter='\t')
    df = preprocess_data(df)
    return df

df = load_data(usdjpy)
df.head(20)

Unnamed: 0,<DATE>,<TIME>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<TICKVOL>,<VOL>,<SPREAD>,<BODY_LENGTH>,<ATR>
13,2020.10.07,07:45:00,105.723,105.741,105.71,105.725,425,0,1,0.002,0.010429
14,2020.10.07,08:00:00,105.725,105.733,105.709,105.722,231,0,1,0.003,0.009929
15,2020.10.07,08:15:00,105.722,105.727,105.712,105.727,150,0,1,0.005,0.0085
16,2020.10.07,08:30:00,105.727,105.756,105.724,105.744,465,0,1,0.017,0.009214
17,2020.10.07,08:45:00,105.744,105.771,105.735,105.746,422,0,0,0.002,0.009214
18,2020.10.07,09:00:00,105.746,105.766,105.745,105.753,527,0,0,0.007,0.0095
19,2020.10.07,09:15:00,105.753,105.762,105.74,105.744,421,0,0,0.009,0.009429
20,2020.10.07,09:30:00,105.744,105.75,105.726,105.743,296,0,1,0.001,0.008714
21,2020.10.07,09:45:00,105.743,105.744,105.724,105.73,208,0,2,0.013,0.009143
22,2020.10.07,10:00:00,105.731,105.77,105.716,105.766,667,0,0,0.035,0.011286


In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100026 entries, 0 to 100025
Data columns (total 9 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   <DATE>     100026 non-null  object 
 1   <TIME>     100026 non-null  object 
 2   <OPEN>     100026 non-null  float64
 3   <HIGH>     100026 non-null  float64
 4   <LOW>      100026 non-null  float64
 5   <CLOSE>    100026 non-null  float64
 6   <TICKVOL>  100026 non-null  int64  
 7   <VOL>      100026 non-null  int64  
 8   <SPREAD>   100026 non-null  int64  
dtypes: float64(4), int64(3), object(2)
memory usage: 6.9+ MB


In [56]:
'''
G = Bull Candle = A
R = Bear Candle = B
N = No Color Candle = C

E = Candle == ATR ratio = D
SSS = Candle <<< ATR = E
SS = Candle << ATR = F
S = Candle < ATR = G
BBB = Candle >>> ATR = H
BB = Canadle >> ATR = I
B = Candle > ATR = J

M = Top Wick == Bot Wick = K
LLL = Bot Wick >>> TopWick = L
LL = Bot Wick >> TopWick = M 
L = Bot Wick > TopWick = N

UUU = Top Wick >>> Bot Wick = O
UU = Top Wick >> Bot Wick = P 
U = Top Wick > Bot Wick = Q
'''

def isbull(open: float, close: float)-> bool:
    return close > open

def bear_bull(open: float, close: float) -> str:
    
    if open < close:
        return 'A'  # bull candle
    elif open > close:
        return 'B' # bear candle
    # in rare case open == close
    return 'C' 
    
def candle_to_atr_ratio(open: float, close: float, atr: float, eps: float = 0.05) -> str:
    canlde_height = abs(open - close)
    ratio = canlde_height/(atr + 1e-6)
    
    if ratio < 1 + eps and ratio > 1 - eps:
        return 'D'
    
    if ratio < 0.25:
        return "E"

    if ratio < 0.5:
        return "F"
    
    if ratio < 1 - eps:
        return "G"
    
    if ratio > 1.75:
        return "H"
    
    if ratio > 1.50:
        return "I"
    
    if ratio > 1 + eps:
        return "J"
    
    raise ValueError("Sanity Check")

def top_bot_wick_ratio(high: float, low: float, open: float, close: float, eps: float=0.05)-> str:
    if isbull(open, close):
        top_wick = high - close
        bot_wick = open - low
    else:
        top_wick = high - open
        bot_wick = close - low
    
    ratio = top_wick/(bot_wick + 1e-6)


    if ratio < 1 + eps and ratio > 1 - eps:
        return 'K'
    
    if ratio < 0.25:
        return "L"

    if ratio < 0.5:
        return "M"
    
    if ratio < 1 - eps:
        return "N"
    
    if ratio > 1.75:
        return "O"
    
    if ratio > 1.50:
        return "P"
    
    if ratio > 1 + eps:
        return "Q"
    
    raise ValueError("Sanity Check")


def candle2char(open:float, close: float, low: float,
                 high: float, vol: float, atr: float, eps:float=0.05) -> str:
    candle = ''
    candle += bear_bull(open, close)
    candle += candle_to_atr_ratio(open, close, atr, eps)
    candle += top_bot_wick_ratio(high, low, open, close, eps)
    return candle

def processs_df_row(row, eps=0.05):
    open = row['<OPEN>']
    close = row['<CLOSE>']
    high = row['<HIGH>']
    low = row['<LOW>']
    atr = row['<ATR>']
    vol = row['<VOL>']
    return candle2char(open, close, low, high, vol, atr, eps)

def preprocess_data(df: pd.DataFrame, atr_length=14)->pd.DataFrame:
    df['<BODY_LENGTH>'] = abs(df['<CLOSE>'] - df['<OPEN>']) 
    df['<ATR>'] = df['<BODY_LENGTH>'].rolling(atr_length).mean()
    df.dropna(inplace=True)
    df['word'] = df.apply(lambda row: processs_df_row(row), axis=1)
    return df

def load_data(path: str)->pd.DataFrame:
    df = pd.read_csv(usdjpy, delimiter='\t')
    df = preprocess_data(df)
    return df

df = load_data(usdjpy)
df.head()


ZeroDivisionError: float division by zero

In [57]:

def get_all_combinations():
    color = list('abc')
    size_ratio = list('defghij')
    wick_ratio = list('klmnopq')


    opts = [color, size_ratio, wick_ratio]
    res = []
    def fill_opts(i, word=[]):
        if i == 3:
            res.append(word[:])
            return
        
        for item in opts[i]:
            word += item
            fill_opts(i+1, word)
            word.pop()



    fill_opts(0)
    return res[:]

def get_itos_dicts():

    combos = get_all_combinations()

    stoi = {}
    itos = {}

    for i, c in enumerate(combos):
        # print(i, ''.join(c))
        w = ''.join(c)
        stoi[w] = i
        itos[i] = w
    return (stoi, itos)

stoi , itos = get_itos_dicts()

In [46]:
from glob import glob

data_path = './../Data/'

files = glob(f'{data_path}/*.csv')

save_path = './../Data/stringified_data/'
for file in files:
    df = load_data(file)
    save_path = file.replace(data_path, save_path)
    
    break

In [47]:
df.head()

Unnamed: 0,<DATE>,<TIME>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<TICKVOL>,<VOL>,<SPREAD>,<BODY_LENGTH>,<ATR>
13,2020.10.07,07:45:00,105.723,105.741,105.71,105.725,425,0,1,0.002,0.010429
14,2020.10.07,08:00:00,105.725,105.733,105.709,105.722,231,0,1,0.003,0.009929
15,2020.10.07,08:15:00,105.722,105.727,105.712,105.727,150,0,1,0.005,0.0085
16,2020.10.07,08:30:00,105.727,105.756,105.724,105.744,465,0,1,0.017,0.009214
17,2020.10.07,08:45:00,105.744,105.771,105.735,105.746,422,0,0,0.002,0.009214
