In [None]:
from datetime import date, datetime
import pandas as pd 
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import warnings

**Simple Moving Average**

In [None]:
def SMA(data,n):
    
    #додаємо NaN на початку
	sma_list=[np.nan for k in range(n-1)]
    
	for i in range(len(data)-n+1):
		sum_n = 0
		for j in range(n):
			sum_n = sum_n + data[i+j]
		sma=sum_n/n
		sma_list.append(round(sma, 2))
        
	return sma_list

**Linear Weighted Moving Average**

In [None]:
def LWMA(data,n):
    
	lma_list=[np.nan for k in range(n-1)]

	for i in range(len(data)-n+1):
		sum_n, sum_j = 0, 0
		for j in range(n):
			sum_n = sum_n + (j+1)*data[i+j]
			sum_j = sum_j + (j+1)
		lma=sum_n/sum_j
		lma_list.append(lma)

	return lma_list

**Exponential Moving Average**

In [None]:
def EMA(data, n, a=0):
	ema_list=[data[0]]
	if a==0:
		a=2/(n+1)

	for i in range (1,len(data)):
		ema = 0
		ema = (1-a)*ema_list[i-1]+a*data[i]
		ema_list.append(ema)

	return ema_list

**SO**

In [None]:
def SO(data_close, data_high, data_low, n):
	K, D = [np.nan for k in range(n-1)], [np.nan for k in range(n-1)]
	highest_high, lowest_low = [np.nan for k in range(n-1)], [np.nan for k in range(n-1)]

	for i in range(len(data_close)-n+1):
		list_n_high, list_n_low = [], []
		for j in range(0, n):
			list_n_high.append(data_high[i+j])
			list_n_low.append(data_low[i+j])
		highest_high.append(max(list_n_high))
		lowest_low.append(min(list_n_low))

	for i in range(n-1, len(highest_high)):
		K.append(100*(data_close[i]-lowest_low[i])/(highest_high[i]-lowest_low[i]))
	D = SMA(K, n)

	return K, D

**CCI**

In [None]:
def CCI(data_close, data_high, data_low, n):
	p_typical = []

	for i in range(len(data_close)):
		p_typical.append((data_close[i]+data_low[i]+data_high[i])/3)
	sma = SMA(p_typical,n)

	MAD = [np.nan for k in range(n-1)]
	for j in range(n-1, len(data_close)):
		mad = 0
		for s in range(n):
			mad = mad +  abs(p_typical[j-s]-sma[j])
		MAD.append(mad/n)

	CCI = [np.nan for n in range(n-1)]
	for t in range(n-1, len(data_close)):
		cci = (p_typical[t]-sma[t])/(0.015*MAD[t])
		CCI.append(cci)

	return CCI

**Disparsity Index**

In [None]:
def DI(data, n, MA_type):
    if MA_type == "EMA":
        MA = EMA(data, n)
    elif MA_type == "LWMA":
        MA = LWMA(data, n)
    else:
        MA = SMA(data, n)
    
    di_list = []
    for i in range(len(MA)):
        if (np.isnan(MA[i])):
            di_list.append(np.nan)
        else:
            di_list.append((data[i]-MA[i])/(100*MA[i]))
        
    return di_list

**Elder-Ray Index**

In [None]:
def ERI(data_close, data_high, data_low, n):
    MA = EMA(data_close, n)
    
    BuP, BeP = [], []
    for i in range(len(data_close)):
        BuP.append(data_high[i] - MA[i])
        BeP.append(data_low[i] - MA[i])
    return BuP, BeP

**Strategy with moving averages**

Сигнали - перетин ціни і плинного середнього

P - list of Close prices

n1 - к-ть періодів для середнього

MA_type - тип середнього ("SMA", "EMA", "LWMA")

