# 目的
実装された`TemporalAnalyzer`と`Analyzer`が同じ結果を返すことを確認する。

In [1]:
import glob
from itertools import combinations
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import time

import sys
# Make the project's utility modules importable. The path is relative, so it
# is resolved against the directory the notebook is run from.
sys.path.append("../pybitcoin/gui/utils/")

# The two implementations compared throughout this notebook.
from Analyzer import TemporalAnalyzer, Analyzer

%matplotlib inline

In [2]:
# Show the class docstring, one docstring line per output line.
print(*TemporalAnalyzer.__doc__.split("\n"), sep="\n")

TemporalAnalyzer(object)
    
    This class offers functions to calculate existent technical indices at the latest time.
    In coming future, this offers `my functions`.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> import matplotlib.pyplot as plt
    >>> analyzer = TemporalAnalyzer()
    >>> timestamp = np.arange(1, 100)
    >>> ohlc = []
    >>> close = []
    >>> ema = []
    >>> N = 3
    >>> alpha = 2. / (1. + N)
    >>> for ii in range(len(timestamp)):
    ...     buff = np.zeros(5, dtype=int)
    ...     buff[0] = ii + 1
    ...     buff[1:] = np.random.randint(100, 150, 4)
    ...     ohlc.append(buff)
    ...     close.append(buff[-1])
    ...     ema.append(analyzer.calcEMA(ema, close, alpha))
    ...
    >>> plt.plot(timestamp, close)
    >>> plt.plot(timestamp, ema)
    


In [3]:
# Show the class docstring, one docstring line per output line.
print(*Analyzer.__doc__.split("\n"), sep="\n")

Analyzer(object)
    
    This class offers functions to calculate existent technical indices.
    In coming future, this offers `my functions`.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> import matplotlib.pyplot as plt
    >>> analyzer = Analyzer()
    >>> timestamp = np.arange(1, 100)
    >>> ohlc = []
    >>> close = []
    >>> N = 3
    >>> alpha = 2. / (1. + N)
    >>> for ii in range(len(timestamp)):
    ...     buff = np.zeros(5, dtype=int)
    ...     buff[0] = ii + 1
    ...     buff[1:] = np.random.randint(100, 150, 4)
    ...     ohlc.append(buff)
    ...     close.append(buff[-1])
    ...
    >>> ema = analyzer.calcEMA(close, alpha)
    >>> plt.plot(timestamp, close)
    >>> plt.plot(timestamp, ema)
    


In [4]:
# Collect every OHLCV CSV. glob returns paths in arbitrary order, so sort them
# to make the row order of the concatenated frame reproducible.
# NOTE(review): this assumes the file names sort chronologically - confirm.
file_list = sorted(glob.glob("../pybitcoin/gui/data/ohlcv/OHLCV*.csv"))
if not file_list:
    # Fail here with a clear message instead of leaving `data` as None and
    # hitting an unrelated error in a later cell.
    raise FileNotFoundError(
        "no OHLCV*.csv files found under ../pybitcoin/gui/data/ohlcv/"
    )

# Read each file (first column used as the index) and concatenate once,
# rather than growing the frame file by file.
data = pd.concat([pd.read_csv(fpath, index_col=0) for fpath in file_list])

In [5]:
# Number of rows loaded across all CSV files.
data.shape[0]

60481

In [6]:
# Columns handed to the analyzers, in this order: the timestamp first and the
# close price last (later cells select the close with index -1).
ohlc_columns = ["time", "open", "high", "low", "close"]
tohlc = data[ohlc_columns].values

In [7]:
# Number of leading rows of `tohlc` used by every comparison below.
length = 1000

# One instance of each implementation under comparison.
ana = Analyzer()
tmpana = TemporalAnalyzer()

## calcOcUpDown, calcDec

In [8]:
N = 4

