In [1]:
from IPython.display import HTML

HTML('''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
<form action="javascript:code_toggle()"><input type="submit" value="Click here to toggle on/off the raw code."></form>''')

In [2]:
%matplotlib inline

In [3]:
import numpy as np
import pandas as pd
import datetime
import matplotlib.pyplot as plt

from arch.unitroot import VarianceRatio
from itertools import groupby
from scipy.stats import norm
from sklearn.metrics import r2_score
from statsmodels.robust.scale import mad

from sgmbasketball.models.factor_model.play_by_play_data import PlayByPlayCleanData
from stratagemdataprocessing.data_api import find_basketball_events

from sgmresearchbase.coint.space import normalise
from sgmresearchbase.coint.common import hurst_naive
from sgmresearchbase.coint.services import CointegrationService, _zs
from sgmresearchbase.coint.projection import interpolate
from sgmresearchbase.coint.regression import add_constant, coch_regression

from IPython.display import clear_output

In [4]:
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = (16, 8)

In [5]:
START_DT = datetime.datetime(2016, 10, 1)
END_DT = datetime.datetime(2017, 6, 1)

ALL_EVENTS = find_basketball_events(START_DT, END_DT, True)
NBA_EVENTS = filter(lambda e: e['stage_name'] == 'NBA', ALL_EVENTS)

PBP = PlayByPlayCleanData(str(START_DT)[0:10], str(END_DT)[0:10], 'NBA', 'pbp', fixture_filter=None).get_data_ready()
clear_output()

In [6]:
def mk_test(x, out='both'):
    """
    This function is derived from code originally posted by Sat Kumar Tomer (satkumartomer at gmail.com)
    See also: http://vsp.pnnl.gov/help/Vsample/Design_Trend_Mann_Kendall.htm

    The purpose of the Mann-Kendall (MK) test (Mann 1945, Kendall 1975, Gilbert 1987) is to statistically assess
    if there is a monotonic upward or downward trend of the variable of interest over time. A monotonic upward
    (downward) trend means that the variable consistently increases (decreases) through time, but the trend may
    or may not be linear. The MK test can be used in place of a parametric linear regression analysis, which can
    be used to test if the slope of the estimated linear regression line is different from zero. The regression
    analysis requires that the residuals from the fitted regression line be normally distributed; an assumption
    not required by the MK test, that is, the MK test is a non-parametric (distribution-free) test.

    Hirsch, Slack and Smith (1982, page 107) indicate that the MK test is best viewed as an exploratory analysis
    and is most appropriately used to identify stations where changes are significant or of large magnitude and
    to quantify these findings.

    Input:
        x:   a vector of data
    Output:
        p: p value of the significance test
        z: normalized test statistics

    Examples
    --------
      >>> x = np.random.rand(100)
      >>> p, z = mk_test(x, 0.05)
    """    
    n = len(x)

    # Calculate S
    s = 0
    for k in range(n-1):
        s += np.sign(x[(k + 1):] - x[k]).sum()
    
    # Calculate var(S)
    var_s = float(n * (n - 1) * (2 * n + 5)) / 18
    g = len(np.unique(x))
    if g < n:
        x2 = sorted(x)
        for xx, xg in groupby(x2):
            tp = len(list(xg))
            if tp > 1:
                var_s -= float(tp * (tp - 1) * (2 * tp + 5)) / 18
    
    # Calculate statistic
    if s > 0:
        z = (s - 1) / np.sqrt(var_s)
    elif s == 0:
        z = 0
    elif s < 0:
        z = (s + 1) / np.sqrt(var_s)
    if out == 'stat':
        return z

    # Calculate p_value
    p = 2 * (1 - norm.cdf(abs(z))) # two tail test
    if out == 'pvalue':
        return p
    
    return p, z

In [7]:
def hurst_test(s):
    if len(s) <= 40:
        return np.nan

    return hurst_naive(s, lags=range(2, 50))

In [8]:
def vr_test(s, lag):
    if len(s) < 2 * lag:
        return np.nan

    vr = VarianceRatio(s, lag)
    return vr.stat, vr.pvalue

In [9]:
def detect_min_max(vals):
    x = np.arange(vals.shape[0])
    p = np.poly1d(np.polyfit(x, vals, 3))

    if r2_score(vals, p(x)) < 0.5:
        return None, None
    
    crit = p.deriv().r
    r_crit = crit[crit.imag == 0].real
    test = p.deriv(2)(r_crit)

    return r_crit[test > 0], r_crit[test < 0]

In [10]:
def fix_basis(both_bases):
    if both_bases[0, 0]*both_bases[1, 0] < 0:
        basis = both_bases[:, 0]
    elif both_bases[0, 1]*both_bases[1, 1] < 0:
        basis = both_bases[:, 1]
    else:
        basis = both_bases[:, 0]*0.0    

    if basis[0] < 0:
        basis = -basis

    return basis

In [11]:
def get_basis(vals):
    if vals.shape[0] < 10:
        return np.zeros((2,))

    both_bases = CointegrationService.get_cointegrating_bases(vals, model=1)
    if both_bases is None:
        return np.zeros((2,))

    return fix_basis(both_bases)

In [12]:
def get_spread(vals, basis):
    spd = CointegrationService.get_spread_series(vals, basis)
    mad_ = mad(spd, center=np.median, c=1)
    mean_ad = mad(spd, center=np.mean, c=1)

    if abs(mad_) < 1e-7:
        return (spd - np.median(spd)) / 1.253314 / mean_ad
    else:
        return (spd - np.median(spd)) / 1.486 / mad_

