In [1]:
import numpy as np
import pandas as pd
from scipy import stats
import plotly.graph_objects as go
from matplotlib import pyplot as plt
import time
PATH = "../Data/"

In [4]:
def load(ticker, interval='D'):
    """
    Read one of the pre-downloaded price CSV files into a DataFrame.

    IN:
        ticker = (str) index name; only "DAX" is handled
        interval = (str) 'D', 'W' or 'M', selecting the 1d / 1wk / 1mo file
    OUT:
        the DataFrame read from the matching file below PATH, or None when
        the ticker or the interval is not one of the handled values
    """
    if ticker != "DAX":
        return None

    csv_by_interval = {
        'D': "DAX/yahoo_fin/1d.csv",
        'W': "DAX/yahoo_fin/1wk.csv",
        'M': "DAX/yahoo_fin/1mo.csv",
    }
    relative_path = csv_by_interval.get(interval)
    if relative_path is None:
        # Unknown interval: same silent None as for an unknown ticker.
        return None
    return pd.read_csv(PATH + relative_path)

def parser(ts, arg=0):
    """
    Split a dash-separated string into its fields.

    IN:
        ts = (str) dash-separated text, e.g. "2020-01-15"
        arg = (int) 1, 2 or 3 returns the field at that zero-based index;
              any other value returns the full list of fields
    OUT:
        a single field (str) or the list of all fields
    NOTE(review): arg=3 indexes a fourth field, so it raises IndexError for
    a three-part date -- confirm whether arg was meant to be one-based.
    """
    fields = ts.split("-")
    return fields[arg] if 0 < arg < 4 else fields
    
def extract(data, sData, eData):
    """
    Locate the positional row range of a period inside a price table.

    IN:
        data = DataFrame whose first column is searched for the markers
        sData = value of the first column that marks the start row
        eData = (str) value of the first column that marks the end row, or
                (int) number of rows to advance from the start row
    OUT:
        startIndex, endIndex = positions of the start and end rows.
        An integer eData is clamped to the last valid position; endIndex is
        0 when eData is neither a str nor an integer.
    RAISES:
        IndexError when sData (or a str eData) does not occur in the column
    """
    date = data[data.columns[0]]
    startIndex = np.where(date == sData)[0][0]

    endIndex = 0
    if isinstance(eData, str):
        endIndex = np.where(date == eData)[0][0]
    elif isinstance(eData, (int, np.integer)):
        # Clamp with min() so that start + eData == len(date) is also pulled
        # back to the last row; the previous ">" test let that one
        # out-of-range position through.
        endIndex = min(startIndex + eData, len(date) - 1)
    return startIndex, endIndex
        

# Test plot #1
Load some samples and plot the candlestick chart using Plotly.

In [11]:
# Load the DAX quotes and keep rows 2000-2199 as the window to draw.
data = load("DAX")
period = data.iloc[2000:2200]

# The first CSV column supplies the x axis of the chart.
candles = go.Candlestick(
    x=period[period.columns[0]],
    open=period['open'],
    high=period['high'],
    low=period['low'],
    close=period['close'],
)
fig = go.Figure(data=[candles])

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

# Test update the plot
Add new candles to the existing figure and redraw it every 2 seconds.

In [4]:
# Feed rows 200-299 to the existing figure in ten chunks of ten rows,
# redrawing after each chunk and pausing two seconds in between.
newPeriod = data.iloc[200:300]

for start in range(0, 100, 10):
    chunk = newPeriod.iloc[start:start + 10]
    chunk_dates = chunk[chunk.columns[0]]
    fig.add_trace(go.Candlestick(x=chunk_dates,
                                 open=chunk['open'], high=chunk['high'],
                                 low=chunk['low'], close=chunk['close']))

    fig.show()
    time.sleep(2)


# Test polynomial regression
Use NumPy `polyfit()` to fit a polynomial (degree 8, the `deg` value passed in the code below) to the first 300 closing prices and add it to the graph. The fitted curve is also extrapolated 30 days ahead to predict the future trend.

In [5]:
# Fit on the first 300 rows; the row number is the regressor.
period = data.iloc[:300]