# Incremental path: give TemporalAnalyzer one more row per step and collect
# the value it returns each time.
st = time.perf_counter()
oc_ud1 = []
dec1 = []
for ii in range(length):
    oc_ud1.append(tmpana.calcOcUpDown(tohlc[:ii+1]))
    dec1.append(tmpana.calcDec(oc_ud1, N))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: Analyzer processes the same rows in one call.
st = time.perf_counter()
oc_ud2 = ana.calcOcUpDown(tohlc[:length])
dec2 = ana.calcDec(oc_ud2, N)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Sum of absolute differences: signed differences could cancel each other and
# report 0 even though the two results disagree.
print(np.sum([abs(a - b) for a, b in zip(oc_ud1, oc_ud2)]), np.sum([abs(a - b) for a, b in zip(dec1, dec2)]))

TemporalAnalyzer: 0.026sec
Analyzer: 0.011sec
0 0


## calcSMA

In [9]:
N = 4

# Incremental path: the last column of `tohlc` (the close price) up to and
# including row ii is passed at every step.
st = time.perf_counter()
sma1 = []
for ii in range(length):
    sma1.append(tmpana.calcSMA(tohlc[:ii+1, -1], N))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call over the same rows.
st = time.perf_counter()
sma2 = ana.calcSMA(tohlc[:length, -1], N)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(sma1, sma2)]))

TemporalAnalyzer: 0.051sec
Analyzer: 0.050sec
0.0


## calcEMA

In [10]:
N = 4
alpha = 2. / (1. + N)

# Incremental path: the EMA values collected so far are passed back in,
# together with the close prices seen up to row ii.
st = time.perf_counter()
ema1 = []
for ii in range(length):
    ema1.append(tmpana.calcEMA(ema1, tohlc[:ii+1, -1], alpha))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call over the same close prices.
st = time.perf_counter()
ema2 = ana.calcEMA(tohlc[:length, -1], alpha)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(ema1, ema2)]))

TemporalAnalyzer: 0.009sec
Analyzer: 0.002sec
0.0


## calcMACD, calcMACDSignal

In [11]:
N1 = 7
N2 = 13
N_macd = 14

alpha1 = 2. / (1. + N1)
alpha2 = 2. / (1. + N2)
alpha_macd = 2. / (1. + N_macd)

# Incremental path: both EMAs, the MACD and its signal line are extended by
# one value per step, each fed with the series collected so far.
st = time.perf_counter()
ema1 = []
ema2 = []
macd1 = []
signal1 = []
for ii in range(length):
    ema1.append(tmpana.calcEMA(ema1, tohlc[:ii+1, -1], alpha1))
    ema2.append(tmpana.calcEMA(ema2, tohlc[:ii+1, -1], alpha2))
    macd1.append(tmpana.calcMACD(ema1, ema2))
    signal1.append(tmpana.calcMACDSignal(signal1, macd1, alpha_macd))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path. Note that ema1/ema2 are rebound here to the Analyzer results;
# only the MACD and signal series are compared below.
st = time.perf_counter()
ema1 = ana.calcEMA(tohlc[:length, -1], alpha1)
ema2 = ana.calcEMA(tohlc[:length, -1], alpha2)
macd2 = ana.calcMACD(ema1, ema2)
signal2 = ana.calcMACDSignal(macd2, alpha_macd)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(macd1, macd2)]), np.sum([abs(a - b) for a, b in zip(signal1, signal2)]))

TemporalAnalyzer: 0.022sec
Analyzer: 0.009sec
0.0 0.0


## calcBuyingPressure

In [12]:
# Incremental path: one more row of `tohlc` per step.
st = time.perf_counter()
bp1 = []
for ii in range(length):
    bp1.append(tmpana.calcBuyingPressure(tohlc[:ii+1]))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call over the same rows.
st = time.perf_counter()
bp2 = ana.calcBuyingPressure(tohlc[:length])
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(bp1, bp2)]))

TemporalAnalyzer: 0.010sec
Analyzer: 0.013sec
0.0