In [13]:
def to_signal(spread, test_idx=-1, check_hurst=True):
    if check_hurst and hurst_test(spread) >= 0.5:
        return [0, 0]

    if test_idx == -1:
        test_idx = spread.shape[0]-1

    def co_signal():
        if test_idx < 100:
            return 0

        exog = add_constant(np.arange(100).astype(np.float)/100)
        endog = spread[(test_idx-100):test_idx]

        fit = coch_regression(exog, endog)
        if fit.pvalues[1] < 0.05:
            if fit.params[1] > 0.1:
                return 1
            elif fit.params[1] < -0.1:
                return -1
        else:
            return 0

#     def mk_signal():
#         mk = mk_test(spread[-100:])
#         if mk[0] > 0.05 or spread.shape[0] < 100:
#             return 0

#         min_, max_ = detect_min_max(spread[-100:])
#         if min_ is None or min_.shape[0] == 0 or max_ is None or max_.shape[0] == 0:
#             return 0
#         else:
#             min_, max_ = (X.shape[0]-100)+int(min_[0]), (X.shape[0]-100)+int(max_[0])

#         if mk[1] > 0 and max_ > min_ and max_ > test_idx-5 and max_ <= test_idx+5:
#             return -1
#         elif mk[1] < 0 and min_ > max_ and min_ > test_idx-5 and min_ <= test_idx+5:
#             return 1
#         else:
#             return 0

    def ext_signal():
        if spread[test_idx] > 2:
            return -1
        elif spread[test_idx] < -2:
            return 1
        else:
            return 0

    return [co_signal(), ext_signal()]

In [14]:
def z_based_outlier(points, thresh=3.0):
    mean = np.mean(points, axis=0)
    diff = points - mean

    return np.abs(diff / np.std(points)) > thresh

def mad_based_outlier(points, thresh=3.5):
    median = np.median(points, axis=0)
    diff = points - median
    med_abs_deviation = np.median(np.abs(diff))

    return np.abs(0.6745 * diff / med_abs_deviation) > thresh

In [15]:
def smoothed_bases(bases):
    def get_valid(idx):
        return bases[:(idx+1)][~z_based_outlier(bases[:(idx+1), 0], thresh=3.0)]

    return np.array(
        [(np.median(get_valid(i), axis=0)) for i in range(bases.shape[0])]
    )

In [16]:
def get_sample(eid):
    data = PBP.loc[eid]

    return data[['points_h', 'points_a']].values

In [17]:
from sgmbasketball.models.factor_model.raw_data import NaturalHandicapRawData
NH = NaturalHandicapRawData(str(START_DT)[0:10], str(END_DT)[0:10], 'NBA', '', fixture_filter=None, use_cache=False).get_data_ready()

def get_nh(eid):
    return NH.loc[eid, 'ftps_1_line']

def nh_prior(nh):
    p = 0.5 + 0.002*nh
    return np.array([p, p-1])

In [22]:
def do_plot(eid, prior=nh_prior(0)):
    X = get_sample(eid)

    ss = 1
    lb =100
    mv = X.shape[0]

    actual_basis = get_basis(X)
    actual_spread = get_spread(X, actual_basis)

    all_bases = np.array([get_basis(X[:i, :]) for i in range(0, mv)])
    all_bases[:lb] = prior
    all_smoothed = smoothed_bases(all_bases)

    signals = np.array([to_signal(get_spread(X[:i, :], all_smoothed[i])) for i in range(lb, mv, ss)], dtype=np.int)
    signals_cheat_online = np.array([to_signal(get_spread(X[:i, :], actual_basis)) for i in range(lb, mv, ss)])
    signals_cheat_full = np.array([to_signal(actual_spread, test_idx=i, check_hurst=False) for i in range(0, mv, 1)])

    f, axes = plt.subplots(5, figsize=(16, 20), sharex=True)

    axes[0].step(np.arange(mv), X)
    axes[0].set_title('Scores')

    axes[1].step(np.arange(mv), actual_spread)
    axes[1].step(np.arange(lb, mv, ss), [get_spread(X[:i, :], all_smoothed[i])[-1] for i in range(lb, mv, ss)])
    axes[1].step(np.arange(lb, mv, ss), [get_spread(X[:i, :], prior)[-1] for i in range(lb, mv, ss)])
    axes[1].set_title('Spread')
    axes[1].axhline(1, linestyle='--', color='k', alpha=0.5)
    axes[1].axhline(-1, linestyle='--', color='k', alpha=0.5)
    axes[1].axhline(2, linestyle='--', color='k')
    axes[1].axhline(-2, linestyle='--', color='k')
    axes[1].set_xlim([0, mv])
    axes[1].set_ylim([-3, 3])
    axes[1].legend(['Hindsight', 'Online Estimation', 'Regular Point Spread'], loc='best')

    axes[2].step(np.arange(lb, mv, ss), signals)
    axes[2].set_title('Online')
    axes[2].set_xlim([0, mv])
    axes[2].set_ylim([-1.2, 1.2])
    axes[2].legend(['Cochrane-Orcutt', 'Overextension from Mean'], loc='best')

    axes[3].step(np.arange(lb, mv, ss), signals_cheat_online)
    axes[3].set_title('Cheat (partial)')
    axes[3].set_xlim([0, mv])
    axes[3].set_ylim([-1.2, 1.2])

    axes[4].step(np.arange(0, mv, 1), signals_cheat_full)
    axes[4].set_title('Cheat (full)')
    axes[4].set_xlim([0, mv])
    axes[4].set_ylim([-1.2, 1.2])

    plt.tight_layout()

In [1]:
eid = 2352977
do_plot(eid, prior=nh_prior(get_nh(eid)))

NameError: name 'do_plot' is not defined