In [None]:
import pandas as pd, numpy as np, re, pickle
import pyspark.sql.functions as F

from utilities import DeptRenamer, disp_all

# from sklearn.preprocessing import StandardScaler
# from sklearn.decomposition import PCA

In [None]:
# Initial Import to get Databricks to start

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
MIN_SQ_FT = 10000
LATEST_OPEN_DT = '2017-06-30'

In [None]:
dept_renamer = DeptRenamer()

In [None]:
# Lookup table from fac_sk to fac_nbr; fac_nbr is exposed as store_id, the key
# used by the rest of the notebook.
query = '''
SELECT fac_sk, fac_nbr
FROM db_enriched.dim_2_fac_view
'''

sk2nbr_df = (
    spark.sql(query)
    .toPandas()
    .apply(pd.to_numeric)
    .rename(columns={'fac_nbr': 'store_id'})
)

# fac_sk2id() merges against this table, so a repeated fac_sk would fan out rows.
assert sk2nbr_df['fac_sk'].nunique() == len(sk2nbr_df)

def fac_sk2id(df_with_sk, drop_sk=True):
    """Attach ``store_id`` to a frame keyed by ``fac_sk`` and move it to the front.

    Parameters
    ----------
    df_with_sk : pandas.DataFrame
        Frame containing a ``fac_sk`` column. It is not modified.
    drop_sk : bool, default True
        When True, the ``fac_sk`` column is left out of the result.

    Returns
    -------
    pandas.DataFrame
        A new frame whose first column is ``store_id``, followed by the other
        columns in their post-merge order.
    """
    # Reading the module-level lookup table needs no ``global`` declaration.
    lookup = sk2nbr_df[sk2nbr_df['fac_sk'].isin(df_with_sk['fac_sk'])]

    # The lookup is already restricted to keys present in the input, so the
    # outer join cannot add rows for unknown keys; input keys missing from the
    # lookup come through with a NaN store_id.
    merged = df_with_sk.merge(lookup, how='outer')

    ordered_cols = ['store_id'] + [c for c in merged.columns if c != 'store_id']
    if drop_sk:
        ordered_cols.remove('fac_sk')

    # Select the final columns in one step instead of dropping in place on a
    # derived frame, which raised SettingWithCopyWarning.
    return merged[ordered_cols]

In [None]:
# active_stores_df = pd.read_excel('../../../data/active_store_list.xlsx', header=3)

# mask = active_stores_df['Operating \nStatus'] == "1-Open"
# active_stores_df = active_stores_df[mask]

# active_store_ids = active_stores_df['Lawson Facility #'].tolist()

# with open('../../../data/active_stores_lst.pkl', 'wb') as f:
#     pickle.dump(active_store_ids, f)

# Load the two cached store-id lists used to restrict the queries below.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load pickles that were produced by this project.
with open('../../../data/active_stores_lst.pkl', 'rb') as f:
    active_store_ids = pickle.load(f)

with open('../../../data/master_stores_list_for_shrink.pkl', 'rb') as f:
    master_stores_list = pickle.load(f)

# Data Processing and Integration

## Establish Skeleton of Active Stores only

In [None]:
# str(list)[1:-1] drops the surrounding brackets, leaving the comma-separated
# items that fill the IN (...) clause.
active_store_id_csv = str(active_store_ids)[1:-1]

skeleton_query_template = '''
SELECT *
FROM db_enriched.lu_store_finance_om
WHERE closed_dt="9999-12-31"
AND opened_dt <= "{}"
AND total_building_size_amt >= {}
AND store_id IN ({})'''

query = skeleton_query_template.format(LATEST_OPEN_DT, MIN_SQ_FT, active_store_id_csv)

In [None]:
df = spark.sql(query).toPandas()

# only_nan_closed_stores = df['closed_dt'] == 'NaT'
# df = df[only_nan_closed_stores]

In [None]:
df.shape

In [None]:
# Keep only columns that take more than one distinct value (NaN counts as a
# value). The explicit copy makes `df` an independent frame, so the in-place
# edits in the following cells do not act on a derived selection.
df = df.loc[:, df.nunique(dropna=False) > 1].copy()

In [None]:
# Remove every column whose name contains "dw_".
dw_columns = [name for name in df.columns if "dw_" in name]
df = df.drop(columns=dw_columns)

