# Summary of Cycle Indicators
## Derivative Department, Egret Asset
### Jan. 6th - April. 5th
李心怡

In [3]:
import numpy as np

## I. Filters
First we define some filters that might be useful in understanding indicators. (Although some are just for completeness, and ware never used by any indicators here.)
### Simple Moving Average (4 period)

In [4]:
def sma4(series):
    """
    4 term simple moving average
    :param series: (np.array) price
    :return: (np.array) smoothed price
    """
    newseries = (series + 2 * np.roll(series, 1) + 2 * np.roll(series, 2)
                 + np.roll(series, 3)) / 6
    newseries[:3] = series[:3]
    return newseries

### Exponencial Moving Average
cutoff $(0, +\infty)$: for wave that has a period less than the cutoff period (i.e higher frequency part), its power is lessened to 1/2 or more.

In [5]:
def ema(series, cutoff):
    """
    exponential moving average
    alpha = 1/(lag+1)
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) filtered price
    """
    K = 1
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    for i in range(1, series.shape[0]):
        series[i] = alpha * series[i] \
                    + (1 - alpha) * series[i - 1]
    return series

### Regularized EMA
cutoff $(0, +\infty)$: for wave that has a period less than the cutoff period (i.e higher frequency part), its power is lessened to 1/2 or more.

In [6]:
def regularized_ema(series, cutoff):
    """
    add an additional penalty term to enhance filter effect while introducing no more lag
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) filtered price
    """
    K = 1
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    l = np.exp(0.16 / alpha)
    newseries = np.copy(series)
    for i in range(2, series.shape[0]):
        newseries[i] = alpha / (1 + l) * series[i] \
                       + (1 - alpha - 2 * l) / (1 + l) * newseries[i - 1] - l / (l + 1) * newseries[i - 2]
    return newseries

### Lowpass Filter 2 poles
cutoff $(0, +\infty)$: for wave that has a period less than the cutoff period (i.e higher frequency part), its power is lessened to 1/2 or more.

In [7]:
def lowpass2pole(series, cutoff):
    """
    2 pole low-pass filter
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) filtered price
    """
    K = 1.414
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    for i in range(2, series.shape[0]):
        series[i] = alpha ** 2 * series[i] \
                    + 2 * (1 - alpha) * series[i - 1] + (1 - alpha) ** 2 * series[i - 2]
    return series

### Decycler
= 1 - highpass

In [8]:
def decycler(series, cutoff):
    """
    subtract high frequency component from the original series to decycle
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) decycled series
    """
    K = 1
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    newseries = np.copy(series)
    for i in range(1, series.shape[0]):
        newseries[i] = alpha / 2 * (series[i] + series[i - 1]) \
                       + (1 - alpha) * newseries[i - 1]
    return newseries

### Highpass Filter
cutoff $(0, +\infty)$: for wave that has a period greater than the cutoff period (i.e lower frequency part), its power is lessened to 1/2 or more.

In [9]:
def highpass(series, cutoff):
    """
    (1 pole) high-pass filter
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) filtered price
    """
    K = 1
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    newseries = np.copy(series)
    for i in range(1, series.shape[0]):
        newseries[i] = (1 - alpha / 2) * series[i] - (1 - alpha / 2) * series[i - 1] \
                       + (1 - alpha) * newseries[i - 1]
    return newseries

### Highpass Filter 2 poles
cutoff $(0, +\infty)$: for wave that has a period greater than the cutoff period (i.e lower frequency part), its power is lessened to 1/2 or more.

In [10]:
def highpass2pole(series, cutoff):
    """
    2 pole high-pass filter
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) filtered price
    """
    K = 0.707
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    newseries = np.copy(series)
    for i in range(2, series.shape[0]):
        newseries[i] = (1 - alpha / 2) ** 2 * series[i] \
                       - 2 * (1 - alpha / 2) ** 2 * series[i - 1] \
                       + (1 - alpha / 2) ** 2 * series[i - 2] \
                       + 2 * (1 - alpha) * newseries[i - 1] - (1 - alpha) ** 2 * newseries[i - 2]
    return newseries

### Super Smoother 2 poles
cutoff $(0, +\infty)$: for wave that has a period less than the cutoff period (i.e higher frequency part), its power is lessened to 1/2 or more.

In [11]:
def supersmoother2pole(series, cutoff):
    """
    simplified 2 pole butterworth smoother
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) smoothed price
    """
    a = np.exp(-1.414 * np.pi / cutoff)
    b = 2 * a * np.cos(1.414 * np.pi / cutoff)
    newseries = np.copy(series)
    for i in range(2, series.shape[0]):
        newseries[i] = (1 + a ** 2 - b) / 2 * (series[i] + series[i - 1]) \
                       + b * newseries[i - 1] - a ** 2 * newseries[i - 2]
    return newseries

### Super Smoother 3 poles
cutoff $(0, +\infty)$: for wave that has a period less than the cutoff period (i.e higher frequency part), its power is lessened to 1/2 or more.