In [None]:
def strategy_MA_1(P, n1, MA_type, n2=26, n3=9):
	MA= []
	if MA_type == "SMA":
		MA = SMA(P, n1)
	elif MA_type == "EMA":
		MA = EMA(P, n1)
	elif MA_type == "LWMA":
		MA = LWMA(P, n1)

	if MA_type != "EMA":
		s=[np.nan for k in range(n1)]
		for i in range(n1, len(MA)):
			if (P[i-1]<MA[i-1]) and (P[i]>MA[i]):
				s.append(1)
			elif (P[i-1]>MA[i-1]) and (P[i]<MA[i]):
				s.append(-1)
			else:
				s.append(0)
	elif MA_type == "EMA":
		s=[np.nan]
		for i in range(1,len(MA)):
			if (P[i-1]<MA[i-1]) and (P[i]>MA[i]):
				s.append(1)
			elif (P[i-1]>MA[i-1]) and (P[i]<MA[i]):
				s.append(-1)
			else:
				s.append(0)
	return s

**Strategy SO**

In [None]:
def strategy_SO(P_close, P_high, P_low, n):
	K, D = SO(P_close, P_high, P_low, n)
	s=[np.nan for k in range(n)]
	for i in range(n, len(P_close)):
		if (K[i]>D[i]) and (K[i-1]<D[i-1]):
			s.append(-1)
		elif (K[i]<D[i]) and (K[i-1]>D[i-1]):
			s.append(1)
		else:
			s.append(0)
	return s

**Strategy CCI**

In [None]:
def strategy_CCI(data_close, data_high, data_low, n):
	cci = CCI(data_close, data_high, data_low, n)
	s=[]
	for i in range(len(data_close)):
		if ((cci[i]>0) and (cci[i-1]<0)):
			s.append(1)
		elif ((cci[i]<0) and (cci[i-1]>0)):
			s.append(-1)
		else:
			s.append(0)
	return s

**Strategy DI**

In [None]:
def strategy_DI(P_close, n):
    DI_ = DI(P_close, n, MA_type="SMA")
    
    s=[np.nan]
    for i in range(1,len(DI_)):
        if np.isnan(DI_[i]):
            s.append(np.nan)
        elif ((DI_[i]>0)&(DI_[i-1]<0)):
            s.append(1)
        elif ((DI_[i]<0)&(DI_[i-1]>0)):
            s.append(-1)
        else:
            s.append(0)
            
    return s

**Strategy ERI**

In [None]:
def strategy_ERI(P_close, P_high, P_low, n=13):
    BuP_ = ERI(P_close, P_high, P_low, n)[0]
    BeP_ = ERI(P_close, P_high, P_low, n)[1]
    
    s=[np.nan]
    for i in range(1, len(BuP_)):
        if ((BeP_[i]>0)&(BeP_[i-1]<0)):
            s.append(1)
        elif ((BuP_[i]<0)&(BuP_[i-1]>0)):

            s.append(-1)
        else:
            s.append(0)
            
    return s

**Strategy MAE**

In [None]:
def strategy_MAE(P, upper, lower, n1, MA_type, MAE_type="LL"):
	if n1 >= 0:
		UL, LL, MA = [], [], []
		if MA_type == "SMA":
			MA = SMA(P, n1)
		elif MA_type == "EMA":
			MA = EMA(P, n1)
		elif MA_type == "LWMA":
			MA = LWMA(P, n1)

		for i in range(len(P)):
			UL.append((1+upper)*MA[i])
			LL.append((1-lower)*MA[i])
		if MA_type != "EMA":
			s=[np.nan for k in range(n1)]
			if MAE_type == "UL":
				for i in range(n1, len(MA)):
					if (P[i-1]<UL[i-1]) and (P[i]>UL[i]):
						s.append(1)
					elif (P[i-1]>UL[i-1]) and (P[i]<UL[i]):
						s.append(-1)
					else:
						s.append(0)
			elif MAE_type == "LL":
				for i in range(n1, len(MA)):
					if (P[i-1]<LL[i-1]) and (P[i]>LL[i]):
						s.append(1)
					elif (P[i-1]>LL[i-1]) and (P[i]<LL[i]):
						s.append(-1)
					else:
						s.append(0)
		elif MA_type == "EMA":
			s=[np.nan]
			if MAE_type == "UL":
				for i in range(1, len(MA)):
					if (P[i-1]<UL[i-1]) and (P[i]>UL[i]):
						s.append(1)
					elif (P[i-1]>UL[i-1]) and (P[i]<UL[i]):
						s.append(-1)
					else:
						s.append(0)
			elif MAE_type == "LL":
				for i in range(1, len(MA)):
					if (P[i-1]<LL[i-1]) and (P[i]>LL[i]):
						s.append(1)
					elif (P[i-1]>LL[i-1]) and (P[i]<LL[i]):
						s.append(-1)
					else:
						s.append(0)
		return s
	else:
		return ["nan", "nan"]

