In [1]:
# import libraries
import numpy as np
import pandas as pd
import pickle
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score

# read csv file
csvPath = 'ADAUSDT_M15.csv'
df = pd.read_csv(csvPath)

# Snapshot of the columns present in the file as loaded. Every derived-column
# guard in this cell tests against this snapshot, so a column is only computed
# when the CSV did not already contain it.
columns = df.columns.values

# candle range: high - low
if 'Size' not in columns:
    df['Size'] = df['High'] - df['Low']

# shift(-1) moves every value up one row: row i receives the Close of row
# i + 1 and the last row becomes NaN.
# NOTE(review): the column is called 'Previous_Close', but if rows are in
# chronological order this is the *following* candle's close -- confirm intent.
if 'Previous_Close' not in columns:
    df['Previous_Close'] = df['Close'].shift(-1)

# size * volume
if 'VS_multipy' not in columns:
    df['VS_multipy'] = df['Size'] * df['Volume']
    # Drop the first and the last row (the last row has no value to shift in
    # for Previous_Close). .copy() makes the trimmed frame independent of the
    # original so the column assignments that follow do not write into a slice
    # (which raises SettingWithCopyWarning in pandas).
    df = df[1:-1].copy()

# candle body: |close - open| (stored under the existing 'True_ranage' key)
if 'True_ranage' not in columns:
    df['True_ranage'] = np.abs(df['Close'] - df['Open'])

def _average_true_range(frame, window):
    """Return the rolling mean of the true range of `frame` over `window` rows.

    The true range of a row is the largest of:
      * High - Low
      * |High - Close of the preceding row|
      * |Low  - Close of the preceding row|

    The first row has no preceding close; the row-wise max skips the resulting
    NaN terms, so that row falls back to High - Low. The first `window - 1`
    entries of the returned Series are NaN because the rolling window is not
    yet full. The result is indexed like `frame`.
    """
    prev_close = frame['Close'].shift()
    candidates = pd.concat(
        [
            frame['High'] - frame['Low'],
            (frame['High'] - prev_close).abs(),
            (frame['Low'] - prev_close).abs(),
        ],
        axis=1,
    )
    return candidates.max(axis=1).rolling(window).mean()


# calculate atr 14
if 'atr_14' not in columns:
    # assign() builds a new frame instead of writing into a slice; the first
    # 14 rows (which include the NaN warm-up rows) are then dropped.
    df = df.assign(atr_14=_average_true_range(df, 14)).iloc[14:].copy()

# calculate atr 55
# Computed on the frame as it stands here, i.e. after the atr_14 trimming above
# when that branch ran.
if 'atr_55' not in columns:
    df = df.assign(atr_55=_average_true_range(df, 55)).iloc[55:].copy()

# candle direction: 1 --> close above open, -1 --> close below open, 0 --> equal
if 'status' not in columns:
    closes, opens = df['Close'], df['Open']
    direction_masks = [closes > opens, closes < opens, closes == opens]
    df['status'] = np.select(condlist=direction_masks, choicelist=[1, -1, 0], default=0)

# candle types, from the candle body relative to atr_14:
#   0 --> spinning candle (body below 0.8 * atr_14)
#   1 --> standard candle (body strictly between 0.8 and 1.2 * atr_14)
#   2 --> spike candle    (body above 1.2 * atr_14)
if 'candle_type' not in columns:
    body = df['True_ranage']
    lower_bound = 0.8 * df['atr_14']
    upper_bound = 1.2 * df['atr_14']
    type_masks = [
        body < lower_bound,
        (body > lower_bound) & (body < upper_bound),
        body > upper_bound,
    ]
    # A body exactly on one of the bounds matches no mask and therefore takes
    # the default, i.e. it is classified as a standard candle.
    df['candle_type'] = np.select(condlist=type_masks, choicelist=[0, 1, 2], default=1)

# atr_55 * previous close
# NOTE(review): 'Previous_Close' is also the regression target used by
# modeling(), so this feature is a multiple of the target -- confirm that this
# is intended and not label leakage.
if 'AP_multipy' not in columns:
    df['AP_multipy'] = df['atr_55'] * df['Previous_Close']
# atr_55 * Size
if 'AS_multipy' not in columns:
    df['AS_multipy'] = df['atr_55'] * df['Size']
# atr_55 * Volume
if 'AV_multipy' not in columns:
    df['AV_multipy'] = df['atr_55'] * df['Volume']

