In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from setup.utils import *
from setup.univ_setup import *

%matplotlib inline

In [2]:
datadir = '/home/derek-qi/Documents/R3000_Data/data/r3000/'
univ = univ_setup(datadir, silent=False)
filt_na(univ)

Setup R3000 universe
use existing binary file
0.397287 seconds


In [8]:
data = stack(univ)
data.date = pd.to_datetime(data.date)
selTickers = np.unique(data['ticker'])[:1000]
data_sel = data

In [4]:
def _rmmt_sn(log_return, head, tail, c=0.5):
    assert head < tail, "head %d is greater than or equal to tail" % (head, tail)
    y = np.array([0] * head + [1] * (tail - head))
    y = y / np.sum(y)
    # rmmt = np.convolve(log_return, y) - c * np.convolve(log_return ** 2, y)
    rmmt = np.convolve(log_return, y) / np.sqrt(np.convolve(log_return ** 2, y) + 1e-6) # momentum for unit variance within the same period
    return rmmt[:-tail+1]


def revised_momentum(univ_table, head, tail, c=0.5, naming='simple'):
    '''
    Calculates the revised momentum factor defined as follows:
    mmt_r[t] = sum(lr[head:tail]) - c * sum(lr[head:tail] ** 2)
    head and tail are numbers of time periods

    Comparing to the vanilla type of momentum, this new momentum added
    penalties on the curvature of the log return series
    '''
    name = 'revised_momentum'
    if naming == 'full':
        name += '_%s_%s' % (head, tail)
    univ_table[name] = np.nan
    rmmt_dict = {}
    datelst = np.unique(univ_table['date'])
    # allTickers = np.unique(univ_table['ticker'])
    
    def _rmmt_single_name(table):
        lr = np.diff(np.log(table['price'])) # log return series
        lr = np.insert(lr, 0, 0)
        rmmt = _rmmt_sn(lr, head, tail, c)
        table.loc[:, name] = rmmt
        return table
    
    univ_table = univ_table.groupby('ticker').apply(_rmmt_single_name)
    
    for t in datelst:
        table = univ_table.loc[univ_table.date == t, ['date', 'ticker', name]].copy()
        table.dropna(inplace = True)
        rmmt_dict[t] = table
    return rmmt_dict

In [10]:
def _mmt_gap(ret_series, q1, q2):
    if ret_series.empty:
        return np.nan

    assert q1 > q2
    return np.percentile(ret_series, q1) - np.percentile(ret_series, q2)


def momentum_gap(univ_table, head, tail, q1=0.75, q2=0.25, naming='simple'):
    '''
    Momentum gap is defined as:
    q1 quantile - q2 quantile of the return series.
    '''
    assert q1 > q2, 'higher quantile %d should be larger than lower quantile %d' % (q1, q2)
    name = 'momentum_gap'
    retname = 'f_log_ret_1'
    if naming == 'full':
        name += 'time_%d_%d_range_%d_%d' % (head, tail, q1, q2)
    univ_table[name] = np.nan

    def _mmt_gap_single_name(table):
        window = tail - head
        table['log_ret'] = np.ediff1d(np.log(table['price']), to_begin=0)
        table['high'] = table['log_ret'].rolling(window).quantile(q1)
        table['low'] = table['log_ret'].rolling(window).quantile(q2)
        table[name] = table['high'].values - table['low'].values
        table[name] = table[name].shift(head)
        table.drop('high', inplace=True)
        table.drop('low', inplace=True)
        return table

    univ_table = univ_table.groupby('ticker').apply(_mmt_gap_single_name)

    mmt_gap_dict = {}
    datelst = np.unique(univ_table['date'])
    for t in datelst:
        table = univ_table.loc[univ_table.date == t, ['date', 'ticker', name]].copy()
        table.dropna(inplace = True)
        mmt_gap_dict[t] = table
    return mmt_gap_dict

In [11]:
%%prun
rmmt = momentum_gap(data_sel, 4, 52)

ValueError: labels ['high'] not contained in axis