### Apply Strong Full
This notebook runs the full version of the strong CSP implementation on an input consistent panel.

IMPORTANT: a user-level table of spending by ticker and quarter is read from storage to significantly speed up this notebook.
This optimized table was generated by job #178183
 (associated with /Auxiliary/generate-optimized-transaction-table.py)

A dataframe of prod benchmarks calculated in another /Auxiliary notebook is also read from storage.
  
 __author__ = 'Jake White'
 
 __email__ = 'jake.white@mscience.com'
 
 __status__ = 'Research'

### 0. Dependencies

In [3]:
%run ./functions

In [4]:
from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import functions as F

from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.window import Window

import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt

#### Parameters

In [6]:
# run-specific parameters

# `panels_table`: name of the panel-membership table to evaluate.
# NOTE(review): the default contains a '{}' placeholder that nothing in this cell fills in,
# so a concrete table name presumably has to be supplied through the widget — confirm.
dbutils.widgets.text('panels_table', "swipe_dev.{}")
panels_tablename = dbutils.widgets.get("panels_table")
print(panels_tablename)

# `panel_flip`: when True, panel inclusion values are inverted before the membership joins.
dbutils.widgets.dropdown('panel_flip', 'False', ['True','False'])
# Widget values arrive as plain text. Validate and compare against the two allowed
# literals instead of eval()-ing the text, so a widget/job parameter is never executed as code.
_panel_flip_raw = dbutils.widgets.get('panel_flip')
if _panel_flip_raw not in ('True', 'False'):
  raise ValueError("panel_flip must be 'True' or 'False', got {!r}".format(_panel_flip_raw))
panel_flip = _panel_flip_raw == 'True'
print(panel_flip)

In [7]:
# dataset-specific parameters
_UID_COL = 'unique_mem_id'  # user-id column; join/group key for user spending and panel membership
_TFD_COL = 'transaction_file_date'  # NOTE(review): not referenced in the cells of this notebook; may be used by ./functions — confirm
_IR_COL = 'ir_date'  # transaction column compared against fiscal-quarter start/end dates when bucketing spending
_PERIOD_DAYS_AFTER = 14  # NOTE(review): not referenced in the cells of this notebook; may be used by ./functions — confirm

_DAY_AFTER_LAST_FULL_MONTH = '2019-01-01'  # associated w/ optimized tag tables, see merchant_transactions below
_QTRS_TO_TEST = 8  # number of most-recent quarters kept per ticker in the performance assessment

In [8]:
# dynamic tables from prod pipeline
# Refresh Spark's cached view of the financials table before reading it.
spark.catalog.refreshTable('swipe.financials')

fins = spark.table('swipe.financials')  # for company fiscal quarters
cos = spark.table('swipe_4m.swipe_companies') # for company internals only
panels = spark.table(panels_tablename)  # panel-membership table named by the `panels_table` widget

In [9]:
# [caution: everything in this cell is a cached, custom-generated artifact:
#  optimized transaction tables, prod benchmarks, and user spending sums]

# optimized tx tables generated by job #178183, run 10 (Feb 20)
merchant_transactions = spark.table('swipe_dev_4m.merchant_tag_union_trimmed_ird_ccc')

# optimized tx tables generated by job #178183, run 11 (Feb 20)
panel_transactions = spark.table('swipe_dev_4m.panel_tag_union_trimmed_ird')

# cached table generated outside of job context (Feb 20)
user_spending_by_ticker_qtr = spark.table('swipe_dev_4m.user_spending_by_ticker_qtr_ccc')

# no-CSP benchmarks. Generated by generate-prod-benchmarks (Feb 20)
# The benchmark column is renamed so it is distinguishable once joined with CSP results.
prod_benchmarks = (
  spark.table('swipe_dev_4m.csp_prod_benchmarks')
       .withColumnRenamed('dpuup_yoy', 'dpuup_yoy_bm')
)

### Implement Strong CSP & Generate Raw Spending Index Time Series