In [None]:
# Turn placeholder strings into real missing values: first the "N/A" regex,
# then the exact ten-space filler string.
df = (
    df.replace(r'N/A ?', np.nan, regex=True)
      .replace('          ', np.nan)
)

In [None]:
df.shape

In [None]:
# Columns coerced with pd.to_numeric in the next cell.
numeric_cols = [
    'store_id',
    'store_cd',
    'region_id',
    'parent_op_area_id',
    'parent_op_area_cd',
    'op_area_finance_id',
    'op_area_finance_cd',
    'district_finance_id',
    'district_finance_cd',
    'store_zip5_id',
    'total_building_size_amt',
    'total_selling_area_amt',
    'banner_id',
    'prm_banner_id',
    'parent_store_id',
    'op_area_id',
    'division_id',
    'store_voice_phone_nbr',
    'store_status_id',
    'hours_from_host_tm',
    'rog_id',
]

# Date columns are found by name: anything containing "_dt".
dt_cols = [name for name in df.columns if "_dt" in name]

In [None]:
# Column-wise coercion: numeric columns must parse cleanly, while unparseable
# dates become NaT because of errors='coerce'.
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric)

df[dt_cols] = df[dt_cols].apply(pd.to_datetime, errors='coerce')

In [None]:
assert(df.store_id.nunique() == df.shape[0])

In [None]:
# Code columns removed from the skeleton.
to_drop = [
    'store_cd',
    'region_cd',
    'parent_op_area_cd',
    'op_area_finance_cd',
]
df = df.drop(columns=to_drop)

In [None]:
df.shape

In [None]:
# with open('../../../data/master_stores_list_for_shrink.pkl', 'wb') as f:
#     pickle.dump(master_stores_list_for_shrink, f)

In [None]:
# Checkpoint the store skeleton to disk. The index is reset to the default
# 0..n-1 range before writing.
df = df.reset_index(drop=True)
df.to_feather('../../../data/temp_dfs/skel_df.feather')

In [None]:
df = pd.read_feather('../../../data/temp_dfs/skel_df.feather')

In [None]:
disp_all(df)

## db_enriched.F_RTL_OPS_WK_FAC

In [None]:
query = '''
SELECT fac_sk, sls_amt, ly_sls_amt, sls_prod_cnt, sls_trips_cnt, wages_amt, labor_manhrs_qty
FROM db_enriched.F_RTL_OPS_WK_FAC
WHERE wk_id >= "201800" AND wk_id < "201900"
'''

rtl_ops_fac_df = spark.sql(query)

In [None]:
rtl_ops_fac_df = rtl_ops_fac_df.toPandas().apply(lambda x: pd.to_numeric(x))

In [None]:
# Per-facility sum / median / std of each weekly metric.
fac_metric_cols = ['sls_amt', 'wages_amt', 'labor_manhrs_qty', 'sls_prod_cnt', 'sls_trips_cnt']
fac_stat_names = ['sum', 'median', 'std']

rtl_ops_fac_agg_df = rtl_ops_fac_df.groupby('fac_sk').agg(
    {metric: fac_stat_names for metric in fac_metric_cols}
)

# Flatten the (metric, stat) column pairs into "<metric>_<stat>" names.
rtl_ops_fac_agg_df.columns = ["_".join(pair) for pair in rtl_ops_fac_agg_df.columns]
rtl_ops_fac_agg_df = rtl_ops_fac_agg_df.reset_index()

In [None]:
# Copies the per-facility aggregate to the system clipboard for manual inspection.
# NOTE(review): this needs a clipboard backend on the machine running the
# kernel -- presumably unavailable on a remote cluster; confirm or remove.
rtl_ops_fac_agg_df.to_clipboard()

In [None]:
df = df.merge(fac_sk2id(rtl_ops_fac_agg_df), on='store_id', how='left')

In [None]:
# Index labels of the rows in which at least one value equals zero.
has_zero_value = (rtl_ops_fac_agg_df == 0).any(axis=1)
zeros_mask = rtl_ops_fac_agg_df.index[has_zero_value]

