In [1]:
import pandas as pd
import numpy as np
import scipy
import re
import math
from itertools import repeat

In [2]:
# With minimal adjustment, this code block has been provided by many fellow M5 competitors.
# And so, for that, I thank you!

def get_sales_df(train_df, end_train=1913, horizon=28):
    """
    Reshape the wide sales table into one row per (id, day) and append
    empty rows for the days that still have to be forecast.

    Args:
        train_df(DataFrame): wide table holding the six identifier columns
                             plus one column per day ('d_1', 'd_2', ...).
        end_train(int)     : number of the last day present in train_df.
        horizon(int)       : number of future days to append with NaN sales.

    Returns:
        DataFrame with the identifier columns (as categories), 'd', 'sales'
        and 'cat' (the prefix of 'id' before the first underscore).
        train_df itself is left untouched.
    """
    TARGET = 'sales'
    index_columns = ['id','item_id','dept_id','cat_id','store_id','state_id']

    # Cast the identifiers to category BEFORE melting: the long table repeats
    # every identifier once per day, so object columns dominate memory use.
    wide = train_df.astype({col: 'category' for col in index_columns})

    sales_df = pd.melt(wide,
                       id_vars    = index_columns,
                       var_name   = 'd',
                       value_name = TARGET)

    # One NaN-filled frame per future day, concatenated in a single pass
    # instead of re-copying a growing frame on every loop iteration.
    id_grid = wide[index_columns].drop_duplicates()
    future_frames = [
        id_grid.assign(**{'d': 'd_' + str(end_train + step), TARGET: np.nan})
        for step in range(1, horizon + 1)
    ]
    sales_df = pd.concat([sales_df] + future_frames, ignore_index=True)

    sales_df['cat'] = sales_df['id'].str.split('_').str.get(0)

    # No-op when the categories survived melt/concat; guarantees the dtype otherwise.
    for col in index_columns:
        sales_df[col] = sales_df[col].astype('category')
    return sales_df



def get_events_df(cal):
    """
    Collect every calendar day that carries an event in 'event_name_1'.

    The days are grouped by event, in order of each event's first
    appearance, and keep their calendar order within an event.

    The previous version also summed the sales of each event through the
    global train_df. That value was never returned, and the lookup raised
    KeyError for calendar days missing from train_df, so it was removed.
    The function now depends only on its argument.

    Args:
        cal(DataFrame): calendar with at least the columns 'd' and 'event_name_1'.

    Returns:
        1-D numpy array of day labels (str); empty when there are no events.
    """
    event_names = cal['event_name_1'].dropna().unique()

    # One list of day labels per event.
    days_per_event = [
        cal.loc[cal['event_name_1'] == name, 'd'].tolist()
        for name in event_names
    ]

    # np.concatenate refuses an empty sequence, so handle "no events" explicitly.
    if not days_per_event:
        return np.array([], dtype=str)

    return np.concatenate(days_per_event, axis=0).astype(str)

In [3]:
# Load the raw inputs: the sales table that get_sales_df() reshapes and the
# calendar whose 'event_name_1' column get_events_df() reads.
train_df = pd.read_csv('sales_train_validation.csv')
cal = pd.read_csv('calendar.csv')

In [4]:
sales_df = get_sales_df(train_df)
# Quant_metrics reads the notebook-level `all_events`, so it must be built
# here for the class below to work after Restart & Run All.
all_events = get_events_df(cal)

MemoryError: 

In [4]:
'''
QUANTITATIVE FINANCE AND TECHNICAL ANALYSIS METRICS TO APPLY TO EACH ITEM'S 
SALES. The different attributes are used to measure sale volatility, sale 
trend direction, sale trend strength, and sale averages over different 
periods, as well as to compare these qualities to the item's benchmark 
(all items in the item's category).
'''