# typeSize: atr_14 scaled by the factor of the band the candle body falls in
#   body < 0.8 * atr_14  --> 0.8 * atr_14
#   body > 1.2 * atr_14  --> 1.2 * atr_14
#   otherwise            --> atr_14
# Built in a single pass: assigning the three cases one after another replaces
# the whole column each time (rows outside the last mask end up NaN), and a
# range test written as `(a) & x < y` is parsed as `((a) & x) < y` because `&`
# binds tighter than `<`.
if 'typeSize' not in columns:
    body = df['True_ranage']
    atr = df['atr_14']
    scale = np.select(
        condlist=[body < 0.8 * atr, body > 1.2 * atr],
        choicelist=[0.8, 1.2],
        default=1.0,
    )
    df['typeSize'] = atr * scale
# write to csv
# csvPath is the same file that was loaded at the top of this cell, so the
# derived columns are stored alongside the raw ones and the column guards above
# skip recomputation on the next run. index=False keeps the row index out of
# the file; otherwise every write/read cycle adds another 'Unnamed: 0' column.
df.to_csv(csvPath, index=False)
# Reload so the in-memory frame matches what is on disk (fresh 0-based index).
df = pd.read_csv(csvPath)

In [6]:
# Single LinearRegression instance shared by every modeling() call in this
# cell. Each call refits it, so it only ever holds the most recent fit.
reg = LinearRegression()

def modeling(dFrame: pd.DataFrame, candleType: str) -> float:
    """Fit the shared `reg` on an 80/20 split of `dFrame` and report test R^2.

    Args:
        dFrame: rows to model. Must contain the feature columns 'Size',
            'Volume', 'VS_multipy', 'AP_multipy', 'AS_multipy' and the target
            column 'Previous_Close'.
        candleType: label printed in front of the score.

    Returns:
        The R^2 score of the fitted model on the held-out 20 % of rows. The
        same value is printed as '<candleType>: <score>'.

    Side effects:
        Refits the module-level `reg` in place, replacing any earlier fit.

    NOTE(review): 'AP_multipy' is built elsewhere in this notebook as
    atr_55 * Previous_Close, i.e. from the target itself -- confirm the scores
    are not inflated by label leakage.
    """
    featureCols = ['Size', 'Volume', 'VS_multipy', 'AP_multipy', 'AS_multipy']
    targetCol = 'Previous_Close'

    # Fixed seed keeps the split reproducible; the test set is every row whose
    # index label was not sampled into the training set.
    dfTrain = dFrame.sample(frac=0.8, random_state=0)
    dfTest = dFrame.drop(dfTrain.index)

    reg.fit(dfTrain[featureCols].values, dfTrain[targetCol].values)
    yPredict = reg.predict(dfTest[featureCols].values)

    score = r2_score(dfTest[targetCol].values, yPredict)
    print('{}: {}'.format(candleType, score))
    return score

def spinig():
    """Fit and score the model on spinning candles only (candle_type == 0)."""
    is_spinning = df['candle_type'] == 0
    modeling(df.loc[is_spinning], "Spining")

def standard():
    """Fit and score the model on standard candles only (candle_type == 1)."""
    is_standard = df['candle_type'] == 1
    modeling(df.loc[is_standard], 'Standard')

def spike():
    """Fit and score the model on spike candles only (candle_type == 2)."""
    is_spike = df['candle_type'] == 2
    modeling(df.loc[is_spike], 'Spike')

def standard_spining():
    """Fit and score the model on spinning and standard candles together
    (candle_type 0 or 1)."""
    is_spinning_or_standard = df['candle_type'].isin([0, 1])
    modeling(df.loc[is_spinning_or_standard], 'Standard & Spining')

# Run every candle-type experiment in turn. Order matters: each one refits the
# shared `reg`, so the estimator left in memory afterwards is the one fitted by
# the last entry (standard).
for run_experiment in (spike, spinig, standard_spining, standard):
    run_experiment()


Spike: 0.7583036751483573
Spining: 0.8402513029000274
Standard & Spining: 0.8392421716794881
Standard: 0.8535499977911281


In [7]:
# save model
# `reg` holds whatever modeling() fitted last, so that is the model written
# here. The `with` block guarantees the file handle is flushed and closed even
# if pickling fails.
modelFile = 'adaModel.sav'
with open(modelFile, 'wb') as modelHandle:
    pickle.dump(reg, modelHandle)