## calcTrueRange, calcAverageTrueRange

In [13]:
N = 14
alpha = 2. / (1. + N)

# Incremental path: the true range is extended by one value per step and the
# average true range is fed with both series collected so far.
st = time.perf_counter()
tr1 = []
atr1 = []
for ii in range(length):
    tr1.append(tmpana.calcTrueRange(tohlc[:ii+1]))
    atr1.append(tmpana.calcAverageTrueRange(atr1, tr1, alpha))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same rows.
st = time.perf_counter()
tr2 = ana.calcTrueRange(tohlc[:length])
atr2 = ana.calcAverageTrueRange(tr2, alpha)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(tr1, tr2)]), np.sum([abs(a - b) for a, b in zip(atr1, atr2)]))

TemporalAnalyzer: 0.022sec
Analyzer: 0.028sec
0.0 0.0


## ADX-related functions
* calcDMPlus, calcDMMinus
* calcDMPlusEMA, calcDMMinusEMA
* calcDIPlus, calcDIMinus
* calcDX, calcADX

In [14]:
N = 14
alpha = 2. / (1. + N)

# Incremental path: every series is extended by one value per step, each call
# receiving the series collected so far.
st = time.perf_counter()
tr1 = []
atr1 = []
dmp1 = []
dmm1 = []
dmp1_ema = []
dmm1_ema = []
dip1 = []
dim1 = []
dx1 = []
adx1 = []
for ii in range(length):
    tr1.append(tmpana.calcTrueRange(tohlc[:ii+1]))
    atr1.append(tmpana.calcAverageTrueRange(atr1, tr1, alpha))
    dmp1.append(tmpana.calcDMPlus(tohlc[:ii+1]))
    dmm1.append(tmpana.calcDMMinus(tohlc[:ii+1]))
    dmp1_ema.append(tmpana.calcDMPlusEMA(dmp1_ema, dmp1, alpha))
    # Use the minus variant here so that it is the method being checked
    # against Analyzer.calcDMMinusEMA below (previously calcDMPlusEMA was
    # called for the minus series as well).
    dmm1_ema.append(tmpana.calcDMMinusEMA(dmm1_ema, dmm1, alpha))
    dip1.append(tmpana.calcDIPlus(dmp1_ema, atr1))
    dim1.append(tmpana.calcDIMinus(dmm1_ema, atr1))
    dx1.append(tmpana.calcDX(dip1, dim1))
    adx1.append(tmpana.calcADX(adx1, dx1, alpha))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same rows.
st = time.perf_counter()
tr2 = ana.calcTrueRange(tohlc[:length])
atr2 = ana.calcAverageTrueRange(tr2, alpha)
dmp2 = ana.calcDMPlus(tohlc[:length])
dmm2 = ana.calcDMMinus(tohlc[:length])
dmp2_ema = ana.calcDMPlusEMA(dmp2, alpha)
dmm2_ema = ana.calcDMMinusEMA(dmm2, alpha)
dip2 = ana.calcDIPlus(dmp2_ema, atr2)
dim2 = ana.calcDIMinus(dmm2_ema, atr2)
dx2 = ana.calcDX(dip2, dim2)
adx2 = ana.calcADX(dx2, alpha)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# One line per indicator, in the order DM+, DM-, DM+ EMA, DM- EMA, DI+, DI-,
# DX, ADX. Absolute differences so that opposite-sign errors cannot cancel.
for temporal, batch in (
    (dmp1, dmp2),
    (dmm1, dmm2),
    (dmp1_ema, dmp2_ema),
    (dmm1_ema, dmm2_ema),
    (dip1, dip2),
    (dim1, dim2),
    (dx1, dx2),
    (adx1, adx2),
):
    print(np.sum([abs(a - b) for a, b in zip(temporal, batch)]))

TemporalAnalyzer: 0.088sec
Analyzer: 0.051sec
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0