In [11]:
def lagFeatures(df, by=['ticker'], over='enddate', vals=[''], lags=[4], lag_suffix='_lag_{}'):
    """Append lagged copies of columns, computed within each `by` partition.

    For every column in `vals` and every offset in `lags`, a new column named
    `<column><lag_suffix>` is added, holding `F.lag(column, offset)` over a window
    partitioned by `by` and ordered by `over`.

    Args:
        df: input Spark DataFrame.
        by: column names that define the window partitions.
        over: column name the window is ordered by.
        vals: names of the columns to lag.
        lags: integer offsets passed to `F.lag` (callers in this notebook pass
            negative offsets to pick up the row four quarters ahead).
        lag_suffix: suffix for the new column names. A `{}` placeholder, if present,
            is filled with the offset so that several lags of the same column get
            distinct names; a suffix without a placeholder is used unchanged.

    Returns:
        `df` with one additional column per (column, offset) pair.
    """
    lag_window = Window.partitionBy(*by).orderBy(over)

    for col in vals:
        for offset in lags:
            # Previously the suffix was appended verbatim, so the default '_lag_{}'
            # produced a literal '{}' in the name and multiple lags overwrote each other.
            lagged_name = col + lag_suffix.format(offset)
            df = df.withColumn(lagged_name, F.lag(F.col(col), offset).over(lag_window))

    return df

In [12]:
# One row per ticker fiscal quarter, plus the start/end dates of the quarter four rows ahead.
fiscal_quarters = fins.select('ticker', 'startdate', 'enddate')

# (window ordering column, column to look ahead on) — startdate is looked up over an
# enddate ordering and enddate over a startdate ordering.
for order_col, value_col in (('enddate', 'startdate'), ('startdate', 'enddate')):
  fiscal_quarters = lagFeatures(fiscal_quarters,
                                by=['ticker'],
                                over=order_col,
                                vals=[value_col],
                                lags=[-4],
                                lag_suffix='_year_later')

# Trim only after the look-ahead columns have been computed on the full history.
fiscal_quarters = fiscal_quarters.where(F.col('enddate') < _DAY_AFTER_LAST_FULL_MONTH)

##### Calculate User Spending by Ticker & Qtr if not Cached

In [14]:
# only generate spending DF if needs to be re-calculated
# NOTE(review): user_spending_by_ticker_qtr is assigned from spark.table(...) earlier in this
# notebook, so this guard is presumably never true unless that load is changed — confirm.
# NOTE(review): the branch rebinds the notebook-level names merchant_transactions and
# fiscal_quarters (the latter loses its 'ticker' column name), which later cells also use.
if type(user_spending_by_ticker_qtr) is not DataFrame:
  # attach tickers to transactions via the company 'internal' key
  ticker_internals = cos.select('ticker','internal')
  merchant_transactions = merchant_transactions.join(ticker_internals, on='internal', how='inner')

  # rename so the join condition below can tell the two ticker columns apart
  fiscal_quarters = fiscal_quarters.withColumnRenamed('ticker','ticker_alias')

  # assign each transaction to the fiscal quarter of its ticker whose [startdate, enddate] contains _IR_COL
  merchant_transactions = merchant_transactions.join(F.broadcast(fiscal_quarters),
                                                     (merchant_transactions[_IR_COL] >= fiscal_quarters['startdate']) & \
                                                     (merchant_transactions[_IR_COL] <= fiscal_quarters['enddate']) & \
                                                     (merchant_transactions['ticker'] == fiscal_quarters['ticker_alias']))\
                                               .drop('ticker_alias')

  # total 'amount' per user, ticker and fiscal quarter
  user_spending_by_ticker_qtr = merchant_transactions.groupby(_UID_COL, 'ticker', 'startdate','enddate')\
                                                     .agg(F.sum('amount').alias('sum_spending'))

  # persist the result, overwriting the cached table that the load cell reads
  user_spending_by_ticker_qtr.write.saveAsTable('swipe_dev_4m.user_spending_by_ticker_qtr_ccc', format='parquet', mode='overwrite')
  #dbutils.notebook.exit('0')

##### Associate User Spending with Panel Membership

In [16]:
# Reshape the wide panel table (one column per period) into one row per (user, period),
# with an integer 'Period' label and the inclusion value in 'Included'.
panels_long = (
  to_long(panels, by=[_UID_COL])
    .withColumnRenamed('key', 'Period')
    .withColumnRenamed('val', 'Included')
    .withColumn('Period', F.col('Period').cast(IntegerType()))
    .orderBy('Period')
)

#### Fiscal Quarters Wrangling

