## Rank Analysis

#### Author: Yiran Jing
#### Date: Jan 2020

In [1]:
import pandas as pd
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import pyplot
import plotly.express as px
import plotly.graph_objects as go
from dataclasses import dataclass, field
from typing import Dict, List
from pyspark.sql.functions import lit
from Rank_analysis_helperfunction import *
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
sns.set_style("whitegrid")

import findspark
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row
import pyspark

In [2]:
%%time
"""
Build the SparkSession
"""
# getOrCreate(): get the current Spark session or to create one if there is none running
# manage Spark memory limits programmatically 
# To avoid out of memory error
spark = SparkSession.builder \
   .master("local") \
   .appName("Rank Model") \
   .config("spark.executor.memory", "4G")\
   .config('spark.driver.memory', '45G')\
   .config('spark.driver.maxResultSize', '10G')\
   .getOrCreate()
   
sc = spark.sparkContext

CPU times: user 24.3 ms, sys: 18.6 ms, total: 42.9 ms
Wall time: 3.4 s


#### Load and clean data

In [3]:
def calculate_material_change(dataset: Dataset) -> pyspark.sql.dataframe.DataFrame:
    """
    For each SKU in each store, compare the total sale with 
    the average of total sale overall stores given the same concept in the week before the last week
    
    return df: with column 'material_change':
            Blank:
                sumSales_oldWeek = 0
            True:
                1. for the item by Store in W1, but not in W2
                2. avgSales_lastWeek < sumSales_oldWeek (e.g w1 > w2)
            False:
                the left cases. 
            
            Note: 
                1. For the case: sumSales_oldWeek = 0, avgSales_lastWeek<0, in_W1_not_W2, material_change = ''
    """
    
    # get the average sales of each (SKU, Concept, average sale) in the week before last week
    average_sale_last_week = calculate_average_sale_last_week(dataset)
    
    # calculate last week infor
    old_week_data = dataset.df.filter(col('Date') == dataset.week[-2])
    
    ## net sale 
    old_week_sale = old_week_data.withColumnRenamed("NetSales", "sumSales_oldWeek").select('SKU',
                                                                                           'Store',
                                                                                           'sumSales_oldWeek')
    # merge dataset 
    output = dataset.df.join(average_sale_last_week, on = ['SKU', 'Concept_NEW'], how = 'full')
    output = output.join(old_week_sale, on = ['SKU', 'Store'], how = 'full')   
    output = output.na.fill(0)
    # test dataset
    #test_calculate_material_change(merge_data, last_week_sale)
    
    # generate 'material_change' based on given condition
    output = output.withColumn('material_change', 
                                       when(col('sumSales_oldWeek') ==0, '')
                                       .when((col("sumSales_oldWeek") > col('avgSales_lastWeek')) 
                                            & (col("in_W1_not_W2") =='True'), 'True') 
                                       .otherwise('False'))
    
    return output

#### add more test function 
def test_calculate_material_change(merge_data: pyspark.sql.dataframe.DataFrame, 
                                   last_week_sale: pyspark.sql.dataframe.DataFrame):
    assert merge_data.count() == last_week_sale.count(), 'we want ' + str(last_week_sale.count()) + \
    ". But we get "+str(merge_data.count())# test joined dataset
    
def calculate_unadressed_gap(dataset: Dataset) -> pyspark.sql.dataframe.DataFrame:
    """
    unadressed_gap = True if: Not shown in both W1 and W2
            1. network_expansion = False
            2. and in_W1_not_W2 = False
            3. NetSale = 0
    """
    output = dataset.df.withColumn('unadressed_gap', 
                                       when((col('network_expansion') =='False')&\
                                            (col('in_W1_not_W2') =='False')&\
                                            (col('NetSales') ==0), 'True')
                                       .otherwise('False'))
    return output

def get_top_50(dataset: Dataset, weekChoice: int) -> pyspark.sql.dataframe.DataFrame:
    """
    Get top 50 rank (SKU, Store) of last week or the week before last week
    weekChoice: 
         -1: last week
         -2: the week before last week
    """
    week_data = dataset.df.filter(col('Date') == dataset.week[weekChoice])  
    top_50_week = week_data.filter(col('rank') <=50) # top 50
    return top_50_week.select('SKU', 'Store', 'Date')
    
def calculate_newcomer(dataset: Dataset) -> pyspark.sql.dataframe.DataFrame:
    """
    calculate_newcomer = True if:
        1. Not in top 50 in the week before last week
        2. in top 50 in the last week
    Otherwise False
    """
    top_50_last_week = get_top_50(dataset, -1)
    top_50_week_before_last_week = get_top_50(dataset, -2).withColumnRenamed("Date", "Date_before")
    # Do left outer join to find new items occured in the last week
    newcomer = top_50_last_week.join(top_50_week_before_last_week, on = ['SKU', 'Store'], how = 'left')
    newcomer = newcomer.withColumn('newcomer',
                                    when(col('Date_before').isNull(), 'True')
                                   .otherwise('False')).drop("Date_before").drop('Date')
    
    output = dataset.df.join(newcomer, on = ['SKU', 'Store'], how = 'left').fillna("False", subset=['newcomer'])
    return output

In [4]:
%%time
## load and clean data
df = spark.read.csv("../data/rawData/data Ranking Report.csv", header=True) # raw data from TM1

CPU times: user 2.02 ms, sys: 1.46 ms, total: 3.49 ms
Wall time: 3.63 s


In [5]:
%%time
# create dataclass onject
df = clean_dataset(df)
dataset = Dataset(df = df, store_item_concept = get_store_item_concept_list(df, spark),
                  week = get_week_list(df), concept = get_concept_list(df))

CPU times: user 109 ms, sys: 24.8 ms, total: 134 ms
Wall time: 8.56 s


In [6]:
%%time
dataset.df = calculate_network_expansion(dataset)
dataset.df = calculate_material_change(dataset)
dataset.df = calculate_unadressed_gap(dataset)
dataset.df = calculate_newcomer(dataset)

CPU times: user 29.1 ms, sys: 5.06 ms, total: 34.2 ms
Wall time: 679 ms


In [7]:
dataset.df.printSchema()

root
 |-- SKU: string (nullable = true)
 |-- Store: string (nullable = true)
 |-- Concept_NEW: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- NetSales: float (nullable = false)
 |-- rank: float (nullable = false)
 |-- network_expansion: string (nullable = true)
 |-- in_W1_not_W2: string (nullable = true)
 |-- avgSales_lastWeek: double (nullable = false)
 |-- sumSales_oldWeek: float (nullable = false)
 |-- material_change: string (nullable = false)
 |-- unadressed_gap: string (nullable = false)
 |-- newcomer: string (nullable = false)



In [8]:
# drop duplicate row and unnecessary column
dataset.df = dataset.df.drop('in_W1_not_W2','sumSales_oldWeek').dropDuplicates()

In [9]:
%%time
dataset.df.toPandas().to_csv('../data/output/checkresult_SKU.csv', 
                             index=False, encoding='utf-8')

CPU times: user 4.88 s, sys: 608 ms, total: 5.49 s
Wall time: 4min 39s