x_axis = np.arange(300, dtype=np.float64)
y_axis = period['close'].values
# Only pairs where both coordinates are finite take part in the fit.
idx = np.isfinite(x_axis) & np.isfinite(y_axis)

k = np.polyfit(x_axis[idx], y_axis[idx], deg=8)
p = np.poly1d(k)

# Evaluate on 330 points: the 300 fitted rows plus 30 extrapolated ones.
x_axis = np.arange(330)
y_axis = np.asarray(p(x_axis), dtype=np.float64)

fitted_line = go.Scatter(
    x=data.iloc[:330][data.columns[0]],
    y=y_axis,
    name='P',
    line={'color': 'royalblue', 'width': 4, 'dash': 'dash'},
)
fig.add_trace(fitted_line)
fig.show()

# Check how accurate the prediction is.
By comparing the next 30 days real data to the predictions.
The error is the deviation relative to the true values.
The accuracy is simply the percentage of predictions that fall within 10% of the true values.


In [6]:
# Overlay the 30 rows that follow the fitted window.
newD = data[300:330]

fig.add_trace(go.Candlestick(x=newD[newD.columns[0]],
                open=newD['open'], high=newD['high'],
                low=newD['low'], close=newD['close']))

fig.show()

# Compare the extrapolated part of the polynomial (y_axis[300:]) with the
# real closes. The finite mask is applied to BOTH arrays: masking only the
# true values would shift (or fail to broadcast) the predictions against
# them as soon as one close is missing.
actual = newD['close'].values
predicted = y_axis[300:]
idx = np.isfinite(actual)
errors = np.abs((predicted[idx] - actual[idx]) / actual[idx])
# Share of predictions whose relative error is at most 10 %.
accuracy = np.where(errors<=0.1)[0].shape[0] / errors.shape[0]
print("Min error (%) : {}".format(np.min(errors)*100))
print("Max error (%) : {}".format(np.max(errors)*100))
print("Avg error (%) : {}".format(np.average(errors)*100))
print("Accuracy (%) : {}".format(accuracy*100))

Min error (%) : 0.15191245962437053
Max error (%) : 34.94031178870462
Avg error (%) : 12.215341989012469
Accuracy (%) : 46.666666666666664


# Simple Moving Average (SMA)
A moving average (MA) is a widely used indicator in technical analysis that helps smooth out price action by filtering out the “noise” from random short-term price fluctuations.

# Exponential moving average (EMA)
Exponential moving averages (EMA) use a weighted average that gives greater importance to more recent days to make it more responsive to new information.

In [14]:
def nan_to_avg(data):
    """
    Replace NaN entries, in place, with values interpolated from the nearest
    valid points.

    An isolated NaN becomes the mean of its two neighbours. A run of NaNs is
    filled linearly between the valid points that enclose it, and NaNs at
    either end take the nearest valid value. Input without any valid point
    is left untouched.

    IN:
        data = (d,) pandas Series or NumPy array of floats; modified in place
    OUT:
        None
    RAISES:
        ValueError when data is a read-only array that cannot be updated
    """
    values = getattr(data, "values", data)
    missing = np.isnan(values)
    valid = ~missing
    if not missing.any() or not valid.any():
        return

    positions = np.arange(values.size)
    filled = np.interp(positions[missing], positions[valid], values[valid])

    if values.flags.writeable:
        # Writing through the buffer keeps the in-place contract callers
        # rely on (they ignore the return value).
        values[missing] = filled
    elif hasattr(data, "iloc"):
        # The buffer is read-only (e.g. pandas copy-on-write): update the
        # Series object itself instead.
        data.iloc[np.flatnonzero(missing)] = filled
    else:
        raise ValueError("nan_to_avg needs a writeable array or a pandas Series")