In [None]:
def signal_convertation(signals):
    non_zero_element = 0
    converted_signals = []
    #елементи на початку ряду можуть бути нулями, ми їх не змінюємо до появи першого сигналу
    
    for i in range(len(signals)):
        if np.isnan(signals[i]):
            converted_signals.append(np.nan)
        else:
            if signals[i] != 0:
                non_zero_element = signals[i]
                converted_signals.append(signals[i])
            else:
                converted_signals.append(non_zero_element)
                
    return converted_signals

In [None]:
def trading_results(price, signal):
    results=["n/a" for i in range(len(price))]
    open_short, close_short, open_long, close_long = 0, 0, 0, 0
    
    
    for i in range(1,len(signal)):
        
        if signal[i] == 0 and signal[i-1] == 1:
            open_short = i
            close_long = i
            #рахуємо прибутковість довгої позиції
            #result = (price[close_long] - price[open_long])/price[open_long]
            result = (price[close_long] - price[open_long])/price[open_long] - 0.4/100
            results[i] = result
            
        if signal[i] == 1 and signal[i-1] == 0:
            close_short = i
            open_long = i
            #рахуємо прибутковість короткої позиції
            #result = -(price[close_short] - price[open_short])/price[open_short]
            result = -(price[close_short] - price[open_short])/price[open_short] - 0.4/100
            results[i] = result
            
    return(results)

***підбір оптимальних параметрів***

In [None]:
warnings.filterwarnings("ignore")

list_param, list_per, list_result = [], [], []

for n_ in range(2,101):
  for per_ in [0.01, 0.015, 0.02, 0.025, 0.03]:

    columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
    df = pd.read_excel('UX-dailyhistory.xlsx', header = None, skiprows = 1, names = columns)
    df.index = df["Date"]
    df.index = pd.to_datetime(df.index)
    df.drop(['Date', 'Volume'], axis='columns', inplace=True)

    df = df.sort_index()

    train , test = train_test_split(df, test_size = 0.2, shuffle=False)
    df = train

    #df["PARAM"] = strategy_MAE(P=df.Close, upper=0, lower=per_, n1=n_, MA_type="SMA", MAE_type="LL")
    #df["PARAM"] = strategy_MAE(P=df.Close, upper=0, lower=per_, n1=n_, MA_type="EMA", MAE_type="LL")
    #df["PARAM"] = strategy_MAE(P=df.Close, upper=0, lower=per_, n1=n_, MA_type="LWMA", MAE_type="LL")
    #df["PARAM"] = strategy_MAE(P=df.Close, upper=per_, lower=0, n1=n_, MA_type="SMA", MAE_type="UL")
    #df["PARAM"] = strategy_MAE(P=df.Close, upper=per_, lower=0, n1=n_, MA_type="EMA", MAE_type="UL")
    df["PARAM"] = strategy_MAE(P=df.Close, upper=per_, lower=0, n1=n_, MA_type="LWMA", MAE_type="UL")
    df["PARAM"] = signal_convertation(df["PARAM"])

    df = df.replace(0, np.nan)
    df = df.dropna()
  
    signal = []
    for i in range(len(df["Close"])):
      if df["PARAM"][i] > 0:
          signal.append(1)
      else:
          signal.append(0)

    df["Signal"] = signal

    results = trading_results(df["Close"], df["Signal"])

    for i in range(len(results)):
        if results[i] != 'n/a':
            results[i] = results[i]*100
        
    amount = []
    amount_ = 100
    for element in results:
        if element != 'n/a':
            amount_ = amount_*(1+element/100)
            amount.append(amount_)
        else:
            amount.append(element)
    amount[0] = 100

    for i in range(len(amount)):
      if amount[i] != 'n/a':
          remember = amount[i]
      else:
          amount[i] = remember 

    df["Results"] = amount

    list_param.append(n_)
    list_per.append(per_)
    list_result.append(df["Results"][len(df["Results"])-1])

