## Back Test Data Generation

In [None]:
import requests
import time
import calendar
import dateutil.parser as parser
from dateutil.relativedelta import relativedelta
from datetime import datetime, timezone
import pandas as pd
import numpy as np
import warnings
import yaml
from sklearn.neighbors import NearestNeighbors
from sklearn.linear_model import LinearRegression
from sklearn import linear_model
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_error
import pytz
warnings.filterwarnings('ignore')

### Settings

In [None]:
# Parse the pipeline settings file; the handle is only needed for the parse
# itself, so the values are unpacked after the file has been closed.
with open('back_test_pipeline_settings.yaml') as ymlfile:
    cfg = yaml.safe_load(ymlfile)

# KNN model parameters and the names of the feature columns it uses.
knn_settings = cfg['knn']
k_number = knn_settings['k_number']
metric = knn_settings['metric']
algorithm = knn_settings['algorithm']
(feature_1, feature_2, feature_3,
 feature_7, feature_8, feature_15) = (
    knn_settings[feature_key]
    for feature_key in ('feature_1', 'feature_2', 'feature_3',
                        'feature_7', 'feature_8', 'feature_15')
)

# Name of the volume column used to filter candidate candles.
volume = cfg['feature']['volume']

# Sampling parameters: volume threshold and number of candles to draw.
sample_settings = cfg['sample']
volume_size = sample_settings['volume_size']
sample_count = sample_settings['count']

# Length (in candles) of each window that gets a regression fit.
candles = cfg['recommendation']['candle_count']

# Currency pair label and the instrument name used to build the CSV filename.
currency_settings = cfg['currency']
pair = currency_settings['pair']
instrument = currency_settings['instrument']

In [None]:
# Echo the loaded configuration, one "label value" line per setting.
for setting_label, setting_value in (
    ('K Number:', k_number),
    ('Metric:', metric),
    ('Algorithm:', algorithm),
    ('Candle Volume Size:', volume_size),
    ('Random Sample Count:', sample_count),
    ('Future Candle Count:', candles),
    ('Pair:', pair),
    ('Instrument:', instrument),
):
    print(setting_label, setting_value)

In [None]:
def convert_date(utc_time):
    """Split a date/time string into its calendar components.

    Parameters
    ----------
    utc_time : str
        Timestamp text in any format ``dateutil.parser.parse`` accepts.
        No timezone conversion is applied here.

    Returns
    -------
    tuple
        ``(date, time, hour, day_of_year, weekday_number, weekday_name)``,
        where ``weekday_number`` is 0 for Monday and ``weekday_name`` is the
        matching entry of ``calendar.day_name``.
    """
    moment = parser.parse(utc_time)
    weekday_number = moment.weekday()
    return (
        moment.date(),
        moment.time(),
        moment.hour,
        moment.timetuple().tm_yday,
        weekday_number,
        calendar.day_name[weekday_number],
    )

In [None]:
def find_k_similar_candles(candle_id, dataset, k=k_number):
    """Find the ``k`` rows of ``dataset`` closest to the row at ``candle_id``.

    A ``NearestNeighbors`` model is fitted on the whole of ``dataset`` using
    the module-level ``metric`` and ``algorithm`` settings, then queried with
    the single row at integer position ``candle_id``.

    Parameters
    ----------
    candle_id : int
        Integer position (``iloc``) of the query row in ``dataset``.
    dataset : pandas.DataFrame
        Feature table to fit and search. Every column takes part in the
        distance computation.
    k : int, optional
        Number of neighbours to return. Defaults to the configured
        ``k_number`` as it was when this function was defined.

    Returns
    -------
    tuple
        ``(indices, distances)`` exactly as returned by
        ``NearestNeighbors.kneighbors`` for one query row (note the order is
        swapped relative to sklearn's ``(distances, indices)``). Because the
        query row is itself part of the fitted data, it is typically the
        first neighbour returned.
    """
    # metric may be e.g. 'euclidean', 'cosine', 'manhattan' or 'mahalanobis'.
    model_knn = NearestNeighbors(metric=metric, algorithm=algorithm)
    model_knn.fit(dataset)

    # kneighbors expects a 2-D array, hence the reshape of the single row.
    query_row = dataset.iloc[candle_id, :].values.reshape(1, -1)
    distances, indices = model_knn.kneighbors(query_row, n_neighbors=k)

    return indices, distances