In [12]:
def supersmoother3pole(series, cutoff):
    """
    simplified 3 pole butterworth smoother
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the filter
    :return: (np.array) smoothed price
    """
    a = np.exp(-np.pi / cutoff)
    b = 2 * a * np.cos(1.738 * np.pi / cutoff)
    c = a ** 2
    newseries = np.copy(series)
    for i in range(3, series.shape[0]):
        newseries[i] = (1 - c ** 2 - b + b * c) * series[i] \
                       + (b + c) * newseries[i - 1] + (-c - b * c) * newseries[i - 2] + (c ** 2) * newseries[i - 3]
    return newseries

### Allpass filter

In [13]:
def allpass(series, alpha):
    """
    allpass filter (used in laguerre filter)
    :param series: (np.array) price
    :param alpha: (float) damping factor
    :return: (np.array) filtered price
    """
    newseries = np.copy(series)
    for i in range(1, series.shape[0]):
        newseries[i] = -alpha * series[i] + series[i - 1] \
                       + alpha * newseries[i - 1]
    return newseries

### Laguerre filter

In [14]:
def laguerre(series, gamma):
    """
    Laguerre filter
    :param series: (np.array) price
    :param gamma: (float) damping factor
    :return: (np.array) filtered price
    """
    l0 = ema(series, gamma)
    l1 = allpass(l0, gamma)
    l2 = allpass(l1, gamma)
    l3 = allpass(l2, gamma)
    return l0, l1, l2, l3

### Roofing Filter

In [15]:
def roofing(series, cutoff_hp, cutoff_lp):
    """
    roofing filter
    :param series: (np.array) price
    :param cutoff_hp: (np.array) cutoff period for highpass filter
    :param cutoff_lp: (np.array) cutoff period for lowpass filter
    :return:
    """
    hp = highpass2pole(series, cutoff_hp)
    newseries = supersmoother3pole(hp, cutoff_lp)
    return newseries

### Adaptive highpass filter 2 poles
This filter is written for adaptive methods. It takes a series of lag rather than one constant lag as parameter. This input lag series is computed with other methods which approximate the dominant cycle period of the data, such as Hilbert Transformation method for computing period.

In [16]:
def ad_highpass2pole(series, lag):
    """
    2 pole adaptive high-pass filter (variable cutoff period)
    :param series: (np.array) price
    :param lag: (np.array) lag of the filter
    :return: (np.array) filtered price
    """
    alpha = 1 / (1 + lag)
    newseries = np.copy(series)
    for i in range(2, series.shape[0]):
        newseries[i] = (1 - alpha[i] / 2) ** 2 * series[i] \
                       - 2 * (1 - alpha[i] / 2) ** 2 * series[i - 1] \
                       + (1 - alpha[i] / 2) ** 2 * series[i - 2] \
                       + 2 * (1 - alpha[i]) * newseries[i - 1] - (1 - alpha[i]) ** 2 * newseries[i - 2]
    return newseries

## II. Indicator Transformer
### Stochasticize

In [17]:
def stoch(series, period):
    """
    stochaticize the indicator
    :param series: (np.array) indicator or price
    :param period: (int) window length
    :return: (np.array) series of value within [0,1]
    """
    df = pd.Series(series)
    df_max = df.rolling(period, min_periods=1).max()
    df_min = df.rolling(period, min_periods=1).min()
    series = (df - df_min) / (df_max - df_min)
    return series.to_numpy()

### Fisher Transformer
normalize indicators so that it satisfies statistic inference of normally distributed data

period $(0, +\infty)$: window length for applying fisher transformation and stochasticize

stoch_time $(0, +\infty)$: number of time applying stochastic transformation

In [18]:
def fisher(series, period, stoch_time):
    """
    fisher transformer
    :param series: (np.array) indicator or price
    :param period: (int) window length
    :param stoch_time: (int) number of time applying stochastic transformation
    :return: (np.array) normalized series satisfying statistic inference of normally distributed data
    """
    # stochaticize
    for i in range(stoch_time):
        series = stoch(series, period)
    # transform data to [-0.9999,0.9999]
    series = 2 * (series - 0.5)
    for i in range(series.shape[0]):
        series[i] = max(-0.9999, min(0.9999, series[i]))
    # apply fisher transformation
    series = np.log((1 + series) / (1 - series)) / 2
    return series

## III. Method for computing period
According to my observation, this approximation is not a very good one and is rather unstable. So the indicators that based on it -- Adaptive Momentum, Adaptive CCI etc. -- are also unreliable. 

In [19]:
def hilbert(series):
    """
    Hilbert transformation
    :param series: (np.array) price
    :return: (np.array) InPhase and Quadrature term
    """
    Q = 0.0962 * series + 0.5796 * np.roll(series, 2) \
        - 0.5796 * np.roll(series, 4) - 0.0962 * np.roll(series, 6)
    Q[0:6] = 0
    I = np.roll(series, 3)
    I[0:3] = 0
    return Q, I