In [18]:
# translate panel periods to fiscal quarters

# panel periods --> start and enddates
# Every panel column after the first is treated as an integer period label.
period_rows = [Row(period=int(label)) for label in panels.columns[1:]]
distinct_periods = spark.createDataFrame(period_rows, ['Period']).cache()

period_start_and_end_dates = (
  distinct_periods
    .select('Period', cal_qtr_to_dates('Period').alias('period_dates'))
    .select('Period',
            F.col('period_dates.startdate').alias('period_startdate'),
            F.col('period_dates.enddate').alias('period_enddate'))
)

# fiscal startdates --> nearest panel enddates
# (for both the quarter's own start date and its year-later counterpart)
fiscal_quarters = fiscal_quarters.withColumn(
  'startdate_nearest_cq_end',
  date_to_nearest_cal_qtr_end(F.col('startdate'))
)
fiscal_quarters = fiscal_quarters.withColumn(
  'startdate_year_later_nearest_cq_end',
  date_to_nearest_cal_qtr_end(F.col('startdate_year_later'))
)

In [19]:
# nearest panel enddates --> panel labels
# Default (inner) join: fiscal quarters whose nearest calendar-quarter end matches no
# panel period end date are dropped here. The matched label is kept as 'nearest_period'.
fiscal_quarters = fiscal_quarters.join(F.broadcast(period_start_and_end_dates.select('Period','period_enddate')),
                                       (fiscal_quarters['startdate_nearest_cq_end'] == \
                                        period_start_and_end_dates['period_enddate']))\
                                 .drop('period_enddate')\
                                 .withColumnRenamed('Period', 'nearest_period')

In [20]:
# year-later nearest panel enddates --> panel labels
# Same lookup as the previous cell, but keyed on the year-later start date and stored as
# 'nearest_period_year_later'. Unmatched quarters keep a null label instead of being dropped.
fiscal_quarters = fiscal_quarters.join(period_start_and_end_dates.select('Period','period_enddate'),
                                       (fiscal_quarters['startdate_year_later_nearest_cq_end'] == \
                                        period_start_and_end_dates['period_enddate']),
                                       how='left')\
                                 .drop('period_enddate')\
                                 .withColumnRenamed('Period', 'nearest_period_year_later')  # left join to preserve most recent qtrs

In [21]:
# hack for following join
# The key columns are renamed on the fiscal_quarters side so they can be dropped
# unambiguously after the join; fiscal_quarters keeps the *_alias names from here on.
fiscal_quarters = fiscal_quarters.withColumnRenamed('ticker', 'ticker_alias')\
                                 .withColumnRenamed('startdate', 'startdate_alias')\
                                 .withColumnRenamed('enddate', 'enddate_alias')

# Attach the remaining fiscal-quarter columns (year-later dates, nearest panel periods) to
# each user/ticker/quarter spending row. Default (inner) join on ticker + start + end date.
user_spending_with_year_later_fq = \
  user_spending_by_ticker_qtr.join(F.broadcast(fiscal_quarters),
                                   (user_spending_by_ticker_qtr['ticker'] == fiscal_quarters['ticker_alias']) & \
                                   (user_spending_by_ticker_qtr['startdate'] == fiscal_quarters['startdate_alias']) & \
                                   (user_spending_by_ticker_qtr['enddate'] == fiscal_quarters['enddate_alias']))\
                             .drop('ticker_alias', 'startdate_alias', 'enddate_alias')

#### (optional) Panel Flip