In [None]:
# zeros_mask holds index *labels*, so select with label-based .loc. Positional
# .iloc only gave the same rows while the index was the default 0..n-1 range.
rtl_ops_fac_agg_zeros_df = rtl_ops_fac_agg_df.loc[zeros_mask]

In [None]:
df.merge(fac_sk2id(rtl_ops_fac_agg_zeros_df, drop_sk=False), how='inner').to_clipboard(index=False)

## db_enriched.F_RTL_OPS_WK_DEPT

In [None]:
query = '''
SELECT *
FROM db_enriched.F_RTL_OPS_WK_DEPT
WHERE wk_id >= "201800" AND wk_id < "201900"
'''

df_sales = spark.sql(query)

### Sales, Wages, Manhours, Inv_cst

In [None]:
def rtl_ops_dept_lvl_data(agg_metric, col_renaming_prefix):
    '''
    Build a per-facility table with one column per department for one metric.

    agg_metric: column of df_sales to sum, e.g. sls_amt, wages_amt,
        labor_manhrs_qty, inv_cst_amt
    col_renaming_prefix: prefix handed to dept_renamer.rename and used to name
        the store-level total column, "<prefix>_store"

    Returns a pandas DataFrame with fac_sk, the renamed department columns and
    the row-wise total across those department columns.
    '''
    # Spark side: total of the metric per (fac_sk, dept_nbr), departments as columns.
    pivoted = df_sales.groupby('fac_sk').pivot('dept_nbr').sum(agg_metric)

    dept_df = dept_renamer.rename(pivoted.toPandas(), col_renaming_prefix)
    dept_df = dept_df.apply(pd.to_numeric)

    # The store total is the sum over every column except the facility key.
    store_total_col = '{}_store'.format(col_renaming_prefix)
    dept_values = dept_df.drop(columns=['fac_sk'])
    dept_df[store_total_col] = dept_values.sum(axis=1)

    return dept_df

### Time to Incorporate Total and Department-level Sales

In [None]:
dept_sls_df = rtl_ops_dept_lvl_data('sls_amt', 'ttl_sls')
dept_wage_df = rtl_ops_dept_lvl_data('wages_amt', 'ttl_wages')
dept_hrs_df = rtl_ops_dept_lvl_data('labor_manhrs_qty', 'ttl_hrs')
dept_cost_df = rtl_ops_dept_lvl_data('inv_cst_amt', 'ttl_cost')

In [None]:
dept_hrs_df.dropna(axis=1, thresh=int(dept_hrs_df.shape[0]*0.2))

In [None]:
dept_sls_df = df_sales.groupby('fac_sk').pivot('dept_nbr').sum('sls_amt').toPandas()

In [None]:
dept_sls_df = dept_renamer.rename(dept_sls_df, 'ttl_sales')

In [None]:
dept_sls_df = dept_sls_df.apply(lambda x: pd.to_numeric(x))

In [None]:
dept_sls_df['total_sales'] = dept_sls_df.drop(columns=['fac_sk']).sum(axis=1)

In [None]:
dept_sls_df.head(7)

### Wage Info

In [None]:
df_sales.columns

In [None]:
dept_wages_df = rtl_ops_dept_lvl_data('wages_amt', 'ttl_wages')

In [None]:
dept_wages_df.head()

In [None]:
# Manual build of the department-level wages table, mirroring the sales cells
# above. Every step now operates on dept_wages_df: previously the last three
# lines were copied from the sales version and used dept_sls_df, so the wages
# pivot was thrown away and this frame ended up holding sales figures.
dept_wages_df = df_sales.groupby('fac_sk').pivot('dept_nbr').sum('wages_amt').toPandas()
dept_wages_df = dept_renamer.rename(dept_wages_df, 'ttl_wages')
dept_wages_df = dept_wages_df.apply(lambda x: pd.to_numeric(x))
# Store-level total across all department columns (the facility key is excluded).
dept_wages_df['total_wages'] = dept_wages_df.drop(columns=['fac_sk']).sum(axis=1)

## rpt_paxar_upc_wave_index

In [None]:
# str(list)[1:-1] drops the surrounding brackets, leaving the comma-separated
# items that fill the IN (...) clause.
master_store_id_csv = str(master_stores_list)[1:-1]