# <font color='red'>Test Configs</font>

In [None]:
# Build the CSV filename from the configured instrument and load it.
# The same `filename` is reused later by the candle loop.
filename = '{}_H4.csv'.format(instrument)
data = pd.read_csv(filename)

In [None]:
# Show the column labels of the loaded frame.
data.columns

In [None]:
# Show the (rows, columns) dimensions of the loaded frame.
data.shape

In [None]:
# Preview the first rows of the loaded frame.
data.head()

In [None]:
# Show summary statistics for the loaded frame.
data.describe()

In [None]:
# Median of the 'Volume' column, displayed for reference.
# NOTE(review): the column name is hard-coded here, whereas the sampling cell
# uses the configured `volume` name — confirm the two always match.
volume_med = data['Volume'].median()
volume_med

## Selecting n random candles whose volume exceeds the configured `volume_size` threshold (e.g. 5500)

In [None]:
print('Candle Volume Size:', volume_size)
print('Random Sample Count:', sample_count)

In [None]:
# Keep only the candles whose volume exceeds the configured threshold, then
# draw `sample_count` of them at random (no seed is set, so draws differ
# between runs).
volume_mask = data[volume] > volume_size
random_samples = data.loc[volume_mask].sample(n=sample_count)

In [None]:
# Earlier alternative, kept for reference: draw arbitrary candle positions
# instead of volume-filtered rows.
#Random_Candles = np.random.randint(low=1, high=len(data)-40, size=1000)
# Index labels of the sampled rows, as a plain list for the candle loop.
Random_Candles = list(random_samples.index.values)

In [None]:
# Show the first 10 randomly selected candle numbers.
Random_Candles[0:10]

# <font color='red'>CANDLE LOOP</font>

In [None]:
# Print the current date and time in the America/Chicago timezone.
CST = pytz.timezone('America/Chicago')
datetime_cst = datetime.now(CST)
cst_timestamp_text = datetime_cst.strftime('%Y:%m:%d %H:%M:%S %Z %z')
print("Date & Time in CST : ", cst_timestamp_text)

In [None]:
%%time

# Number of nearest-neighbour "recommendations" recorded per sampled candle
# (the Rec1..Rec4 column groups of the output).
REC_COUNT = 4


def _trend_fit(window, column):
    """Fit ``column`` against candle position; return (R^2 in percent, slope).

    Positions are numbered 1..len(window), matching the original 'candleno'
    helper column.
    """
    positions = np.arange(1, len(window) + 1).reshape(-1, 1)
    prices = window[column].values.reshape(-1, 1)
    model = LinearRegression()
    model.fit(positions, prices)
    fit_pct = r2_score(prices, model.predict(positions)) * 100
    # coef_ is a one-element array here; reduce it to a scalar before use.
    slope = model.coef_.ravel()[0]
    return fit_pct, slope


def _signed_score(window, column):
    """Return the R^2 percentage of the trend, negated unless the slope is > 0."""
    fit_pct, slope = _trend_fit(window, column)
    direction = 1 if slope > 0 else -1   # 1 = buy, -1 = sell
    return fit_pct * direction


# The price history never changes inside the loop, so read it once instead of
# re-reading the CSV for every candle and every neighbour.
price_history = pd.read_csv(filename)
knn_features = price_history[[feature_1,
                              feature_2,
                              feature_3,
                              feature_7,
                              feature_8,
                             ]]

# Fixed column order of the output table.
result_columns = ['Candle_No', 'Current_Market_Fit', 'Current_Market']
for rec_no in range(1, REC_COUNT + 1):
    result_columns += ['Rec{}_Close_Score'.format(rec_no),
                       'Rec{}_High_Score'.format(rec_no),
                       'Rec{}_Low_Score'.format(rec_no),
                       'Rec{}_HH'.format(rec_no),
                       'Rec{}_LL'.format(rec_no),
                      ]