In [23]:
# Optionally invert panel inclusion before it is joined to spending.
# Tables whose name contains '_frac' are handled as fractional weights (w -> 1/w),
# all other tables as 0/1 membership flags (1 -> 0, anything else -> 1).
if panel_flip:
  if '_frac' in panels_tablename:
    @pandas_udf('double', PandasUDFType.SCALAR)
    def pandas_inclusion_flip(inc):
      """Return the reciprocal of each non-zero weight, rounded to 4 decimals; zeros stay zero."""
      # Work on a float copy instead of assigning into the incoming batch: the Series
      # handed to a pandas UDF may be backed by a read-only buffer, and an integer
      # Series could not hold the reciprocals anyway.
      flipped = inc.astype('float64')
      nonzero = flipped != 0
      flipped[nonzero] = 1 / flipped[nonzero]
      return flipped.round(4)
  else:
    @pandas_udf('integer', PandasUDFType.SCALAR)
    def pandas_inclusion_flip(inc):
      """Return 0 where the flag equals 1 and 1 everywhere else."""
      # `inc` is a whole pandas Series, so int(inc != 1) raises for any batch with more
      # than one row; convert element-wise instead.
      return (inc != 1).astype('int32')

  # unit test for the 0/1 branch only (the '_frac' branch maps [0,1,0] to [0,1,0]):
  #assert (pandas_inclusion_flip.func(pd.Series([0,1,0])) == [1,0,1]).all(), 'flip function unit test not passed'

  print('flipping panel..')
  # replace 'Included' with its flipped value
  panels_long = panels_long.withColumn('Included_inverted', pandas_inclusion_flip(panels_long.Included))\
                           .drop('Included')\
                           .withColumnRenamed('Included_inverted','Included')

In [24]:
# NOTE(review): this is a session-level setting and no later cell in this notebook restores it,
# so automatic broadcast joins stay off for every join that follows — confirm that is intended.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # no broadcasting, for big panel join only!

In [25]:
# attach boolean 0/1 columns that indicate if a user was in the panel for the given quarter
# (as determined by the input panel membership dataframe)

# Rename the user-id column on the panel side so it can be dropped after the join.
# panels_long keeps this alias for the next cell as well.
panels_long = panels_long.withColumnRenamed(_UID_COL, 'uid_col_alias')

# one membership column for the qtr itself..
# Left join: spending rows with no matching (user, nearest_period) panel row are kept
# with a null 'Inc_curr_panel'.
user_spending_with_membership = \
  user_spending_with_year_later_fq.join(panels_long, 
                                        (user_spending_with_year_later_fq[_UID_COL] == panels_long['uid_col_alias']) & \
                                        (user_spending_with_year_later_fq['nearest_period'] == panels_long['Period']),
                                        how='left')\
                             .drop('Period', 'uid_col_alias')\
                             .withColumnRenamed('Included', 'Inc_curr_panel')

In [26]:
# ..and another for the year-later qtr
# (helps us answer Q: what was curr qtr's panel spending one year ago?)
# Second left join against the same panels_long, this time keyed on
# 'nearest_period_year_later'; its inclusion value becomes 'Inc_year_later_panel'.
# 'sum_spending_adjusted' scales each row's spending by its current-panel inclusion value
# (null when the row had no current-panel match).
user_spending_with_membership_yoy = \
  user_spending_with_membership.join(panels_long,
                                     (user_spending_with_membership[_UID_COL] == panels_long['uid_col_alias']) & \
                                     (user_spending_with_membership['nearest_period_year_later'] == panels_long['Period']),
                                     how='left')\
                             .drop('Period', 'uid_col_alias')\
                             .withColumnRenamed('Included', 'Inc_year_later_panel')\
                             .withColumn('sum_spending_adjusted', F.col('sum_spending') * F.col('Inc_curr_panel'))

# from here on the name refers to the frame carrying both membership columns
user_spending_with_membership = user_spending_with_membership_yoy

##### 2(c) Calculate raw spending index

In [28]:
# current panel's activity in current quarter (adjusted by inclusion weight)
# Sums 'sum_spending_adjusted' per (ticker, enddate) over rows with a positive current-panel inclusion.
panel_spending_by_ticker_qtr_curr = \
  user_spending_with_membership.filter(F.col('Inc_curr_panel') > 0)\
                               .groupby('ticker', 'enddate')\
                               .agg(F.sum('sum_spending_adjusted').alias('total_dollars_curr'))

# ..and year-later panel's activity in current quarter
# NOTE(review): this total sums the unweighted 'sum_spending', whereas the current-quarter
# total above sums the inclusion-weighted 'sum_spending_adjusted'. For fractional inclusion
# values the two sides of the ratio are therefore weighted differently — confirm intended.
panel_spending_by_ticker_qtr_year_ago = \
  user_spending_with_membership.filter(F.col('Inc_year_later_panel') > 0)\
                               .groupby('ticker', 'enddate', 'enddate_year_later')\
                               .agg(F.sum('sum_spending').alias('total_dollars_year_ago'))

# alias for brevity
pstq_curr = panel_spending_by_ticker_qtr_curr