def compute_period(series, cutoff):
    """
    use hilbert transformation to compute period for adaptive methods
    :param series: (np.array) price
    :param cutoff: (np.array) cutoff period for highpass filter
    :return: (np.array) period and cycle
    """
    smooth = sma4(series)
    cycle = highpass2pole(smooth, cutoff)
    for i in range(2, 7):
        cycle[i] = (series[i] - 2 * series[i - 1] + series[i - 2]) / 4
    delta_phase = np.zeros_like(series)
    inst_period = np.zeros_like(series)
    period = np.zeros_like(series)
    Q, I = hilbert(cycle)
    for i in range(6, series.shape[0]):
        Q[i] *= 0.5 + 0.08 * inst_period[i - 1]
        if Q[i] != 0 and Q[i - 1] != 0:
            delta_phase[i] = (I[i] / Q[i] - I[i - 1] / Q[i - 1]) / (1 + I[i] * I[i - 1] / (Q[i] * Q[i - 1]))
            delta_phase[i] = max(0.1, min(1.1, delta_phase[i]))
        median_delta = np.median(delta_phase[i - 4:i + 1])
        if median_delta == 0:
            DC = 15
        else:
            DC = 6.29318 / median_delta + 0.5
        inst_period[i] = 0.33 * DC + 0.67 * inst_period[i - 1]
        period[i] = 0.15 * inst_period[i] + 0.85 * period[i - 1]
    return period, cycle

## IV. Trend Indicator
### Instantaneous Trendline

1) **method**
    
Use a highpass (2 poles) filter to smooth the price series.

Note: The first 7 price is computed separately by a SMA4 to reduce the influence of lack of samples at the beginning of the series, however in our case it is rather unnecessary because ① we have a initial period for contract to avoid this influence and ② the number 7 needs to be tailored for our situation as the book uses daily price input.
    
2) **parameter range**
    
* cutoff $(0, +\infty)$: cutoff period of hp
    
3) **trading strategy**
    
$trigger = 2 * trend - lag\_trend$

Buy when trigger is above trend, short otherwise   

In [20]:
def i_trend(series, cutoff):
    """
    instantaneous trendline
    :param series: (np.array) price
    :param cutoff: (float) cutoff period of the hp
    :return: (np.array) trend and its trigger
    """
    # compute inst trend
    K = 0.707
    alpha = 1 + (np.sin(2 * np.pi * K / cutoff) - 1) / np.cos(2 * np.pi * K / cutoff)
    it = np.copy(series)
    for i in range(2, 7):
        it[i] = (series[i] + 2 * series[i - 1] + series[i - 2]) / 4
    for i in range(7, series.shape[0]):
        it[i] = (alpha - alpha ** 2 / 4) * series[i] \
                + alpha ** 2 / 2 * series[i - 1] \
                - (alpha - alpha ** 2 * 3 / 4) * series[i - 2] \
                + 2 * (1 - alpha) * it[i - 1] - (1 - alpha) ** 2 * it[i - 2]

    # compute lead 2 trigger & signal
    lag2 = np.roll(it, 20)
    lag2[:20] = it[:20]
    trigger = 2 * it - lag2
    return it, trigger


### Smoothed Adaptive Monmentum Indicator

1) **method**

compare the price in the current cycle with that in the previous cycle (same phase) to indicate an uptrend or downtrend

2) **parameter range**
    
* cutoff_period $(0, +\infty)$: the cutoff period used to compute the period using Hilbert Transformation

* cutoff_signal $(0, +\infty)$: the cutoff period used to smooth the signal
    
3) **trading strategy**
    
Buy when the smoothed signal is above 0 (in a uptrend), short otherwise. 

In [21]:
def ad_momentum(series, cutoff_period, cutoff_signal):
    """
    smoothed adaptive momentum indicator
    compare the price in the current cycle with that in the previous cycle (same phase)
    to indicate an uptrend or downtrend
    :param series: (np.array) price
    :param cutoff_period: (float) the cutoff period used to compute the period using Hilbert Transformation
    :param cutoff_signal: (float) the cutoff period used to smooth the signal
    :return: (np.array) momentum
    """
    period, _ = compute_period(series, cutoff_period)
    period = period.astype(np.int)
    momen = np.zeros_like(series)
    for i in range(series.shape[0]):
        if (i - period[i]) >= 0:
            momen[i] = series[i] - series[i - period[i]]
    momen = supersmoother3pole(momen, cutoff_signal)
    return momen

## V. Oscillator
### Decycler Oscillator

1) **method**
    
take the difference of 2 decyclers (highpass filters) with different cutoff to get the cycle
    
2) **parameter range**
    
* cutoff1 $(0, +\infty)$: cutoff period of the first highpass filter

* times $(1, +\infty)$: $\frac{\text{cutoff period of the second highpass filter}}{\text{cutoff period of the first highpass filter}}$

3) **trading strategy**
    
Buy when above 0 (in a uptrend), short otherwise.    

In [22]:
def decycler_oscillator(series, cutoff1, times):
    """
    decycler oscillator
    take the difference of 2 decyclers with different cutoff
    :param series: (np.array) price
    :param cutoff1: (float) the smaller cutoff period
    :param times: (float, >1) larger cutoff / smaller cutoff
    :return: (np.array) indicator
    """
    cutoff2 = cutoff1 * times
    hp1 = highpass(series, cutoff1)
    hp2 = highpass(series, cutoff2)
    delta_hp = hp2 - hp1
    # >0: uptrend, <0: downtrend
    return delta_hp