## calcBollingerBands
* calcRMSError
* calcBollingerBands

In [15]:
N1 = 7
N_sigma = 14

alpha1 = 2. / (1. + N1)

# Incremental path: the EMA is extended by one value per step, then the RMS
# error and the bands are evaluated from the close prices and EMA so far.
st = time.perf_counter()
ema1 = []
sigma1 = []
m3band1 = []
m2band1 = []
m1band1 = []
p1band1 = []
p2band1 = []
p3band1 = []
for ii in range(length):
    ema1.append(tmpana.calcEMA(ema1, tohlc[:ii+1, -1], alpha1))
    sigma1.append(tmpana.calcRMSError(tohlc[:ii+1, -1], ema1, N_sigma))
    buff = tmpana.calcBollingerBands(tohlc[:ii+1, -1], ema1, N_sigma)
    # Element 3 of the result is not compared (the batch call below discards
    # the same position).
    m3band1.append(buff[0])
    m2band1.append(buff[1])
    m1band1.append(buff[2])
    p1band1.append(buff[4])
    p2band1.append(buff[5])
    p3band1.append(buff[6])
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path. Note that ema1 is rebound here to the Analyzer result.
st = time.perf_counter()
ema1 = ana.calcEMA(tohlc[:length, -1], alpha1)
sigma2 = ana.calcRMSError(tohlc[:length, -1], ema1, N_sigma)
m3band2, m2band2, m1band2, _, p1band2, p2band2, p3band2 = ana.calcBollingerBands(tohlc[:length, -1], ema1, N_sigma)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# One line per series: RMS error first, then the six bands in the order they
# were collected. Absolute differences so that opposite-sign errors cannot
# cancel out.
for temporal, batch in (
    (sigma1, sigma2),
    (m3band1, m3band2),
    (m2band1, m2band2),
    (m1band1, m1band2),
    (p1band1, p1band2),
    (p2band1, p2band2),
    (p3band1, p3band2),
):
    print(np.sum([abs(a - b) for a, b in zip(temporal, batch)]))

TemporalAnalyzer: 0.332sec
Analyzer: 0.318sec
0.0
0.0
0.0
0.0
0.0
0.0
0.0


## Momentum-related functions
* calcMomentum
* calcROC1
* calcROC2

In [16]:
N = 5

# Incremental path: one more row of `tohlc` per step for each indicator.
st = time.perf_counter()
M1 = []
roc11 = []
roc21 = []
for ii in range(length):
    M1.append(tmpana.calcMomentum(tohlc[:ii+1], N))
    roc11.append(tmpana.calcROC1(tohlc[:ii+1], N))
    roc21.append(tmpana.calcROC2(tohlc[:ii+1], N))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same rows.
st = time.perf_counter()
M2 = ana.calcMomentum(tohlc[:length], N)
roc12 = ana.calcROC1(tohlc[:length], N)
roc22 = ana.calcROC2(tohlc[:length], N)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(M1, M2)]))
print(np.sum([abs(a - b) for a, b in zip(roc11, roc12)]))
print(np.sum([abs(a - b) for a, b in zip(roc21, roc22)]))

TemporalAnalyzer: 0.041sec
Analyzer: 0.019sec
0.0
0.0
0.0


## RSI-related functions
* calcOCDownEMA
* calcOCUpEMA
* calcRSI

In [17]:
N = 5

alpha = 2. / (1. + N)

# Incremental path: both EMA series are extended by one value per step and
# the RSI is evaluated from the series collected so far.
st = time.perf_counter()
oc_down_ema1 = []
oc_up_ema1 = []
rsi1 = []
for ii in range(length):
    oc_down_ema1.append(tmpana.calcOCDownEMA(oc_down_ema1, tohlc[:ii+1], alpha))
    oc_up_ema1.append(tmpana.calcOCUpEMA(oc_up_ema1, tohlc[:ii+1], alpha))
    rsi1.append(tmpana.calcRSI(oc_down_ema1, oc_up_ema1))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same rows.
