# CoIntegration

A Simple CoIntegration is a stationary relationship between two non-stationary processes, one is the leader and the other is the follower.

![CoInt](CoInt.png)

I copied some codes from [BennyLP's Stationarity](https://github.com/bennylp/Study/blob/master/Introduction%20to%20Time%20Series%20Analysis%20with%20Python/Stationarity.ipynb)

### References
- https://en.wikipedia.org/wiki/Cointegration

In [1]:
import warnings

import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import coint, adfuller

N = 200
WSCALE = 0.001
MAXLAGS = 100

ADFULLER_CRIT = 0.01
COINT_THRESHOLD = 0.05

## The Non-Stationary is a Line

In [2]:
def generateCoIntegration( nonStationary, wscale, multiplier ):
    """Generate a new signal which cointegrates with @nonStationary.
    """
    white_noise = np.random.normal(scale=wscale, size=len( nonStationary ))
    return white_noise + nonStationary * multiplier, white_noise


def negativeTests( test_count, generateNonStationary1, generateNonStationary2, **nonStationaryKwargs ):
    """We expect @generateNonStationary1 and @generateNonStationary2 are both stationary and not being cointegrated.
    """
    test_coint = 0
    stationary_count = 0
    negative = 0
    for _ in range( 0, test_count):
        nonStationary_one = generateNonStationary1( wscale=2, **nonStationaryKwargs )
        nonStationary_two = generateNonStationary2( wscale=3, **nonStationaryKwargs )

        do_coint = True
        adres = adfuller( nonStationary_one )
        if adres[1] < ADFULLER_CRIT: 
            stationary_count += 1
            do_coint = False
        adres = adfuller( nonStationary_two )
        if adres[1] < ADFULLER_CRIT: 
            stationary_count += 1
            do_coint = False

        if do_coint:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                
                t, pval, crits = coint( nonStationary_one, nonStationary_two )
                if pval >= COINT_THRESHOLD:
                    negative += 1
                test_coint += 1
                
    # ----- result -----
    print( "Stationary Count: ", stationary_count )
    print( "Negative Rate: %d/%d (%.2f %%)" % ( negative, test_coint, negative * 100. / test_coint ) )
    
    return negative, test_coint, stationary_count

            
def positiveTests( test_count, generateNonStationary, **nonStationaryKwargs  ):
    """Generate a new signal which cointegrates with signal from @generateNonStationary. 
    We expect both signals indeed cointegrate.
    """
    test_coint = 0
    stationary_count = 0
    positive = 0

    for _ in range( 0, test_count ):
        nonStationary_one = generateNonStationary( wscale=2, **nonStationaryKwargs )
        nonStationary_two, white_noise = generateCoIntegration( nonStationary_one, wscale=5, multiplier=2 )

        do_coint = True
        adres = adfuller( nonStationary_one )
        if adres[1] < ADFULLER_CRIT: 
            stationary_count += 1
            do_coint = False
        adres = adfuller( nonStationary_two )
        if adres[1] < ADFULLER_CRIT: 
            stationary_count += 1
            do_coint = False

        if do_coint:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                
                t, pval, crits = coint( nonStationary_one, nonStationary_two )
                if pval < COINT_THRESHOLD:
                    positive += 1
                test_coint += 1
    
    # ----- result -----
    print( "Stationary Count: ", stationary_count )
    print( "Positive Rate: %d/%d (%.2f %%)" % ( positive, test_coint, positive * 100. / test_coint) )
    
    return positive, test_coint, stationary_count


def generateLine(n=N, slope=1, wscale=WSCALE):
    w = np.random.normal(scale=wscale, size=n)
    if slope:
        return np.arange(0, n*slope, slope) + w
    else:
        return w

### Negative Tests

In [3]:
test_count = 100
negative, test_coint, stationary_count = negativeTests( test_count, generateLine, generateLine )

Stationary Count:  0
Negative Rate: 0/100 (0.00 %)


### Positive Tests

In [4]:
test_count = 100
positive, test_coint, stationary_count = positiveTests( test_count, generateLine )

Stationary Count:  0
Positive Rate: 100/100 (100.00 %)


## The Non-Stationary is a Random Walk

In [5]:
def generateRandomWalk(n=N, wscale=1):
    x = w = np.random.normal(scale=wscale, size=n)
    for t in range(n):
        x[t] = x[t-1] + w[t]
    return x

### Negative Tests

In [6]:
test_count = 100
negative, test_coint, stationary_count = negativeTests( test_count, generateRandomWalk, generateRandomWalk )

Stationary Count:  2
Negative Rate: 94/98 (95.92 %)


### Positive Tests

In [7]:
test_count = 100
positive, test_coint, stationary_count = positiveTests( test_count, generateRandomWalk )

Stationary Count:  6
Positive Rate: 91/95 (95.79 %)


## The Non-Stationary is a AR(1) Process

In [8]:
def generateAR1(rho, n=N, wscale=1):
    # np.random.seed(seed)

    y = w = np.random.normal(scale=wscale, size=n)
    for t in range(n):
        y[t] = rho * y[t-1] + w[t]
    return y

### Negative Tests

In [9]:
test_count = 100
negative, test_coint, stationary_count = negativeTests( test_count, generateAR1, generateAR1, rho=1.1 )

Stationary Count:  0
Negative Rate: 0/100 (0.00 %)


### Positive Tests

In [10]:
test_count = 100
positive, test_coint, stationary_count = positiveTests( test_count, generateAR1, rho=1.2 )

Stationary Count:  0
Positive Rate: 100/100 (100.00 %)


## The Non-Stationary is a MA(1) Process

We know a MA(1) is stationary process. But, I like to test it anyway.

In [11]:
def generateMA(theta, n=N, mu=0, wscale=1):
    # np.random.seed(seed)

    y = w = np.random.normal(scale=wscale, size=n)
    y[0] = mu
    for t in range(1, n):
        y[t] = mu + w[t] + theta * w[t-1]
    return y

### Negative Tests

In [12]:
test_count = 100
negative, test_coint, stationary_count = negativeTests( test_count, generateMA, generateMA, theta=0.9 )

Stationary Count:  100
Negative Rate: 14/23 (60.87 %)


### Positive Tests

In [13]:
test_count = 100
positive, test_coint, stationary_count = positiveTests( test_count, generateMA, theta=0.9 )

Stationary Count:  106
Positive Rate: 26/31 (83.87 %)