# below clone is a workaround to deal with this Spark bug:
# https://stackoverflow.com/questions/45713290/how-to-resolve-the-analysisexception-resolved-attributes-in-spark
# (rebuilding the frame from its RDD and schema gives it a lineage independent of pstq_curr)
pstq_year_ago = spark.createDataFrame(panel_spending_by_ticker_qtr_year_ago.rdd,
                                     panel_spending_by_ticker_qtr_year_ago.schema)

In [29]:
# inline QA: uniqueness checks
# cts = pstq_year_ago.groupby('ticker','enddate','enddate_year_later').count()
# assert cts.filter(F.col('count')>1).count() == 0

In [30]:
# inline QA: uniqueness checks
# cts = pstq_curr.groupby('ticker','enddate').count()
# assert cts.filter(F.col('count')>1).count() == 0

In [31]:
pstq_year_ago = pstq_year_ago.withColumnRenamed('ticker', 'ticker_alias')  # <workaround for Spark join dup-column bug>

# want to compare current quarter panel's activity to their activity a year ago (2nd logical expr)
# Each current (ticker, enddate) row is paired with the year-ago row whose
# 'enddate_year_later' equals that enddate. Inner join: quarters without a year-ago
# counterpart are dropped.
# NOTE(review): 'enddate_alias' is not a column of either input in this cell, so dropping it
# looks like a no-op; 'enddate_year_later' stays in the result — confirm.
panel_spending_by_ticker_qtr_composite = \
  pstq_curr.join(pstq_year_ago.drop('enddate'),
                 (pstq_curr['ticker'] == pstq_year_ago['ticker_alias']) & \
                 (pstq_curr['enddate'] == pstq_year_ago['enddate_year_later']),
                 how='inner')\
           .drop('ticker_alias', 'enddate_alias')  # <eliminate workaround lint>

In [32]:
# inline QA: each (ticker, enddate) pair must appear exactly once in the composite table
cts = panel_spending_by_ticker_qtr_composite.groupby('ticker','enddate').count()
n_duplicated_keys = cts.where(F.col('count') > 1).count()
assert n_duplicated_keys == 0

In [33]:
# Raw strong spending index: current-quarter panel dollars over the same panel's year-ago dollars.
strong_index_expr = F.col('total_dollars_curr') / F.col('total_dollars_year_ago')

panel_spending_by_ticker_qtr_composite = (
  panel_spending_by_ticker_qtr_composite
    .withColumn('raw_si_strong', strong_index_expr)
    .drop('nearest_period_year_later')
)

# can also insert weak calculation here

In [34]:
# Show the spending-index table ordered by ticker, then quarter end date.
composite_sorted = panel_spending_by_ticker_qtr_composite.orderBy('ticker','enddate')
display(composite_sorted)

### 3.  Assess Model Performance

In [36]:
# Benchmark rows that actually carry a benchmark value, reduced to the columns used below.
ticker_qtr_prod = (
  prod_benchmarks
    .where(F.col('dpuup_yoy_bm').isNotNull())
    .select('ticker','enddate','dpuup_yoy_bm','reported_value_yoy','d','U')
)

In [37]:
# Put benchmark and strong-CSP figures side by side for every (ticker, enddate) present in both.
strong_results = panel_spending_by_ticker_qtr_composite.select(
  'ticker','enddate','raw_si_strong', 'total_dollars_curr'
)
ticker_qtr_compare = ticker_qtr_prod.join(strong_results, on=['ticker','enddate'])

In [38]:
# fiscal_quarters = lagFeatures(fiscal_quarters, by=['ticker'], 
#                                    over='enddate_alias', vals=['startdate'], lags=[-4], lag_suffix='_year_later')

In [39]:
# ticker_qtr_compare_full_pd = pd.read_csv('/dbfs/mnt/mscience-ds-dev/swipe_dev_4m/sandbox/CSP/' + \
#                                          panels_tablename.split('.')[1] + '_summary_results.csv')

# ticker_qtr_compare_full_pd['enddate'] = pd.to_datetime(ticker_qtr_compare_full_pd['enddate']).dt.date

# ticker_qtr_compare = spark.createDataFrame(ticker_qtr_compare_full_pd)
# display(ticker_qtr_compare)