### Bandpass Filter

1) **method**
    
* use a highpass filter to avoid spectral dilation

* pass a bandpass filter

* use "fast attack-slow decay" approach to normalize the indicator to $[-1, 1]$
    
2) **parameter range**
    
* cycle $(0, +\infty)$: center of the bandpass period

* bandwidth $(0, 2)$: $\frac{\text{bandwidth (in period)}}{\text{cycle}}$

3) **trading strategy**
    
Trigger line is created by pass the bandpass result through another highpass filter.

Buy when the trigger crosses over the bandpass result, short when crosses under.

In [23]:
def bandpass(series, cycle, bandwidth):
    """
    bandpass filter
    :param series: (np.array) price
    :param cycle: (float) cycle period
    :param bandwidth: (float, >0 <2) length between left and right cutoffs / cycle period
    :return: (np.array) indicator
    """
    # pass a HP to avoid spectral dilation of BP
    hp = highpass(series, 4 * cycle / bandwidth)
    # bandpass filter
    lmd = np.cos(2 * np.pi / cycle)
    gamma = np.cos(2 * np.pi * bandwidth / cycle)
    sigma = 1 / gamma - np.sqrt(1 / gamma ** 2 - 1)
    bp = np.copy(hp)
    for i in range(2, series.shape[0]):
        bp[i] = (1 - sigma) / 2 * hp[i] - (1 - sigma) / 2 * hp[i - 2] \
                + lmd * (1 + sigma) * bp[i - 1] - sigma * bp[i - 2]
    # fast attack-slow decay AGC
    K = 0.991
    peak = np.copy(bp)
    for i in range(series.shape[0]):
        if i > 0:
            peak[i] = peak[i - 1] * K
        if abs(bp[i]) > peak[i]:
            peak[i] = abs(bp[i])
    bp_normalized = bp / peak
    bp_normalized[np.isnan(bp_normalized)] = 0
    # trigger(lead) & signal
    trigger = highpass(bp_normalized, cycle / bandwidth / 1.5)
    return bp, bp_normalized, trigger

### Cyber Cycle Index

1) **method**
    
* smooth the price with SMA4

* pass a highpass (2 poles) filter

* smooth the cycle with EMA
    
2) **parameter range**
    
* cutoff1 $(0, +\infty)$: cutoff period for highpass filter

* cutoff2 $(0, +\infty)$: cutoff period for EMA filter

* fperiod, stoch_time: same as fisher transformation parameters

3) **trading strategy**
    
Buy when signal crosses under lag1, sell when signal crosses over lag1.

Also, it requires a 'stop loss' strategy.

In [24]:
def cci(series, cutoff1, cutoff2, fperiod=None, stoch_time=None):
    """
    CCI - cyber cycle index
    delay is less than half a cycle: buy when signal cross under lag1, sell when signal cross over lag1
    need a 'stop loss' strategy, close out when profit<0 and bars since entry > 8(period)
    :param series: (np.array) price
    :param cutoff1: (float) cutoff period for hp
    :param cutoff2: (float) cutoff period for ema
    :param fperiod, stoch_time: (tuple: ((int)period, (int)stoch_time)) fisher transformation parameters
    :return: (np.array) cycle
    """
    # compute the cycle
    smooth = sma4(series)
    cycle = highpass2pole(smooth, cutoff1)
    for t in range(2, 7):
        cycle[t] = (series[t] - 2 * series[t - 1] + series[t - 2]) / 4
    signal = ema(cycle, cutoff2)
    # apply fisher transformation
    if fperiod != None:
        signal = fisher(signal, fperiod, stoch_time)
    return signal

### center of gravity
1) **method**
    
View the price as weight to compute the center of gravity of the filter. If the center of gravity is closer to today relative to today minus the window length, it suggests that price is higher / moving upward recently.
    
2) **parameter range**
    
* length $(0, +\infty)$: window length

* fperiod, stoch_time: same as fisher transformation parameters

3) **trading strategy**
    
Buy when signal crosses over lag1, sell when signal crosses under lag1.

In [25]:
def cg(series, length, fperiod=None, stoch_time=None):
    """
    CG - center of gravity
    view the price as weight to compute the center of gravity of the filter
    :param series: (np.array) price
    :param length: (int) window length
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) cg
    """
    # compute cg
    num = np.zeros_like(series)
    denom = np.ones_like(series)
    for i in range(length - 1, series.shape[0]):
        num[i] = np.sum((np.array(range(length)) + 1) * series[i - length + 1:i + 1][::-1])
        denom[i] = np.sum(series[i - length + 1:i + 1])
    cg = -num / denom + (1 + length) / 2
    # apply fisher transformation
    if fperiod:
        cg = fisher(cg, fperiod, stoch_time)
    return cg

### relative vigor index
1) **method**
    
$\text{rvi} = \text{"smoothed"} \ \frac{\text{c - o}}{\text{h - l}}$
    