data = {'Param_n': list_param, 'Param_per': list_per, 'Result': list_result }
df_final = pd.DataFrame(data)
df_final.to_excel("LWMA_UL.xlsx")

In [None]:
warnings.filterwarnings("ignore")

list_param, list_result = [], []

for n_ in range(2,101):

  columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
  df = pd.read_excel('UX-dailyhistory.xlsx', header = None, skiprows = 1, names = columns)
  df.index = df["Date"]
  df.index = pd.to_datetime(df.index)
  df.drop(['Date', 'Volume'], axis='columns', inplace=True)

  df = df.sort_index()

  train , test = train_test_split(df, test_size = 0.2, shuffle=False)
  df = train

  #df["PARAM"] = strategy_DI(P_close=df.Close, n=n_)
  #df["PARAM"] = strategy_ERI(P_close=df.Close, P_high=df.High, P_low=df.Low, n=n_)
  #df["PARAM"] = strategy_SO(P_close=df.Close, P_high=df.High, P_low=df.Low, n=n_)
  #df["PARAM"] = strategy_CCI(data_close=df.Close, data_high=df.High, data_low=df.Low, n=n_)
  #df["PARAM"] = strategy_MA_1(P = df.Close, n1 = n_, MA_type = "SMA")
  #df["PARAM"] = strategy_MA_1(P = df.Close, n1 = n_, MA_type = "EMA")
  df["PARAM"] = strategy_MA_1(P = df.Close, n1 = n_, MA_type = "LWMA")
  df["PARAM"] = signal_convertation(df["PARAM"])
  
  df = df.replace(0, np.nan)
  df = df.dropna()
  
  signal = []
  for i in range(len(df["Close"])):
    if df["PARAM"][i] > 0:
        signal.append(1)
    else:
        signal.append(0)

  df["Signal"] = signal

  results = trading_results(df["Close"], df["Signal"])

  for i in range(len(results)):
      if results[i] != 'n/a':
          results[i] = results[i]*100
        
  amount = []
  amount_ = 100
  for element in results:
      if element != 'n/a':
          amount_ = amount_*(1+element/100)
          amount.append(amount_)
      else:
          amount.append(element)
  amount[0] = 100

  for i in range(len(amount)):
    if amount[i] != 'n/a':
        remember = amount[i]
    else:
        amount[i] = remember 

  df["Results"] = amount

  #df.to_excel('question_.xlsx')

  list_param.append(n_)
  list_result.append(df["Results"][len(df["Results"])-1])

data = {'Param': list_param, 'Result': list_result }
df_final = pd.DataFrame(data)
df_final.to_excel("MA_LWMA.xlsx")

In [None]:
columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
df = pd.read_excel('UX-dailyhistory.xlsx', header = None, skiprows = 1, names = columns)
df.index = df["Date"]
df.index = pd.to_datetime(df.index)
df.drop(['Date', 'Volume'], axis='columns', inplace=True)

df = df.sort_index()

train , test = train_test_split(df, test_size = 0.2, shuffle=False)
df = train

print(df)
print(test)

print(( df["Close"][len(df["Close"])-1]/df["Close"][0] ) * 100)

               Open     High      Low    Close
Date                                          
2009-03-26   500.00   500.00   500.00   500.00
2009-03-27   557.04   567.24   508.31   509.86
2009-03-30   509.35   509.87   502.22   509.87
2009-03-31   509.70   511.46   509.70   511.46
2009-04-01   508.05   521.66   508.05   521.66
...             ...      ...      ...      ...
2019-07-22  1593.63  1593.63  1587.99  1587.99
2019-07-23  1587.99  1604.56  1587.99  1597.37
2019-07-24  1597.37  1598.31  1587.52  1587.52
2019-07-25  1587.52  1587.52  1587.52  1587.52
2019-07-26  1587.52  1587.52  1587.52  1587.52

[2549 rows x 4 columns]
               Open     High      Low    Close
Date                                          
2019-07-29  1587.52  1614.96  1587.52  1614.96
2019-07-30  1614.96  1614.96  1603.24  1603.24
2019-07-31  1603.24  1610.91  1601.91  1601.91
2019-08-01  1601.91  1601.91  1601.91  1601.91
2019-08-02  1601.91  1601.91  1583.93  1583.93
...             ...      ...      .