In [40]:
# lagFeatures(ticker_qtr_compare, by=['ticker'], 
#                                  over='enddate', vals=['total_dollars_curr'], lags=[-4], lag_suffix='_year_ago')

In [41]:
# ticker_qtr_compare.write.saveAsTable(panels_tablename.replace('_panels', '_strong_results'))
# print(panels_tablename.replace('_panels', '_strong_results'))

##### Calculations

In [43]:
# Collect the comparison table to the driver and make sure the index column is a plain float.
ticker_qtr_compare_pd = ticker_qtr_compare.toPandas().astype({'raw_si_strong': float})

In [44]:
# sort & trim to most-recent
def _most_recent_quarters(pdf):
  """Return the last `_QTRS_TO_TEST` rows of one ticker's frame, ordered by enddate."""
  return pdf.sort_values('enddate').tail(_QTRS_TO_TEST)

# A later cell reads tickers from index level 0 of this result, so the index is left as produced.
ticker_qtr_compare_pd = ticker_qtr_compare_pd.groupby('ticker').apply(_most_recent_quarters)
ticker_qtr_compare_pd.shape

In [45]:
# Rebind the Spark comparison table to the trimmed pandas frame, so later cells that use
# ticker_qtr_compare see only the most-recent quarters per ticker.
ticker_qtr_compare = spark.createDataFrame(ticker_qtr_compare_pd)

In [46]:
# Per-quarter ratio of the panel's current dollars to the benchmark table's `d` column.
ticker_qtr_compare_pd['dollars_ratio'] = ticker_qtr_compare_pd['total_dollars_curr'].astype(float) / \
                                         ticker_qtr_compare_pd['d'].astype(float)

# Absolute percentage error against the reported value.
# The denominator is taken in absolute value as well: dividing by the signed reported value
# made the "absolute" error negative whenever reported_value_yoy was below zero, which would
# also cancel out in the per-ticker mean (mape).
# NOTE(review): despite the '_strong' name, the error is computed from the benchmark column
# 'dpuup_yoy_bm', not from 'raw_si_strong' — confirm this is the intended series.
_reported_yoy = ticker_qtr_compare_pd['reported_value_yoy'].astype(float)
_benchmark_yoy = ticker_qtr_compare_pd['dpuup_yoy_bm'].astype(float)
ticker_qtr_compare_pd['APE_strong'] = (_benchmark_yoy - _reported_yoy).abs() / _reported_yoy.abs()

In [47]:
def tracking_assessment(pdf, truth_col='reported_value_yoy', prod_col='dpuup_yoy_bm', csp_col='raw_si_strong'):
  """Summarise how well the benchmark and CSP series track the reported series.

  Args:
    pdf: pandas DataFrame holding `truth_col`, `prod_col`, `csp_col` and the
      'dollars_ratio' and 'APE_strong' columns.
    truth_col: name of the reported ("truth") series.
    prod_col: name of the benchmark series.
    csp_col: name of the CSP series.

  Returns:
    pandas Series with, in order: '<prod_col>_corr' and '<csp_col>_corr' (correlation
    of each series with the truth), 'before_and_after_corr' (correlation between the
    benchmark and CSP series), 'mape' (mean of 'APE_strong') and 'dollars_ratio_avg'
    (mean of 'dollars_ratio').
  """
  truth_corrs = pdf[[truth_col, prod_col, csp_col]].corr()[truth_col].drop(truth_col)
  before_and_after_corr = pdf[[prod_col, csp_col]].corr().iloc[0,1]

  # pd.concat instead of Series.append, which no longer exists in pandas 2.x
  corrs = pd.concat([truth_corrs, pd.Series({'before_and_after': before_and_after_corr})])
  corrs.index = [col + '_corr' for col in corrs.index]

  dollars_ratio_avg = pdf['dollars_ratio'].mean()
  mape = pdf['APE_strong'].mean()

  res = pd.concat([corrs, pd.Series({'mape': mape, 'dollars_ratio_avg' : dollars_ratio_avg})])

  return res

In [48]:
# Per-ticker tracking summary (one row per ticker, one column per metric).
# The trimmed frame carries 'ticker' both as an index level and as a column; current pandas
# refuses to group by such an ambiguous label, so group on a copy with a plain index.
correlations_with_truth = ticker_qtr_compare_pd.reset_index(drop=True)\
                                               .groupby('ticker')\
                                               .apply(tracking_assessment)