# Rows are collected in a list and turned into a frame once at the end:
# DataFrame.append was removed in pandas 2.0 and copied the whole frame on
# every call in earlier versions.
result_rows = []

for candle_no in Random_Candles:
    # Trend of the `candles`-long window that starts at the sampled candle.
    current_window = price_history.iloc[candle_no:candle_no + candles]
    Current_Market_Fit, current_slope = _trend_fit(current_window, 'Close')
    # 1 = bullish / buy, 0 = bearish / sell. This deliberately keeps the
    # original 1/0 encoding, which differs from the +/- sign used for the
    # recommendation scores below.
    Current_Market = 1 if current_slope > 0 else 0

    indices, distances = find_k_similar_candles(candle_no, knn_features)
    # Position 0 is skipped (typically the query candle itself); the next
    # REC_COUNT neighbours become the recommendations.
    neighbour_ids = indices[0][1:REC_COUNT + 1]
    if len(neighbour_ids) < REC_COUNT:
        raise ValueError(
            'Need at least {} neighbours per candle but only {} were returned; '
            'increase k_number.'.format(REC_COUNT + 1, len(indices[0]))
        )

    result = {'Candle_No': candle_no,
              'Current_Market_Fit': Current_Market_Fit,
              'Current_Market': Current_Market,
             }

    for rec_no, indice in enumerate(neighbour_ids, start=1):
        # Window of `candles` rows starting at the neighbour.
        neighbour_window = price_history.iloc[indice:indice + candles]
        first_close = neighbour_window['Close'].iloc[0]

        result['Rec{}_Close_Score'.format(rec_no)] = _signed_score(neighbour_window, 'Close')
        result['Rec{}_High_Score'.format(rec_no)] = _signed_score(neighbour_window, 'High')
        result['Rec{}_Low_Score'.format(rec_no)] = _signed_score(neighbour_window, 'Low')
        # Distance from the window's first close to its highest high and
        # lowest low.
        result['Rec{}_HH'.format(rec_no)] = first_close - neighbour_window['High'].max()
        result['Rec{}_LL'.format(rec_no)] = first_close - neighbour_window['Low'].min()

    result_rows.append(result)

result_output = pd.DataFrame(result_rows, columns=result_columns)

In [None]:
# Timestamp string (day-month-year_hour-minute_AM/PM) used in the log filename.
now = datetime.now()
today = format(now, "%d-%m-%Y_%I-%M_%p")

In [None]:
# Write the back-test results to CSV with a header row and without the index.
result_output.to_csv('01_Back_Test_Data.csv', header = True, index = False)

In [None]:
# Show the (rows, columns) dimensions of the results table.
result_output.shape

In [None]:
# Count missing values per column of the results table.
result_output.isnull().sum()

#### Generating Log File

In [None]:
# Record the settings used for this run in a timestamped text file, one
# "label: value" line per setting.
log_fields = [
    ("Date", today),
    ("Currency Pair", pair),
    ("K_Number", k_number),
    ("KNN_Metric", metric),
    ("KNN_Algorithm", algorithm),
    ("Feature", feature_1),
    ("Feature", feature_2),
    ("Feature", feature_3),
    ("Feature", feature_7),
    ("Feature", feature_8),
    ("Volume Size", volume_size),
    ("Sample Count", sample_count),
    ("Candle Counts", candles),
]

log_filename = "{}_data_generation_log_{}.txt".format(today, pair)

# The context manager closes the file even if a write fails, and str.format
# accepts non-string setting values that plain concatenation would reject.
with open(log_filename, "w", encoding="utf-8") as file:
    file.writelines(
        "{}: {}\n".format(field_label, field_value)
        for field_label, field_value in log_fields
    )

In [None]:
# Reload the saved results file; this rebinds `data` to the results table.
data = pd.read_csv('01_Back_Test_Data.csv')

In [None]:
# Preview the first rows of the reloaded results.
data.head()