# Test calculating MA and plot it
def sma(data, n1, n2=0):
    """
    Centered simple moving average(s) of a price series.

    IN:
        data = (d,) index data; NaNs are filled in place by nan_to_avg()
        n1 =  (int) number of time periods of the first average
        n2 = (int) optional number of time periods of a second average
    OUT:
        ma = (d,) the simple moving average over n1 periods, or the tuple
             (ma, ma2) with the n2-period average when n2 > 0.
             Every returned array has the same length as data.
    """
    nan_to_avg(data)

    def centered_average(window):
        # Repeat the edge values on both sides so that the 'valid'
        # convolution returns exactly one value per input sample. Both pad
        # widths must come from the SAME window size; the second average
        # used to take its right pad from n1 and came out with the wrong
        # length.
        left = window // 2
        right = window - 1 - left
        padded = np.pad(data, (left, right), mode='edge')
        return np.convolve(padded, np.ones(window) / window, mode='valid')

    ma = centered_average(n1)
    if n2 > 0:
        return ma, centered_average(n2)
    return ma

def ema(data, n1, n2=0):
    """
    Exponential moving average(s) of a price series.

    Each average starts at the first sample and then follows
    ema[i] = data[i] * s + ema[i-1] * (1 - s) with s = 2 / (n + 1).

    IN:
        data = (d,) index data; NaNs are filled in place by nan_to_avg()
        n1 =  (int) number of time periods of the first average
        n2 = (int) optional number of time periods of a second average
    OUT:
        ema = (d,) the exponential moving average over n1 periods, or the
              tuple (ema, ema2) with the n2-period average when n2 > 0
    """
    nan_to_avg(data)
    # Work on a plain array: indexing a pandas Series with data[i] is a
    # LABEL lookup and fails for a slice whose index does not start at 0
    # (e.g. data[2000:2200]['close']).
    values = np.asarray(data, dtype=np.float64)

    def smooth(periods):
        smoothing = 2 / (periods + 1)
        result = np.zeros(values.size)
        if values.size == 0:
            return result
        result[0] = values[0]
        for i in range(1, values.size):
            result[i] = values[i] * smoothing + result[i-1] * (1 - smoothing)
        return result

    first = smooth(n1)
    if n2 > 0:
        return first, smooth(n2)
    return first

In [100]:
"""
Test SMA(20, 50)
"""
# Reload the quotes and keep the first 200 rows as the plotting window.
data = load("DAX")
period = data.iloc[:200]
dates = period[period.columns[0]]

fig = go.Figure(data=[
    go.Candlestick(x=dates, open=period['open'], high=period['high'],
                   low=period['low'], close=period['close'])
])

# One sma() call returns both the 20- and the 50-period average.
ma20, ma50 = sma(period['close'], 20, 50)