2) **parameter range**
    
* length $(0, +\infty)$: window length

* fperiod, stoch_time: same as fisher transformation parameters

3) **trading strategy**
    
Buy when signal crosses over lag1, sell when signal crosses under lag1.

In [26]:
def rvi(o, h, l, c, length, fperiod=None, stoch_time=None):
    """
    RVI - relative vigor index
    :param o: (np.array) open
    :param h: (np.array) high
    :param l: (np.array) low
    :param c: (np.array) close
    :param length: length to sum the num & denom
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) rvi
    """
    co = c - o
    hl = h - l
    num = sma4(co)
    denom = sma4(hl)
    rvi = np.zeros_like(o)
    for i in range(2 + length, o.shape[0]):
        rvi[i] = np.sum(num[i - length + 1:i + 1]) / np.sum(denom[i - length + 1:i + 1])
    # apply fisher transformation
    if fperiod:
        rvi = fisher(rvi, fperiod, stoch_time)
    return rvi

### relative strength index
The famous RSI. Written here to show its similarity to the 3 cycle indicators above.

In [27]:
def rsi(series, length, fperiod=None, stoch_time=None):
    """
    Relative Strength Index
    :param series: (np.array) price
    :param length: length to sum the num & denom
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) rsi
    """
    rsi = ta.RSI(series, length)
    # apply fisher transformation
    if fperiod:
        rsi = fisher(rsi, fperiod, stoch_time)
    return rsi

### Laguerre RSI
1) **method**

Not fully understood :(
    
2) **parameter range**

* gamma $(0, 1)$: allpass filter parameter

* period, stoch_time: same as fisher transformation parameters

3) **trading strategy**
    
Same as RSI.

In [28]:
def laguerre_rsi(series, gamma, fperiod=None, stoch_time=None):
    """
    Laguerre RSI
    :param series: (np.array) price
    :param gamma: (float) damping factor
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) rsi
    """
    l0, l1, l2, l3 = laguerre(series, gamma)
    rsi = np.zeros_like(series)
    for i in range(series.shape[0]):
        cu = 0
        cd = 0
        if l1[i] > l0[i]:
            cu += l1[i] - l0[i]
        else:
            cd -= l1[i] - l0[i]
        if l2[i] > l1[i]:
            cu += l2[i] - l1[i]
        else:
            cd -= l2[i] - l1[i]
        if l3[i] > l2[i]:
            cu += l3[i] - l2[i]
        else:
            cd -= l3[i] - l2[i]
        rsi[i] = cu / (cu + cd)
    # apply fisher transformation
    if fperiod:
        rsi = fisher(rsi, fperiod, stoch_time)
    return rsi

Note: The 3 indicators below requires large amounts of computation when the input max_period(max_lag) and min_period(min_lag) are large, as it needs to compute for every period between the max and the min. It can be optimized, though, if we let it memotize the situation it has computed. 

Within the 3 methods, this first one is the prefered one as it incurs no spectral dilation.

### auto-correlation periodogram indicator

1) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* average_len $(0, +\infty)$

* max_lag, min_lag $0<\text{min_lag}<\text{max_lag}< +\infty)$

2) **trading strategy**

Buy when signal crosses over lag1, sell when signal crosses under lag1.

In [29]:
def corr_periodogram(series, hp_period, lp_period, average_len, max_lag, min_lag):
    """
    auto-correlation periodogram indicator: a preferred method to compute the dominant cycle
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param average_len: # period to compute auto-correlation
    :param max_lag: max lag of period to compute auto_correlation
    :param min_lag: min lag of period to compute auto_correlation
    :return: (np.array) dominant cycle
    """
    num = np.zeros_like(series)
    denom = np.zeros_like(series)
    for lag in range(min_lag, max_lag):

        rho = corr(series, hp_period, lp_period, average_len, lag)
        cos_part = np.zeros_like(series)
        sin_part = np.zeros_like(series)
        for i in range(max_lag):
            cos_part += np.roll(rho, i) * np.cos(2 * np.pi * i / max_lag)
            sin_part += np.roll(rho, i) * np.sin(2 * np.pi * i / max_lag)
        sqsum = cos_part ** 2 + sin_part ** 2

        for i in range(1, sqsum.shape[0]):
            sqsum[i] = 0.2 * sqsum[i] + 0.8 * sqsum[i - 1]

        K = 0.991
        peak = np.copy(sqsum)
        for i in range(sqsum.shape[0]):
            if i > 0:
                peak[i] = peak[i - 1] * K
            if abs(sqsum[i]) > peak[i]:
                peak[i] = abs(sqsum[i])
        sqsum = sqsum / peak
        sqsum[np.isnan(sqsum)] = 0

        num += (sqsum > 0.5) * sqsum * lag
        denom += (sqsum > 0.5) * sqsum

    dc = num / denom
    dc[np.isnan(dc)] = 0
    return dc

### center of gravity indicator based on discrete Fourier tansformation
1) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* max_period, min_period $0<\text{min_period}<\text{max_period}< +\infty)$

2) **trading strategy**

Buy when signal crosses over lag1, sell when signal crosses under lag1.