class Quant_metrics():
    '''
    Quantitative finance metrics that can be utilized to measure sales 
    attributes of Walmart items.

    Every metric works on the daily 'sales' of one item (the rows of the
    long sales table whose 'id' equals `item`). Metrics that need a
    benchmark compare the item with the per-day MEAN sales of all items
    that share its 'cat_id'.
    '''

    def __init__(self, item, sales=None, events=None):
        '''
        Args:
            item(str)        : value of the 'id' column identifying the item.
            sales(DataFrame) : long sales table with at least the columns
                               'id', 'cat_id', 'd' and 'sales'. Defaults to
                               the notebook-level `sales_df`.
            events(iterable) : day labels ('d_<n>') that carry an event.
                               Defaults to the notebook-level `all_events`,
                               or to "no event days" if that is undefined.
        '''
        self.item = item

        if sales is None:
            # Backward-compatible default: the frame built by get_sales_df().
            sales = sales_df
        self.sales_df = sales

        if events is None:
            events = globals().get('all_events')
        if events is None:
            events = np.array([], dtype=str)
        self.all_events = events

    # ------------------------------------------------------------------
    # Category views. They used to be materialised in __init__, which
    # filtered the whole table three times per item; now built on demand.
    # ------------------------------------------------------------------
    @property
    def food_benchmark(self):
        '''All rows of the sales table in category FOODS.'''
        return self._category_rows('FOODS')

    @property
    def hobbies_benchmark(self):
        '''All rows of the sales table in category HOBBIES.'''
        return self._category_rows('HOBBIES')

    @property
    def household_benchmark(self):
        '''All rows of the sales table in category HOUSEHOLD.'''
        return self._category_rows('HOUSEHOLD')

    # ------------------------------------------------------------------
    # Private helpers shared by the metrics
    # ------------------------------------------------------------------
    def _category_rows(self, category):
        '''Rows of the sales table whose cat_id equals `category`.'''
        return self.sales_df.loc[self.sales_df['cat_id'] == category]

    def _item_rows(self):
        '''Rows of the sales table that belong to the item.'''
        return self.sales_df.loc[self.sales_df['id'] == self.item]

    def _item_sales(self):
        '''Daily sales of the item as a float Series (NaN for days without data).'''
        return self._item_rows()['sales'].astype(float)

    def _benchmark_sales(self):
        '''Per-day mean sales of the item's category, aligned to the item's rows.'''
        item_rows = self._item_rows()
        category = item_rows['cat_id'].iloc[0]
        daily_mean = (
            self._category_rows(category)
            .groupby('d', sort=False, observed=True)['sales']
            .mean()
        )
        aligned = daily_mean.reindex(item_rows['d'].to_numpy())
        return pd.Series(aligned.to_numpy(dtype=float),
                         index=item_rows.index, name='benchmark')

    @staticmethod
    def _relative_strength(sales, **ewm_kwargs):
        '''
        Ratio of the exponentially weighted average gain to the average loss.

        Day-over-day changes are split into gains and (absolute) losses;
        `ewm_kwargs` are forwarded to Series.ewm. Days without a change
        (the first day, days without data) are NaN in the result.
        '''
        delta = sales.diff().dropna()
        avg_gain = delta.clip(lower=0).ewm(**ewm_kwargs).mean()
        avg_loss = delta.clip(upper=0).abs().ewm(**ewm_kwargs).mean()
        return (avg_gain / avg_loss).reindex(sales.index)

    def _jdk_coordinates(self, sales, window):
        '''Standardised relative strength and momentum, both centred on 101.'''
        # An infinite ratio (no losses yet) would poison mean/std, so it counts as 0.
        strength = self._relative_strength(sales, com=window).replace(np.inf, 0)
        jdk_rs = 100 + ((strength - strength.mean()) / strength.std() + 1)

        momentum = sales.diff(periods=window).fillna(0)
        jdk_mom = 100 + ((momentum - momentum.mean()) / momentum.std() + 1)
        return jdk_rs.fillna(0), jdk_mom.fillna(0)

    @staticmethod
    def _quadrant_codes(rs, mom, ref_rs, ref_mom):
        '''
        Quadrant of (rs, mom) relative to (ref_rs, ref_mom):
        1 = both higher, 2 = rs lower / mom higher, 3 = rs higher / mom lower,
        4 = both lower, 5 = a tie on either axis.
        '''
        conditions = [
            (rs > ref_rs) & (mom > ref_mom),
            (rs < ref_rs) & (mom > ref_mom),
            (rs > ref_rs) & (mom < ref_mom),
            (rs < ref_rs) & (mom < ref_mom),
        ]
        return np.select(conditions, [1, 2, 3, 4], default=5)

    #2-STD BOLLINGER BANDS
    def bollingers(self, window):
        '''
        Sales of the item with outliers pulled back to the 2 standard 
        deviation bollinger bands around the window-day rolling mean.
        Values outside the bands are only clipped on days that are not
        event days.

        Args:
            window(int)   : number of days we want to average over.

        Returns:
            numpy array with the floored, clipped sales.
        '''
        rows = self._item_rows()
        sales = rows['sales'].astype(float)

        rolling = sales.rolling(window=window)
        center, spread = rolling.mean(), rolling.std()
        upper_band = center + (spread * 2)
        lower_band = center - (spread * 2)

        # Element-wise membership test; `array not in array` is not one.
        regular_day = ~rows['d'].isin(self.all_events)

        clipped = sales.mask((sales > upper_band) & regular_day, upper_band)
        clipped = clipped.mask((clipped < lower_band) & regular_day, lower_band)
        return np.floor(clipped.to_numpy())

    #RELATIVE STRENGTH USING EXPONENTIALLY-WEIGHTED MOVING AVERAGE
    def RS(self, window, category=False):
        '''
        Relative strength (average gain / average loss of the daily sales
        changes, exponentially weighted with span=window).

        Args:
            window(int)   : number of days we want to average over.
            category(bool): False -> RS of the item itself;
                            True  -> RS of the item's category benchmark.

        Returns:
            Series named 'RS', indexed like the item's rows.
        '''
        source = self._benchmark_sales() if category else self._item_sales()
        return self._relative_strength(source, span=window).rename('RS')

    #INVERSE FISHER TRANSFORM
    def inverse_fisher(self, rsi_period, wma_period):
        """
        Inverse Fisher Transform applied on RSI.
        Suggested method to use any IFT indicator is to buy when the indicator crosses over -0.5
        or crosses over +0.5 if it has not previously crossed over -0.5 and to sell short when 
        the indicators crosses under +0.5 or crosses under -0.5 if it has not previously crossed
        under +0.5.

        Steps: RSI (0..100) -> v1 = 0.1 * (RSI - 50) -> v2 = linearly weighted
        moving average of v1 -> IFT = (exp(2*v2) - 1) / (exp(2*v2) + 1),
        which is tanh(v2) and therefore bounded by -1 and +1.

        Args:
            rsi_period(int) : Period over which we calculate the item's Relative Strength Index (RSI).
            wma_period(int) : Period over which we calculate the item's weighted moving average (WMA).

        Returns:
            Series named 'IFT_RSI', indexed like the item's rows.
        """
        sales = self._item_sales()

        strength = self._relative_strength(sales, span=rsi_period, adjust=True)
        rsi = 100 - (100 / (1 + strength))
        v1 = 0.1 * (rsi - 50)

        # Linear weights 1..wma_period; the most recent value weighs the most.
        weights = np.arange(1, wma_period + 1)
        denominator = weights.sum()
        v2 = v1.rolling(wma_period).apply(
            lambda chunk: np.dot(chunk, weights) / denominator, raw=True
        )

        return np.tanh(v2).rename("IFT_RSI")

    #RELATIVE ROTATION GRAPH
    def RRG(self, window, steps):
        '''Relative Rotation Graph (RRG) quadrants of the item. RRGs are made 
        of an item's/category's relative strength and momentum.

        The item is sampled every `steps` days and each sample is classified
        twice: against its category benchmark on the same day, and against the
        item's own previous sample. Codes: 1 = strength and momentum both
        higher, 2 = strength lower / momentum higher, 3 = strength higher /
        momentum lower, 4 = both lower, 5 = tie. The first sample has no
        predecessor and gets 5. Each code is held for the `steps` days that
        follow the sample.

        Args:
            window(int) : Moving average window of item's sales.
            steps(int)  : number of days before next benchmark value is compared to item value. 

        Returns:
            (rrg_bench, rrg_item): two Series indexed like the item's rows.
        '''
        item_rs, item_mom = self._jdk_coordinates(self._item_sales(), window)
        bench_rs, bench_mom = self._jdk_coordinates(self._benchmark_sales(), window)

        ix = item_rs.to_numpy()[::steps]
        iy = item_mom.to_numpy()[::steps]
        bx = bench_rs.to_numpy()[::steps]
        by = bench_mom.to_numpy()[::steps]

        versus_bench = self._quadrant_codes(ix, iy, bx, by)

        versus_previous = np.full(len(ix), 5)
        if len(ix) > 1:
            versus_previous[1:] = self._quadrant_codes(ix[1:], iy[1:], ix[:-1], iy[:-1])

        # Repeating yields ceil(n / steps) * steps values; cut back to n rows.
        n_rows = len(item_rs)
        rrg_bench = pd.Series(np.repeat(versus_bench, steps)[:n_rows],
                              index=item_rs.index, name='rrg_bench')
        rrg_item = pd.Series(np.repeat(versus_previous, steps)[:n_rows],
                             index=item_rs.index, name='rrg_item')
        return rrg_bench, rrg_item

    #AVERAGE TRUE RANGE
    def ATR(self, window):
        '''
        Average True Range (ATR) is a metric used to measure how much the data is trending.
        When the ATR rises, the data is experiencing a trend in either direction. 
        Direction of trend is not identified with this metric.

        Sales have no high/low/close, so the rolling max, min and mean over
        `window` days stand in for them. The true range is smoothed with
        J. Welles Wilder's moving average (ewm with alpha = 1 / window).

        Args:
            window(int) : Moving average window of item's sales.

        Returns:
            Series named 'atr', forward-filled, indexed like the item's rows.
        '''
        sales = self._item_sales()
        rolling = sales.rolling(window)
        high, low, close = rolling.max(), rolling.min(), rolling.mean()
        previous_close = close.shift()

        true_range = pd.concat(
            [(high - low).abs(),
             (high - previous_close).abs(),
             (low - previous_close).abs()],
            axis=1,
        ).max(axis=1)

        atr = true_range.ewm(alpha=1/window, adjust=False).mean()
        return atr.ffill().rename('atr')

    #ANNUALIZED VOLATILITY
    def annualized_volatility(self, window):
        '''
        Return the annualized rolling standard deviation of the item's daily
        sales changes. Plain differences are used rather than log returns.

        Args:
            window(int) : Moving average window of item's sales.

        Returns:
            Series named 'ann_vol', indexed like the item's rows.
        '''
        daily_change = self._item_sales().diff()
        return (daily_change.rolling(window).std() * np.sqrt(365)).rename('ann_vol')

    #ENTROPY
    def calc_entropy(self):
        """
        Shannon entropy (in bits) of the distribution of the item's daily
        sales values. Days without data are ignored.

        Returns:
            float, 0.0 when there is no data or only one distinct value.
        """
        observed = self._item_sales().dropna()
        if observed.empty:
            return 0.0

        probabilities = observed.value_counts(normalize=True).to_numpy()
        entropy = -(probabilities * np.log2(probabilities)).sum()
        # max() also turns the -0.0 of a single-valued series into 0.0.
        return max(0.0, float(entropy))

    #BETA
    def beta(self, window):
        '''
        Beta compares the rolling volatility of the specified item with the
        rolling volatility of its benchmark: rolling std of the item's sales
        divided by rolling std of the benchmark's sales.

        Args:
            window(int) : Moving average window of item's sales.

        Returns:
            Series named 'beta', indexed like the item's rows; 0 wherever the
            ratio is undefined.
        '''
        item_vol = self._item_sales().rolling(window).std()
        bench_vol = self._benchmark_sales().rolling(window).std()

        ratio = (item_vol / bench_vol).replace([np.inf, -np.inf], np.nan)
        return ratio.fillna(0).rename('beta')

    #INFORMATION RATIO
    def information_ratio(self, window):
        '''
        Information ratio: mean of (item sales - benchmark sales) divided by
        the standard deviation of that difference scaled by sqrt(window).

        Args:
            window(int) : Moving average window of item's sales.

        Returns:
            Series named 'info_ratio' holding the same value on every row of
            the item (NaN when the difference has no spread).
        '''
        sales = self._item_sales()
        active = sales - self._benchmark_sales()

        volatility = active.std(ddof=0) * np.sqrt(window)
        ratio = active.mean() / volatility if volatility > 0 else np.nan

        return pd.Series(ratio, index=sales.index, name='info_ratio')

    #TRIPLE EXPONENTIALLY-WEIGHTED MOVING AVERAGE
    def TEMA(self, window, adjust = True):
        """
        Triple exponential moving average attempts to remove the inherent lag of
        moving averages by placing more weight on recent values. It is not a
        triple smoothing: TEMA = 3 * EMA - 3 * EMA(EMA) + EMA(EMA(EMA)).

        Args:
            window(int)  : Moving average window (ewm span) of item's sales.
            adjust(bool) : passed to every ewm call.

        Returns:
            Series named '<window> period TEMA', indexed like the item's rows.
        """
        def smooth(values):
            return values.ewm(span=window, adjust=adjust).mean()

        ema = smooth(self._item_sales())
        ema_of_ema = smooth(ema)
        ema_of_ema_of_ema = smooth(ema_of_ema)

        tema = 3 * ema - 3 * ema_of_ema + ema_of_ema_of_ema
        return tema.rename("{0} period TEMA".format(window))

    #RELATIVE VOLATILITY
    def relative_volatility(self, w1, w2):
        '''
        The relative volatility divides the short-period rolling standard
        deviation of an item's sales by its long-period rolling standard
        deviation. The short series is shifted w2 days forward and the long
        series w1 days backward before dividing, so each value also depends
        on sales up to w1 days AFTER its row.

        Args:
            w1(int)  : shorter-period moving average window of item's sales.
            w2(int)  : longer-period moving average window of item's sales.

        Returns:
            Series named 'rv', indexed like the item's rows.
        '''
        sales = self._item_sales()
        short_vol = sales.rolling(w1).std().shift(w2)
        long_vol = sales.rolling(w2).std().shift(-w1)
        return (short_vol / long_vol).rename('rv')

    #SEMIVARIANCE
    def semivariance(self, w1, w2):
        '''
        Rolling variance (window w2) of the item's sales, reported only on
        days whose sales fall below the w1-day rolling mean. Until the rolling
        mean is available the overall mean is used. Other days are NaN.

        Args:
            w1(int)  : window of the rolling mean that sales are compared to.
            w2(int)  : window of the rolling variance.

        Returns:
            Series named 'semivariance', indexed like the item's rows that
            have sales data.
        '''
        sales = self._item_sales().dropna()
        reference = sales.rolling(w1).mean().fillna(sales.mean())
        rolling_var = sales.rolling(w2).var()
        return rolling_var.where(sales < reference).rename('semivariance')