st = time.perf_counter()
oc_down_ema2 = ana.calcOCDownEMA(tohlc[:length], alpha)
oc_up_ema2 = ana.calcOCUpEMA(tohlc[:length], alpha)
rsi2 = ana.calcRSI(oc_down_ema2, oc_up_ema2)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(oc_down_ema1, oc_down_ema2)]))
print(np.sum([abs(a - b) for a, b in zip(oc_up_ema1, oc_up_ema2)]))
print(np.sum([abs(a - b) for a, b in zip(rsi1, rsi2)]))

TemporalAnalyzer: 0.051sec
Analyzer: 0.016sec
0.0
0.0
0.0


## calcWPercentR (Williams %R)

In [18]:
N = 14

# Incremental path: one more row of `tohlc` per step.
st = time.perf_counter()
wpr1 = []
for ii in range(length):
    wpr1.append(tmpana.calcWPercentR(tohlc[:ii+1], N))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call over the same rows.
st = time.perf_counter()
wpr2 = ana.calcWPercentR(tohlc[:length], N)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(wpr1, wpr2)]))

TemporalAnalyzer: 0.099sec
Analyzer: 0.063sec
0.0


## calcRatioOfDeviation

In [19]:
N = 5
alpha = 2. / (1. + N)

# Incremental path: the EMA is extended by one value per step and the ratio
# of deviation is evaluated from the EMA and close prices so far.
st = time.perf_counter()
ema1 = []
rod1 = []
for ii in range(length):
    ema1.append(tmpana.calcEMA(ema1, tohlc[:ii+1, -1], alpha))
    rod1.append(tmpana.calcRatioOfDeviation(ema1, tohlc[:ii+1, -1]))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same close prices.
st = time.perf_counter()
ema2 = ana.calcEMA(tohlc[:length, -1], alpha)
rod2 = ana.calcRatioOfDeviation(ema2, tohlc[:length, -1])
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(rod1, rod2)]))

TemporalAnalyzer: 0.030sec
Analyzer: 0.005sec
0.0


## calcUltimateOscillator

In [20]:
N1 = 7
N2 = 14
N3 = 28

# Incremental path: buying pressure and true range are extended by one value
# per step and the oscillator is evaluated from both series so far.
st = time.perf_counter()
bp1 = []
tr1 = []
uo1 = []
for ii in range(length):
    bp1.append(tmpana.calcBuyingPressure(tohlc[:ii+1]))
    tr1.append(tmpana.calcTrueRange(tohlc[:ii+1]))
    uo1.append(tmpana.calcUltimateOscillator(bp1, tr1, N1, N2, N3))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call per indicator over the same rows.
st = time.perf_counter()
bp2 = ana.calcBuyingPressure(tohlc[:length])
tr2 = ana.calcTrueRange(tohlc[:length])
uo2 = ana.calcUltimateOscillator(bp2, tr2, N1, N2, N3)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(uo1, uo2)]))

TemporalAnalyzer: 0.408sec
Analyzer: 0.400sec
0.0


## calcAwesomeOscillator

In [21]:
N1 = 5
N2 = 34

# Incremental path: one more row of `tohlc` per step.
st = time.perf_counter()
ao1 = []
for ii in range(length):
    ao1.append(tmpana.calcAwesomeOscillator(tohlc[:ii+1], N1, N2))
print("TemporalAnalyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Batch path: one call over the same rows.
st = time.perf_counter()
ao2 = ana.calcAwesomeOscillator(tohlc[:length], N1, N2)
print("Analyzer: {0:.3f}sec".format(time.perf_counter() - st))

# Absolute differences so that opposite-sign errors cannot cancel out.
print(np.sum([abs(a - b) for a, b in zip(ao1, ao2)]))

TemporalAnalyzer: 0.341sec
Analyzer: 0.145sec
0.0