In [30]:
def dft(series, hp_period, lp_period, dft_period):
    """
    discrete Fourier Transformation
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param dft_period: period to compute Fourier transformation
    :return: (np.array) normalized power
    """
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)

    cos_part = np.zeros_like(series)
    sin_part = np.zeros_like(series)
    for i in range(dft_period):
        cos_part += np.roll(filt, i) * np.cos(2 * np.pi * i / dft_period) / dft_period
        sin_part += np.roll(filt, i) * np.sin(2 * np.pi * i / dft_period) / dft_period
    pwr = cos_part ** 2 + sin_part ** 2

    K = 0.991
    peak = np.copy(pwr)
    for i in range(series.shape[0]):
        if i > 0:
            peak[i] = peak[i - 1] * K
        if abs(pwr[i]) > peak[i]:
            peak[i] = abs(pwr[i])
    pwr = pwr / peak
    pwr[np.isnan(pwr)] = 0
    return pwr


def dft_cg(series, hp_period, lp_period, max_period, min_period):
    """
    center of gravity indicator based on discrete Fourier tansformation: indicates dominant cycle
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param max_period: max period to compute Fourier transformation
    :param min_period: min period to compute Fourier transformation
    :return: (np.array) dominant cycle
    """
    num = np.zeros_like(series)
    denom = np.zeros_like(series)
    for period in range(min_period, max_period):
        pwr = dft(series, hp_period, lp_period, period)
        num += (pwr>0.5)*pwr*period
        denom += (pwr>0.5)*pwr
    dc = num/denom
    dc[np.isnan(dc)] = 0
    return dc

### comb filter
1) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* max_period, min_period $0<\text{min_period}<\text{max_period}< +\infty)$

* bandwidth $(0, 2)$

2) **trading strategy**

Buy when signal crosses over lag1, sell when signal crosses under lag1.

In [31]:
def comb(series, hp_period, lp_period, max_period, min_period, bandwidth):
    """
    comb filter spectral estimate: compute the dominant cycle
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param max_period: max period to compute bandpass
    :param min_period: min period to compute bandpass
    :param bandwidth: bandwidth for bandpass filter
    :return: (np.array) dominant cycle
    """
    num = np.zeros_like(series)
    denom = np.zeros_like(series)
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)
    for period in range(min_period, max_period):

        bp, _, _ = bandpass(filt, period, bandwidth)
        pwr = np.zeros_like(bp)
        for i in range(period):
            pwr += np.roll(bp, i) ** 2 / period ** 2

        K = 0.991
        peak = np.copy(pwr)
        for i in range(pwr.shape[0]):
            if i > 0:
                peak[i] = peak[i - 1] * K
            if abs(pwr[i]) > peak[i]:
                peak[i] = abs(pwr[i])
        pwr = pwr / peak
        pwr[np.isnan(pwr)] = 0

        num += (pwr > 0.5) * pwr * period
        denom += (pwr > 0.5) * pwr

    dc = num / denom
    dc[np.isnan(dc)] = 0
    return dc

## VI. Predictive Method
These indicators has the ability to actually predict the price in the future.
### Hilbert Indicator
1) **method**

Use the Hilbert Transformation to create the real line and imaginary line.
    
2) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* smooth_period $(0, +\infty)$

3) **trading strategy**

Buy when the imaginary line crosses above the real line, short otherwise.

In [32]:
def hilbert_indicator(series, hp_period, lp_period, smooth_period):
    """
    hilbert transformation indicator: the real line moves as the original price, while the imaginary line as predictor
    :param series: (np.array) price
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param smooth_period: lowpass period to smooth the imaginary line
    :return: (np.array) the real and imag line
    """
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)

    K = 0.991
    peak = np.copy(filt)
    for i in range(series.shape[0]):
        if i > 0:
            peak[i] = peak[i - 1] * K
        if abs(filt[i]) > peak[i]:
            peak[i] = abs(filt[i])
    real = filt / peak
    real[np.isnan(real)] = 0

    quad = real - np.roll(real, 1)
    quad[0] = 0
    K = 0.991
    peak = np.copy(quad)
    for i in range(series.shape[0]):
        if i > 0:
            peak[i] = peak[i - 1] * K
        if abs(quad[i]) > peak[i]:
            peak[i] = abs(quad[i])
    quad = quad / peak
    quad[np.isnan(quad)] = 0

    imag = supersmoother2pole(quad, smooth_period)

    return real, imag

### Sinewave Indicator
1) **method**

Predict the price as a sinewave whose period is changing and is computed using Hilbert Transformation
    
2) **parameter range**

* cutoff_period $(0, +\infty)$: cutoff period for Hilbert Transformation

* lead $(0, 360)$: lead angle for the trigger

3) **trading strategy**

Trigger is the predicted sinewave plus an angle.

Buy when trigger crosses over sinewave, short otherwise.
    