for trace_y, trace_name, trace_color in ((ma20, 'Ma 20', 'royalblue'),
                                         (ma50, 'Ma 50', 'black')):
    fig.add_trace(go.Scatter(x=dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'}))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

In [99]:
"""
Test EMA(9, 26)
"""
# Reload the quotes and keep the first 200 rows as the plotting window.
data = load("DAX")
period = data.iloc[:200]
dates = period[period.columns[0]]

fig = go.Figure(data=[
    go.Candlestick(x=dates, open=period['open'], high=period['high'],
                   low=period['low'], close=period['close'])
])

# One ema() call returns both the 9- and the 26-period average.
ema9, ema26 = ema(period['close'], 9, 26)

for trace_y, trace_name, trace_color in ((ema9, 'EMA 9', 'royalblue'),
                                         (ema26, 'EMA 26', 'black')):
    fig.add_trace(go.Scatter(x=dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'}))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

# Bollinger bands
There are three lines that compose Bollinger Bands: A simple moving average (middle band) and an upper and lower band.
The upper and lower bands are typically 2 standard deviations +/- from a 20-day simple moving average, but can be modified.

In [101]:
def bollinger_bands(data, n=20, sig=2):
    """
    Bollinger bands around the simple moving average returned by sma().

    IN:
        data = (d,) Typical price { (high + low + close) / 3 }
        n =  (int) number of days in smoothing period
        sig = number of standard deviations
    OUT:
        upper = (d,) Upper bollinger band
        lower = (d,) Lower bollinger band
        avg = (d,) Middle moving average
    """
    nan_to_avg(data)
    ma = sma(data, n)

    # Plain array so that slicing is positional whatever index the Series has.
    values = np.asarray(data, dtype=np.float64)
    std = np.zeros(values.size)
    # The first n samples share the deviation of the first full window.
    std[:n] = np.std(values[:n])
    for i in range(n, values.size):
        # n-sample window ENDING at i. The former data[i-n:i] stopped one
        # sample early, so every deviation lagged the price by a day.
        std[i] = np.std(values[i - n + 1:i + 1])

    upper = ma + sig * std
    lower = ma - sig * std
    # The middle band is the moving average itself.
    avg = ma
    return upper, lower, avg

In [105]:
"""
Test Bollinger Bands(20, 2)
"""
# Reload the quotes and keep the first 200 rows as the plotting window.
data = load("DAX")
period = data.iloc[:200]
dates = period[period.columns[0]]

fig = go.Figure(data=[
    go.Candlestick(x=dates, open=period['open'], high=period['high'],
                   low=period['low'], close=period['close'])
])

# Bands are computed on the typical price (mean of close, high and low).
typical_price = (period['close'] + period['high'] + period['low']) / 3
upper, lower, avg = bollinger_bands(typical_price)

# Order matters: 'tonexty' on the upper band shades down to the trace added
# just before it, i.e. the lower band.
band_traces = (
    (lower, 'Lower', 'black', {}),
    (upper, 'Upper', 'royalblue', {'fill': 'tonexty'}),
    (avg, 'Middle', 'yellow', {}),
)
for trace_y, trace_name, trace_color, extra in band_traces:
    fig.add_trace(go.Scatter(x=dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'},
                             **extra))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

# Parabolic SAR
The technical indicator uses a trailing stop and reverse method called "SAR," or stop and reverse, to identify suitable exit and entry points. 
The parabolic SAR indicator appears on a chart as a series of dots, either above or below an asset's price, depending on the direction the price is moving.
A dot is placed below the price when it is trending upward, and above the price when it is trending downward.

In [118]:
def parabolic_sar(high, low, a_inc=0.02, a_stop=0.2):
    """
    Simplified SAR-style trailing curves for the highs and the lows.

    Both curves start at the first sample and move towards the current
    price by the acceleration factor a:
        sar_u[i] = sar_u[i-1] + a * (high[i] - sar_u[i-1])
        sar_l[i] = sar_l[i-1] + a * (low[i]  - sar_l[i-1])
    a starts at a_inc and grows by a_inc per step up to a_stop.
    No trend-reversal logic is applied.

    IN:
        high = (d,) high prices; NaNs are filled in place by nan_to_avg()
        low = (d,) low prices; NaNs are filled in place by nan_to_avg()
        a_inc = (float) initial value and per-step increment of a
        a_stop = (float) maximum value of a
    OUT:
        sar_u = (d,) curve trailing the highs
        sar_l = (d,) curve trailing the lows
    """
    nan_to_avg(high)
    nan_to_avg(low)

    # Plain arrays: high[i] on a pandas Series is a LABEL lookup and fails
    # for slices whose index does not start at 0.
    highs = np.asarray(high, dtype=np.float64)
    lows = np.asarray(low, dtype=np.float64)

    sar_u = np.zeros(highs.size)
    sar_l = np.zeros(highs.size)
    if highs.size == 0:
        return sar_u, sar_l
    sar_u[0] = highs[0]
    sar_l[0] = lows[0]

    a = min(a_inc, a_stop)
    for i in range(1, highs.size):
        sar_u[i] = sar_u[i-1] + a * (highs[i] - sar_u[i-1])
        # The lower curve follows its OWN previous value; it used to
        # subtract a * (sar_u[i-1] - low[i]) and drifted ever lower.
        sar_l[i] = sar_l[i-1] + a * (lows[i] - sar_l[i-1])
        # Cap with min(): repeated "a += a_inc" while a < a_stop overshoots
        # the maximum because of float rounding (0.02 steps reach ~0.22).
        a = min(a + a_inc, a_stop)
    return sar_u, sar_l
        

In [119]:
"""
Test Parabolic SAR(.02, 0.02, .2)
"""
# Reload the quotes and keep the first 200 rows as the plotting window.
data = load("DAX")
period = data.iloc[:200]
dates = period[period.columns[0]]

fig = go.Figure(data=[
    go.Candlestick(x=dates, open=period['open'], high=period['high'],
                   low=period['low'], close=period['close'])
])

# Default acceleration settings of parabolic_sar() are used.
sar_u, sar_l = parabolic_sar(period['high'], period['low'])

for trace_y, trace_name, trace_color in ((sar_u, 'SARU', 'black'),
                                         (sar_l, 'SARL', 'yellow')):
    fig.add_trace(go.Scatter(x=dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'}))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

# Testing TA-Lib package


In [127]:
import talib

"""
SMA
"""
data = load("DAX")

period = data[:200]

fig = go.Figure(data=[go.Candlestick(x=period[period.columns[0]],
                open=period['open'], high=period['high'],
                low=period['low'], close=period['close'])
                     ])
close = period['close']
nan_to_avg(close)

# The series is edge-padded by n1-1 samples so the average can be centered,
# as in sma(). talib.SMA returns one value per PADDED sample, and its first
# n1-1 outputs are warm-up NaNs. Dropping them leaves exactly one value per
# original row, aligned with the dates used on the x axis.
n1 = 20
pad = np.pad(close, (n1//2, n1-1-n1//2), mode='edge')
ma20 = talib.SMA(pad, timeperiod=n1)[n1-1:]
n1 = 50
pad = np.pad(close, (n1//2, n1-1-n1//2), mode='edge')
ma50 = talib.SMA(pad, timeperiod=n1)[n1-1:]

fig.add_trace(go.Scatter(x=data[:200][data.columns[0]], y=ma20, name='MA20',
                         line = dict(color='black', width=3, dash='dash')))

fig.add_trace(go.Scatter(x=data[:200][data.columns[0]], y=ma50, name='MA50',
                         line = dict(color='yellow', width=3, dash='dash')))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

In [129]:
"""
EMA
"""
# TA-Lib exponential averages of the close series prepared in the SMA cell,
# added to the figure that is already open.
ema9 = talib.EMA(close, timeperiod=9)
ema26 = talib.EMA(close, timeperiod=26)

ema_dates = data.iloc[:200][data.columns[0]]
for trace_y, trace_name, trace_color in ((ema9, 'EMA9', 'black'),
                                         (ema26, 'EMA26', 'yellow')):
    fig.add_trace(go.Scatter(x=ema_dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'}))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

In [130]:
"""
Bollinger bands
"""
# TA-Lib returns the bands in the order upper, middle, lower.
upper, middle, lower = talib.BBANDS(close, timeperiod=20)

band_dates = data.iloc[:200][data.columns[0]]
# Order matters: 'tonexty' on the upper band shades down to the trace added
# just before it, i.e. the lower band.
band_traces = (
    (lower, 'Lower', 'black', {}),
    (upper, 'Upper', 'royalblue', {'fill': 'tonexty'}),
    (middle, 'Middle', 'yellow', {}),
)
for trace_y, trace_name, trace_color, extra in band_traces:
    fig.add_trace(go.Scatter(x=band_dates, y=trace_y, name=trace_name,
                             line={'color': trace_color, 'width': 3, 'dash': 'dash'},
                             **extra))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()

In [12]:
import talib

In [15]:
"""
Parabolic SAR
"""
# Parabolic SAR from TA-Lib (acceleration 0.02, maximum 0.2) on the highs
# and lows of the current window.
high = period['high']
low = period['low']
nan_to_avg(high)
nan_to_avg(low)

sar = talib.SAR(high, low, 0.02, 0.2)

# Take the dates from `period` itself, so the x axis always matches the rows
# the SAR was computed from. A hard-coded data[2000:2200] only matched when
# `period` happened to be that slice, which is not the case on a
# top-to-bottom run.
fig.add_trace(go.Scatter(x=period[period.columns[0]], y=sar, name='SAR'))

fig.update_layout(xaxis_rangeslider_visible=False)
fig.show()