# positive values mean the CSP series correlates better with the reported series than the benchmark
correlations_with_truth['improvement_corr'] = correlations_with_truth['raw_si_strong_corr'] - correlations_with_truth['dpuup_yoy_bm_corr']

# DataFrame.round replaces the element-wise applymap(round), which is deprecated in recent pandas
correlations_with_truth = correlations_with_truth.round(4)

display(correlations_with_truth.reset_index())

In [49]:
# Distribution of the per-ticker correlations and of the improvement over the benchmark.
correlations_with_truth[['dpuup_yoy_bm_corr', 'raw_si_strong_corr', 'improvement_corr']].describe()

In [50]:
from scipy.stats import ttest_rel

# Paired t-test on the per-ticker correlations (null hypothesis: means are equal).
# Missing values are dropped jointly, row by row: dropping them from each column on its own
# could leave the two samples with different lengths or pair up different tickers.
paired_corrs = correlations_with_truth[['dpuup_yoy_bm_corr', 'raw_si_strong_corr']].dropna()
ttest_rel(paired_corrs['dpuup_yoy_bm_corr'],
          paired_corrs['raw_si_strong_corr'])  # null hypothesis: means are equal

In [51]:
# Collect the Spark comparison table back to pandas for export. It was rebuilt from the
# trimmed frame before 'dollars_ratio' and 'APE_strong' were added to ticker_qtr_compare_pd,
# so those two columns are not part of it.
ticker_qtr_compare_full_pd = ticker_qtr_compare.toPandas()

In [52]:
# write out results to S3
# File name: <table part of the panels table name>[_flipped]_summary_results.csv
table_basename = panels_tablename.split('.')[1]
flip_marker = '_flipped' if panel_flip else ''
file_name_s3 = table_basename + flip_marker + '_summary_results.csv'

results_dir = '/dbfs/mnt/mscience-ds-dev/swipe_dev_4m/sandbox/CSP/'
ticker_qtr_compare_full_pd.to_csv(results_dir + file_name_s3, index=False)
print(file_name_s3)

In [53]:
# Distinct index-level-0 labels (tickers) that have at least one quarter with dollars_ratio above 1.
ratio_above_one = ticker_qtr_compare_pd['dollars_ratio'] > 1
ticker_qtr_compare_pd.loc[ratio_above_one].index.get_level_values(0).drop_duplicates().values

In [54]:
# Scatter of per-quarter dollars_ratio (x) against APE_strong (y).
plt.clf()
# TODO(review): the original note here was left unfinished ("deal with ...") — complete or remove
ticker_qtr_compare_pd.plot.scatter(x='dollars_ratio', y='APE_strong')
# TODO(review): unfinished original caveat: "dollar_ratio no longer directly interpretable as comparing ..."
display()  # presumably renders the current matplotlib figure in this notebook environment — confirm

In [55]:
# Per-ticker mean of total_dollars_curr divided by the benchmark table's `d` column.
# - reset_index(drop=True): the trimmed frame carries 'ticker' both as an index level and as
#   a column, which current pandas rejects as an ambiguous groupby key.
# - `d` is cast to float like `total_dollars_curr`, matching the per-row dollars_ratio
#   computation; without the cast the division depends on the dtype `d` arrives with.
dollar_loss_by_ticker = ticker_qtr_compare_pd.reset_index(drop=True)\
                                             .groupby('ticker')\
                                             .apply(lambda pdf: (pdf['total_dollars_curr'].astype(float) / \
                                                                 pdf['d'].astype(float)).mean())

# the Series name becomes the column name when it is joined onto the correlation table
dollar_loss_by_ticker.name = 'dollars_ratio'

dollar_loss_by_ticker.describe()

In [56]:
# Add each ticker's mean dollars_ratio to its correlation summary (index-on-index inner join).
correlations_with_truth_analysis = correlations_with_truth.join(dollar_loss_by_ticker, how='inner')

# Scatter of per-ticker mean dollars_ratio (x) against the correlation improvement (y).
plt.clf()
correlations_with_truth_analysis.plot.scatter(x='dollars_ratio', y='improvement_corr')
#plt.xlim(.3,.6)
display()  # presumably renders the current matplotlib figure in this notebook environment — confirm