In [33]:
def sinewave(series, cutoff_period, lead=0.25 * np.pi):
    """
    sinewave indicator
    :param series: (np.array) price
    :param cutoff_period: (float) the cutoff period used to compute the period using Hilbert Transformation
    :param lead: (float) lead angle, in radians
    :return: (np.array) sinewave and leadsine
    """
    # compute period
    period, cycle = compute_period(series, cutoff_period)
    dcperiod = period.astype(np.int)
    # compute dominant cycle phase
    real = np.zeros_like(series)
    imag = np.zeros_like(series)
    dcphase = np.zeros_like(series)
    for i in range(series.shape[0]):
        for j in range(dcperiod[i]):
            real[i] += np.sin(2 * np.pi * j / dcperiod[i]) * cycle[i]
            imag[i] += np.cos(2 * np.pi * j / dcperiod[i]) * cycle[i]
        if abs(imag[i] > 0.001):
            dcphase[i] = np.arctan(real[i] / imag[i])
        else:
            dcphase[i] = 0.5 * np.pi * np.sign(real[i])
        dcphase[i] += 0.5 * np.pi
        if imag[i] < 0:
            dcphase[i] += np.pi
        if dcphase[i] > 1.75 * np.pi:
            dcphase[i] -= 2 * np.pi
    # compute sinewave
    sinewave = np.sin(dcphase)
    leadsine = np.sin(dcphase + lead)
    return sinewave, leadsine

### Even Better Sinewave Indicator
1) **method**

* Pass the data in a roofing filter

* smoothed $\frac{\text(wave)}{\text{power}}$
    
2) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

3) **trading strategy**

Buy when the normalized wave exceeds some upper bound, short when below some lower bound.
    

In [34]:
def better_sinewave(series, hp_period, lp_period):
    """
    the even better sinewave indicator: profits better when the market is in a trend mode
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :return: (np.array) sinewave
    """
    HP = highpass(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)
    wave = (filt + np.roll(filt,1) + np.roll(filt,2))/3
    wave[0] = filt[0]
    wave[1] = (filt[0]+filt[1])/2
    pwr = (filt**2 + np.roll(filt,1)**2 + np.roll(filt,2)**2)/3
    pwr[0] = filt[0]**2
    pwr[1] = (filt[0]**2+filt[1]**2)/2
    wave = wave/np.sqrt(pwr)
    wave[np.isnan(wave)] = 0
    return wave

## VII. Adaptive Methods
The word adaptive means that in these indicators, period is viewed as an endogenous variable rather than a constant input parameter. Theoretically, if the indicator with an constant period works well, and if the period is approximated well, the adaptive method should work even better as it allows the period to change.

Unfortunately in our case, as far as I know, neither of the "if" is satisfied.

In [35]:
def ad_cci(series, cutoff_period, cutoff_signal, fperiod=None, stoch_time=None):
    """
    adaptive cyber cycle
    :param series: (np.array) price
    :param cutoff_period: (float) the cutoff period used to compute the period using Hilbert Transformation
    :param cutoff_signal: (float) the cutoff period used to smooth the signal
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) cycle
    """
    # compute period
    period, _ = compute_period(series, cutoff_period)
    # compute the cycle
    smooth = sma4(series)
    cycle = ad_highpass2pole(smooth, period)
    for t in range(2, 7):
        cycle[t] = (series[t] - 2 * series[t - 1] + series[t - 2]) / 4
    signal = ema(cycle, cutoff_signal)
    # apply fisher transformation
    if fperiod:
        signal = fisher(signal, fperiod, stoch_time)
    return signal

In [36]:
def ad_cg(series, cutoff_period, fperiod=None, stoch_time=None):
    """

    :param series: (np.array) price
    :param cutoff_period: (float) the cutoff period used to compute the period using Hilbert Transformation
    :param period: (int) fisher transformation parameter
    :param stoch_time: (int) fisher transformation parameter
    :return: (np.array) cg
    """
    # compute period
    period, _ = compute_period(series, cutoff_period)
    length = (period / 2).astype(np.int)
    # compute cg
    num = np.zeros_like(series)
    denom = np.ones_like(series)
    for i in range(length[1] - 1, series.shape[0]):
        num[i] = np.sum((np.array(range(length[i])) + 1) * series[i - length[i] + 1:i + 1][::-1])
        denom[i] = np.sum(series[i - length[i] + 1:i + 1])
    cg = -num / denom + (1 + length) / 2
    # apply fisher transformation
    if fperiod:
        cg = fisher(cg, fperiod, stoch_time)
    return cg

## VIII. Turning Point Indicator
### Auto-correlation Indicator
1) **method**

* If the correlation between ①[ now - average_len , now ] and ②[ now - average_len - lag , now - lag ] is small, it indicates a turning point
    
2) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* average_len $(0, +\infty)$

* lag $(0, +\infty)$


In [37]:
def corr(series, hp_period, lp_period, average_len, lag):
    """
    auto-correlation indicator: indicates reversal when it's near -1
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param average_len: # period to compute auto-correlation
    :param lag: lag of period to compute auto_correlation
    :return: (np.array) correlation
    """
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)
    corr = np.zeros_like(series)
    for i in range(average_len + lag, series.shape[0] + 1):
        s1 = filt[i - average_len:i]
        s2 = filt[i - average_len - lag:i - lag]
        corr[i - 1] = np.corrcoef(s1, s2)[0, 1]
    return corr