paxar_query_template = '''
SELECT store_id, dept_id, period_id, total_discount_amt, total_discount_qty, unknown_discount_amt, net_sales
FROM db_enriched.rpt_paxar_upc_wave_index
WHERE period_id > "201800" AND period_id < "201900"
AND store_id IN ({})
'''

query = paxar_query_template.format(master_store_id_csv)

In [None]:
str(master_stores_list)

In [None]:
spk_pax_df = spark.sql(query)

In [None]:
ttl_pax_by_dept_df = spk_pax_df.groupby('store_id').pivot('dept_id').sum('total_discount_amt').toPandas()
ttl_pax_by_dept_df = dept_renamer.rename(ttl_pax_by_dept_df, 'ttl_pax')

In [None]:
ttl_pax_by_dept_df.dropna(axis=1, thresh=.8*ttl_pax_by_dept_df.shape[0], inplace=True)

In [None]:
ttl_pax_by_dept_df.fillna(0, inplace=True)

In [None]:
to_sum = ['total_discount_amt', 'total_discount_qty', 'unknown_discount_amt', 'net_sales']

sum_funcs = [F.sum(col) for col in to_sum]

In [None]:
pax_df = spk_pax_df.groupBy(['store_id', 'period_id']).agg(*sum_funcs).toPandas()
pax_df = pax_df.apply(lambda x: pd.to_numeric(x))



In [None]:
# Strip the "sum(...)" wrapper from the aggregated column names, keeping only
# the inner column name. The replacement is a raw string so that "\g<1>" reaches
# re.sub unchanged instead of relying on an invalid string escape sequence.
pax_df.columns = [re.sub(r'sum\(([_\w]+)\)', r'\g<1>', col) for col in pax_df.columns.tolist()]

In [None]:
# Collapse the (store_id, period_id) rows to one row per store with four
# summary statistics per metric. This rebinds pax_df, so the cell is meant to
# be run once per fresh pax_df.
trfms = ['sum', 'std', 'mean', 'median']
pax_metric_cols = ['total_discount_amt', 'total_discount_qty', 'unknown_discount_amt', 'net_sales']

pax_df = pax_df.groupby('store_id').agg({metric: trfms for metric in pax_metric_cols})

# Flatten the (metric, stat) column pairs into "<metric>_<stat>" names.
pax_df.columns = ["_".join(pair) for pair in pax_df.columns]
pax_df = pax_df.reset_index()

In [None]:
ttl_disc_amt_col = pax_df['total_discount_amt_mean']

In [None]:
# Show the column's dtype. The previous astype() call had no target dtype and
# therefore always raised TypeError.
ttl_disc_amt_col.dtype

In [None]:
pax_df.sort_values(['store_id'])

In [None]:
pax_df

In [None]:
pax_df.shape

In [None]:
pax_df.columns

In [None]:
# NOTE(review): this cell cannot run as written. By this point pax_df is the
# pandas frame produced by toPandas() and the per-store groupby above, while
# groupBy/pivot/toPandas is the Spark-style chain used on spk_pax_df, and no
# aggregate is applied between pivot() and toPandas().
# Presumably this was meant to pivot spk_pax_df by dept_id with an aggregate --
# TODO confirm the intended source frame and metric.
pax_df = pax_df.groupBy('store_id').pivot('dept_id').toPandas()

In [None]:
pax_df.head()

In [None]:
pax_df.shape

In [None]:
pax_df.head(7)

In [None]:
pax_df = dept_renamer.rename(pax_df, 'pax')

In [None]:
pax_df.head()

## db_landing.by_store_by_period_turnover

In [None]:
query = '''
SELECT *
FROM db_landing.by_store_by_period_turnover
'''

tenure_df = spark.sql(query)

In [None]:
# NOTE(review): rebinding `df` here replaces the store frame built in the
# earlier sections, and the absolute path exists on one machine only.
# The file name matches the table queried into tenure_df in the previous cell
# -- presumably the same data; TODO confirm and read from tenure_df (or a
# project-relative path) under a dedicated variable name.
df = pd.read_csv('/Users/corbin/Downloads/by_store_by_period_turnover.csv')

In [None]:
df.termed_cnt.describe([0.05, 0.25, 0.5, 0.75, 0.9, 0.98])

In [None]:
df.shape

In [None]:
df.termed_cnt.value_counts()

# Sandbox