### Auto-correlation Reversal Indicator
1) **method**

* Based on the correlation indicator, it takes the differece of 2 correlations. If the sum of change amounts to a certain threshold, it would signal a turning point.
    
2) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* average_len $(0, +\infty)$

* lag $(0, +\infty)$

* thresh $(0, 2*lag)$



In [38]:
def corr_reversal(series, hp_period, lp_period, average_len, lag, thresh):
    """
    auto-correlation reversal indicator indicates the reversals of the price
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param average_len: # period to compute auto-correlation
    :param lag: lag of period to compute auto_correlation
    :param: thresh: if the num of reversal-indicator delta happens more than thresh times in the lag period,
                    it indicates a overall reversal
    :return: (np.array) reversal indicator based on the threshold as well as the original sum of delta
    """
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)
    corr = np.zeros_like(series)
    for i in range(average_len + lag, series.shape[0] + 1):
        s1 = filt[i - average_len:i]
        s2 = filt[i - average_len - lag:i - lag]
        corr[i - 1] = np.corrcoef(s1, s2)[0, 1]
    corr_1 = corr > 0
    corr_2 = np.roll(corr_1, 1)
    delta = np.abs(corr_1 - corr_2) / 2
    delta[0] = 0
    sumdelta = np.zeros_like(delta)
    for i in range(lag):
        sumdelta += np.roll(delta, i)
    reversal = sumdelta >= thresh
    return reversal, sumdelta

### Concolution Indicator
1) **method**

* If the convolution of [ now - lookback_period , now ] is larger, it indicates a turning point.
    
2) **parameter range**

* hp_period $(0, +\infty)$

* lp_period $(0, +\infty)$

* lookback_period $(0, +\infty)$


In [39]:
def convolution(series, hp_period, lp_period, lookback_period):
    """
    convolution indicator: use convolution within a lookback period to determine whether a turning point has occurred
    :param series: (np.array) price
    :param hp_period: highpass period to remove the trend from the original price
    :param lp_period: lowpass period to smooth the original price
    :param lookback_period: lookback period to compute convolution
    :return: (np.array) normalized convolution [0,1] and original convolution [-1,1]
    """
    HP = highpass2pole(series, hp_period)
    filt = supersmoother2pole(HP, lp_period)

    corr = np.zeros_like(series)
    for i in range(lookback_period, series.shape[0] + 1):
        lookback_ = filt[i - lookback_period:i]
        corr[i - 1] = np.corrcoef(lookback_, np.flip(lookback_))[0, 1]
    conv = (1 + (np.exp(3 * corr) - 1) / (np.exp(3 * corr) + 1)) / 2

    return conv, corr

## IX. Lead Indicator
1) **method**

* This indicator generates "lead" in the indicators to somehow achieve the effect similiar to prediction.
    
2) **parameter range**

* alpha1 $(0, 1)$

* alpha2 $(\text{alpha1}, 1)$

In [40]:
def lead(series, alpha1, alpha2):
    """
    lead indicator
    :param series: (np.array) price
    :param alpha1: (float) alpha to generate lead
    :param alpha2: (float) alpha to smooth while offsetting some lead
    :return: (np.array) netlead
    """
    assert alpha1 < alpha2
    lead = np.zeros_like(series)
    netlead = np.zeros_like(series)
    for i in range(1, series.shape[0]):
        lead[i] = 2 * series[i] + (alpha1 - 2) * series[i - 1] \
                  + (1 - alpha1) * lead[i - 1]
        netlead[i] = alpha2 * lead[i] + (1 - alpha2) * netlead[i - 1]
    return netlead

In [None]:
! jt -t oceans16
! jupyter notebook

[32m[I 10:35:54.313 NotebookApp][m The port 8888 is already in use, trying another port.
[32m[I 10:35:54.315 NotebookApp][m The port 8889 is already in use, trying another port.
[32m[I 10:35:54.316 NotebookApp][m The port 8890 is already in use, trying another port.
[32m[I 10:35:54.482 NotebookApp][m JupyterLab extension loaded from /Users/ayi/opt/anaconda3/lib/python3.7/site-packages/jupyterlab
[32m[I 10:35:54.482 NotebookApp][m JupyterLab application directory is /Users/ayi/opt/anaconda3/share/jupyter/lab
[32m[I 10:35:54.485 NotebookApp][m Serving notebooks from local directory: /Users/ayi/Desktop/blzg/working
[32m[I 10:35:54.485 NotebookApp][m The Jupyter Notebook is running at:
[32m[I 10:35:54.485 NotebookApp][m http://localhost:8891/?token=1e7165ef31aeec48ac118ef387b1db13d69926283521ff19
[32m[I 10:35:54.486 NotebookApp][m  or http://127.0.0.1:8891/?token=1e7165ef31aeec48ac118ef387b1db13d69926283521ff19
[32m[I 10:35:54.486 NotebookApp][m Use Control-C to stop th

In [2]:
from scipy.stats import norm
norm.cdf(2.5)

0.9937903346742238