
# Objectives

**Build** a small data pipeline that outputs **an overview of Nike’s VIP customers**, **ordered by the total sales value of their purchases**. 

---

For this promotion, the marketing department is only interested in **VIPs currently located in The Netherlands.** 


---


**VIPs that have not purchased any products are still eligible** – they are VIP, after all. 


---


This ask is not a one-off, as Marketing is already thinking about running this promotion again next year if it is successful.


---


In the overview, Marketing is looking for:

- The name of the VIP
- The email address of the VIP
- The total sales value of the VIP's purchases

# Install Libs

In [102]:
%pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Load Libs

In [103]:
import pyspark
from pyspark.sql import SparkSession
# Note: this `abs` is Spark's column function and shadows Python's built-in abs()
from pyspark.sql.functions import abs, coalesce, col, lit, when

# Import Data

In [104]:
# Create (or reuse) the Spark session
spark = SparkSession.builder.appName("PySpark Use Case").getOrCreate()

# Read the transactions Parquet file into a DataFrame
df_transactions = spark.read.parquet('/content/transactions.parquet')
df_transactions.show(truncate = False)

+-----+----------------+------------+-----------+---------------+--------------------+--------+--------+---------------------------------+------------------------+-------------+-----------------+-----------------+-------------+
|store|transaction_date|order_number|line_number|product_id     |product_name        |quantity|currency|recommended_retail_price_per_unit|discount_amount_per_unit|profile_id   |cancellation_flag|cancellation_date|source_system|
+-----+----------------+------------+-----------+---------------+--------------------+--------+--------+---------------------------------+------------------------+-------------+-----------------+-----------------+-------------+
|.com |2020-11-20      |422681959244|1          |720854836340997|zJnSpFriDYyNDJhwxMxD|1       |NL      |721                              |null                    |1-5151-4642-1|null             |null             |DIGITAL      |
|.com |2020-11-20      |422681959244|2          |999789223355744|XVnSxtFOJUDGlfxVQauS|1 

In [105]:
path_vips_1101 = '/content/vips_2020-11-01.csv'
df_vips_1101   = spark.read.csv(path_vips_1101, header=True)
df_vips_1101.show(truncate = False)

+----------+----------+---------+--------+---------------+-----------------------+
|vip_id    |first_name|last_name|vip_type|country        |email                  |
+----------+----------+---------+--------+---------------+-----------------------+
|4544056563|Brandon   |Ritter   |Athlete |The Netherlands|Brandon.Ritter@nike.com|
|9339003839|Adam      |Hall     |Athlete |Czech Republic |Adam.Hall@nike.com     |
|1600627714|null      |null     |Donator |Australia      |Fernando@Ford.com      |
|7531485682|Edwin     |Floyd    |null    |The Netherlands|Edwin.Floyd@nike.com   |
|4271215441|Kristin   |Brooks   |Donator |The Netherlands|Kristin@Brooks.com     |
|4262242729|William   |Smith    |Athlete |United States  |William.Smith@nike.com |
|6159265415|Angela    |Trujillo |Donator |Belarus        |Angela@Trujillo.com    |
+----------+----------+---------+--------+---------------+-----------------------+



In [106]:
path_vips_1115 = '/content/vips_2020-11-15.csv'
df_vips_1115   = spark.read.csv(path_vips_1115, header=True)
df_vips_1115.show(truncate = False)

+----------+----------+-------------+--------+---------------+-----------------------+
|vip_id    |first_name|last_name    |vip_type|country        |email                  |
+----------+----------+-------------+--------+---------------+-----------------------+
|4544056563|Brandon   |Ritter       |Athlete |The Netherlands|Brandon.Ritter@nike.com|
|9339003839|Adam      |Hall         |Athlete |Czech Republic |Adam.Hall@nike.com     |
|1600627714|Fernando  |Ford         |Donator |Australia      |Fernando@Ford.com      |
|7531485682|Edwin     |Floyd        |null    |The Netherlands|Edwin.Floyd@nike.com   |
|4271215441|Kristin   |Brooks-Ritter|Donator |The Netherlands|Kristin@Brooks.com     |
|4262242729|William   |Smith        |Athlete |United States  |William.Smith@nike.com |
|6159265415|Angela    |Trujillo     |Donator |Belarus        |Angela@Trujillo.com    |
|4711973834|Seth      |Lee          |Donator |The Netherlands|Seth@Lee.com           |
+----------+----------+-------------+------

In [107]:
path_vips_1125 = '/content/vips_2020-11-25.csv'
df_vips_1125   = spark.read.csv(path_vips_1125, header=True)
df_vips_1125.show(truncate = False)

+----------+----------+-------------+-----------------+---------------+-----------------------+
|vip_id    |first_name|last_name    |vip_type         |country        |email                  |
+----------+----------+-------------+-----------------+---------------+-----------------------+
|4544056563|Brandon   |Ritter       |Athlete + Donator|The Netherlands|Brandon.Ritter@nike.com|
|9339003839|Adam      |Hall         |Athlete          |Czech Republic |Adam.Hall@nike.com     |
|1600627714|Fernando  |Ford         |Donator          |Australia      |Fernando@Ford.com      |
|7531485682|Edwin     |Floyd        |Donator          |The Netherlands|Edwin.Floyd@nike.com   |
|4271215441|Kristin   |Brooks-Ritter|Donator          |The Netherlands|Kristin@Brooks.com     |
|1154401046|Justin    |Powell       |Athlete          |Finland        |Justin.Powell@nike.com |
|4262242729|William   |Smith        |Athlete          |United States  |William.Smith@nike.com |
|6159265415|Angela    |Trujillo     |Don

In [108]:
path_vips_mapping = '/content/umd_vip_to_profile_mapping.csv'
df_vips_mapping   = spark.read.csv(path_vips_mapping, header=True, sep=';')
df_vips_mapping.show(truncate = False)

+----------+-------------+------+----------------+------------------------------------+
|vip_id    |profile_id   |active|meta_change_date|meta_comments                       |
+----------+-------------+------+----------------+------------------------------------+
|4544056563|1-5151-4642-1|yes   |null            |null                                |
|4544056563|1-5151-4642-1|no    |4-1-2020        |This athlete was in the system twice|
|9339003839|1-4786-9297-9|yes   |null            |null                                |
|1600627714|1-4947-2509-6|yes   |null            |null                                |
|7531485682|1-57006-547-0|no    |25-1-2020       |No longer a donator                 |
|4271215441|1-9860-3438-0|yes   |null            |null                                |
|1154401046|1-85886-104-7|yes   |null            |null                                |
|4262242729|0-89923-575-1|yes   |null            |null                                |
|6159265415|0-10-241138-7|yes   

# VIP Data Preprocessing

In [109]:
def vip_process(df_vips_mapping, start_date):

  # Load the VIP snapshot that belongs to the start of the period
  df_vips = spark.read.csv(f'/content/vips_{start_date}.csv', header=True)

  # Only keep VIPs currently based in The Netherlands
  df_vips = df_vips.filter(df_vips.country == 'The Netherlands')

  # vip_id 4544056563 appears twice in the mapping: once active and once inactive with the
  # comment "This athlete was in the system twice". Nothing else suggests he lost his status,
  # so he is still an active VIP. Keep only active rows first, then drop any remaining
  # duplicate vip_id: dropDuplicates keeps an arbitrary row per key, so deduplicating before
  # filtering could keep the inactive copy and silently drop him.
  df_vips_mapping = (df_vips_mapping
                     .filter(df_vips_mapping.active == 'yes')
                     .dropDuplicates(subset=['vip_id']))

  # Attach each VIP's profile_id; VIPs without an active mapping are dropped
  df_vips_mapped = df_vips.join(df_vips_mapping, on='vip_id', how='inner')

  # Tag every row with the period it belongs to
  df_vips_mapped = df_vips_mapped.withColumn('period_start', lit(start_date))

  return df_vips_mapped.select('profile_id', 'first_name', 'email', 'period_start')
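Why filter before deduplicating? Spark's `dropDuplicates` keeps an arbitrary row per key, so on the raw mapping it may keep either of the two rows for vip_id 4544056563. The plain-Python sketch below (no Spark; `pick_active_profiles` is a hypothetical helper, and the rows are a subset of the mapping output shown above) applies the same rule deterministically: discard inactive rows, then keep one profile per VIP.

```python
# A subset of the umd_vip_to_profile_mapping.csv rows shown above: (vip_id, profile_id, active)
mapping_rows = [
    ("4544056563", "1-5151-4642-1", "yes"),
    ("4544056563", "1-5151-4642-1", "no"),   # duplicate entry, marked inactive
    ("7531485682", "1-57006-547-0", "no"),   # "No longer a donator"
    ("4271215441", "1-9860-3438-0", "yes"),
]


def pick_active_profiles(rows):
    """Map each vip_id to the profile_id of its first active mapping row."""
    profiles = {}
    for vip_id, profile_id, active in rows:
        # Inactive rows are skipped before the duplicate check, so they can never win
        if active == "yes" and vip_id not in profiles:
            profiles[vip_id] = profile_id
    return profiles


print(pick_active_profiles(mapping_rows))
```

Because the active filter runs first, the order of the duplicate rows no longer matters.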

In [110]:
start_date = '2020-11-01'
vip_process(df_vips_mapping, start_date).show()

+-------------+----------+--------------------+------------+
|   profile_id|first_name|               email|period_start|
+-------------+----------+--------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|  2020-11-01|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|  2020-11-01|
+-------------+----------+--------------------+------------+



In [111]:
start_date = '2020-11-15'
vip_process(df_vips_mapping, start_date).show()

+-------------+----------+--------------------+------------+
|   profile_id|first_name|               email|period_start|
+-------------+----------+--------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|  2020-11-15|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|  2020-11-15|
|1-59971-953-3|      Seth|        Seth@Lee.com|  2020-11-15|
+-------------+----------+--------------------+------------+



In [112]:
start_date = '2020-11-25'
vip_process(df_vips_mapping, start_date).show()

+-------------+----------+--------------------+------------+
|   profile_id|first_name|               email|period_start|
+-------------+----------+--------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|  2020-11-25|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|  2020-11-25|
|1-59971-953-3|      Seth|        Seth@Lee.com|  2020-11-25|
+-------------+----------+--------------------+------------+



# Transaction Data Preprocessing

In [113]:
def transaction_process(df_transactions, start_date, end_date):

  # Keep transactions in the half-open period [start_date, end_date)
  df_transactions_date = df_transactions.filter(
      (df_transactions.transaction_date >= start_date) & (df_transactions.transaction_date < end_date))

  # A missing cancellation flag means the order line was not cancelled
  df_transactions_date = df_transactions_date.fillna('no', subset=['cancellation_flag'])
  # Drop the cancelled order lines
  df_transactions_date = df_transactions_date.filter(df_transactions_date.cancellation_flag != 'yes')

  # Turn discount_amount_per_unit into a price multiplier:
  #   100  -> 0      (the unit is given away for free)
  #   v    -> v / 100 for any other value
  #   null -> 1      (no discount, full price)
  df_transactions_date = df_transactions_date.withColumn(
      'discount_amount_per_unit',
      when(col('discount_amount_per_unit') == 100, 0).otherwise(col('discount_amount_per_unit') / 100))
  df_transactions_date = df_transactions_date.fillna(1, subset=['discount_amount_per_unit'])

  # Sales value of each order line after discount. abs() keeps the price positive, so the
  # sign of the line total comes from quantity (returns show up as negative values)
  df_transactions_date = df_transactions_date.withColumn(
      'line_sales_after_discount',
      abs(col('recommended_retail_price_per_unit')) * col('discount_amount_per_unit') * col('quantity'))

  # Total sales per customer profile for the period
  df_transactions_date = (df_transactions_date
                          .groupBy('profile_id')
                          .sum('line_sales_after_discount')
                          .withColumnRenamed('sum(line_sales_after_discount)', 'period_sales_after_discount'))

  # Tag every row with the period it belongs to
  df_transactions_date = df_transactions_date.withColumn('period_start', lit(start_date))

  return df_transactions_date
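To make the per-line arithmetic concrete, here is a plain-Python sketch of the rule coded above. The helpers `discount_multiplier` and `line_sales` are hypothetical and only mirror the Spark expressions; they reproduce the rule exactly as written, under the notebook's assumption about how `discount_amount_per_unit` is encoded. The first call uses the values of the first transaction row shown earlier (price 721, no discount, quantity 1); the others are toy values.

```python
import builtins


def discount_multiplier(discount):
    """Plain-Python mirror of the when/otherwise + fillna steps in transaction_process."""
    if discount is None:
        return 1.0            # no discount recorded: full price
    if discount == 100:
        return 0.0            # the unit is given away for free
    return discount / 100     # any other value is divided by 100, as in the Spark code


def line_sales(price, discount, quantity):
    """Sales value of one order line; the sign comes from quantity (negative for returns)."""
    # builtins.abs is used explicitly because the notebook's Spark import shadows abs()
    return builtins.abs(price) * discount_multiplier(discount) * quantity


print(line_sales(721, None, 1))    # 721.0
print(line_sales(181, None, -1))   # -181.0, a returned unit
print(line_sales(500, 100, 2))     # 0.0, free units
```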

In [114]:
start_date = '2020-11-01'
end_date   = '2020-11-15'
transaction_process(df_transactions, start_date, end_date).show()

+-------------+---------------------------+------------+
|   profile_id|period_sales_after_discount|period_start|
+-------------+---------------------------+------------+
|1-9860-3438-0|                      362.0|  2020-11-01|
|0-89923-575-1|                      782.0|  2020-11-01|
|1-4786-9297-9|                     2694.0|  2020-11-01|
|1-4947-2509-6|                     2156.0|  2020-11-01|
+-------------+---------------------------+------------+



In [115]:
start_date = '2020-11-15'
end_date   = '2020-11-25'
transaction_process(df_transactions, start_date, end_date).show()

+-------------+---------------------------+------------+
|   profile_id|period_sales_after_discount|period_start|
+-------------+---------------------------+------------+
|1-57006-547-0|                      755.0|  2020-11-15|
|1-5151-4642-1|                    1550.75|  2020-11-15|
|1-4786-9297-9|                      644.0|  2020-11-15|
+-------------+---------------------------+------------+



In [116]:
start_date = '2020-11-25'
end_date   = '2020-12-01'  # exclusive end date; November has only 30 days
transaction_process(df_transactions, start_date, end_date).show()

+-------------+---------------------------+------------+
|   profile_id|period_sales_after_discount|period_start|
+-------------+---------------------------+------------+
|1-9860-3438-0|                     -181.0|  2020-11-25|
|1-57006-547-0|                     -755.0|  2020-11-25|
|1-85886-104-7|                     1802.0|  2020-11-25|
|1-5151-4642-1|                      372.0|  2020-11-25|
+-------------+---------------------------+------------+
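The periods are half-open, `[start_date, end_date)`, so every transaction falls into exactly one of them. The natural exclusive end for November is therefore '2020-12-01': a bound such as '2020-11-31' only works as long as `transaction_date` is compared as text, because it is not a valid calendar date and fails as soon as it is parsed as one. A plain-Python sketch of the bucketing (the `assign_period` helper is hypothetical; the period starts are the VIP snapshot dates):

```python
from bisect import bisect_right
from datetime import date

# Period starts are the VIP snapshot dates; the last period ends (exclusively) on 2020-12-01
period_starts = [date(2020, 11, 1), date(2020, 11, 15), date(2020, 11, 25)]
month_end = date(2020, 12, 1)


def assign_period(day):
    """Return the start of the half-open period [start, next_start) containing day, or None."""
    if day < period_starts[0] or day >= month_end:
        return None
    # bisect_right counts the starts <= day; the last of those is the matching period
    return period_starts[bisect_right(period_starts, day) - 1]


print(assign_period(date(2020, 11, 20)))  # 2020-11-15
print(assign_period(date(2020, 11, 30)))  # 2020-11-25

try:
    date.fromisoformat("2020-11-31")
except ValueError as err:
    print("invalid end date:", err)
```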



# Per-Period Overview

In [117]:
def vip_sales_overview(df_vips_mapping, df_transactions, start_date, end_date):
  # Active Dutch VIPs and the sales per profile for the same period
  df_vips_mapped       = vip_process(df_vips_mapping, start_date)
  df_transactions_date = transaction_process(df_transactions, start_date, end_date)
  # The left join keeps VIPs without transactions (their sales are null); both sides carry
  # period_start, so drop the copy coming from the transactions side
  df_vips_mapped_transactions = (df_vips_mapped
                                 .join(df_transactions_date, on=['profile_id'], how='left')
                                 .drop(df_transactions_date.period_start))
  return df_vips_mapped_transactions.select('profile_id', 'first_name', 'email',
                                            'period_sales_after_discount', 'period_start')
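The left join is what keeps VIPs without purchases in the overview: every VIP row survives, its sales value is null (None here) when the period has no matching `profile_id`, and profiles that are not Dutch VIPs are dropped. A plain-Python sketch of those semantics, with dictionaries standing in for the two DataFrames (`left_join_sales` is a hypothetical helper; the values are the first-period outputs shown earlier):

```python
def left_join_sales(vips, period_sales):
    """Keep every VIP; attach the period's sales, or None when there are none."""
    return [(profile_id, name, period_sales.get(profile_id)) for profile_id, name in vips]


# Active Dutch VIPs and per-profile sales for the period starting 2020-11-01
vips = [("1-9860-3438-0", "Kristin"), ("1-5151-4642-1", "Brandon")]
period_sales = {
    "1-9860-3438-0": 362.0,
    "0-89923-575-1": 782.0,
    "1-4786-9297-9": 2694.0,
    "1-4947-2509-6": 2156.0,
}

print(left_join_sales(vips, period_sales))
# [('1-9860-3438-0', 'Kristin', 362.0), ('1-5151-4642-1', 'Brandon', None)]
```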

In [118]:
start_date = '2020-11-01'
end_date   = '2020-11-15'
vip_sales_overview(df_vips_mapping, df_transactions, start_date, end_date).show()

+-------------+----------+--------------------+---------------------------+------------+
|   profile_id|first_name|               email|period_sales_after_discount|period_start|
+-------------+----------+--------------------+---------------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                      362.0|  2020-11-01|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                       null|  2020-11-01|
+-------------+----------+--------------------+---------------------------+------------+



Interpretation: Brandon did not make any purchase during the first two weeks. Kristin bought two units of the same product during the first two weeks and returned one of them in the last period, so negative values should be kept: they indicate returned units.

In [119]:
start_date = '2020-11-15'
end_date   = '2020-11-25'
vip_sales_overview(df_vips_mapping, df_transactions, start_date, end_date).show()

+-------------+----------+--------------------+---------------------------+------------+
|   profile_id|first_name|               email|period_sales_after_discount|period_start|
+-------------+----------+--------------------+---------------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                       null|  2020-11-15|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                    1550.75|  2020-11-15|
|1-59971-953-3|      Seth|        Seth@Lee.com|                       null|  2020-11-15|
+-------------+----------+--------------------+---------------------------+------------+



Seth did not make any transactions during this period.

Kristin did not make any transactions during this period.


In [120]:
start_date = '2020-11-25'
end_date   = '2020-12-01'  # exclusive end date; November has only 30 days
vip_sales_overview(df_vips_mapping, df_transactions, start_date, end_date).show()

+-------------+----------+--------------------+---------------------------+------------+
|   profile_id|first_name|               email|period_sales_after_discount|period_start|
+-------------+----------+--------------------+---------------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                     -181.0|  2020-11-25|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                      372.0|  2020-11-25|
|1-59971-953-3|      Seth|        Seth@Lee.com|                       null|  2020-11-25|
+-------------+----------+--------------------+---------------------------+------------+



Seth did not make any transactions during this period.

Kristin returned one of the two items bought during the first two weeks of November.
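Keeping the negative values matters because the month total is a net figure. A plain-Python sketch with Kristin's per-period amounts from the outputs above (362.0, no transactions, -181.0); missing periods are skipped, just as Spark's `sum` ignores nulls:

```python
# Kristin's period_sales_after_discount per period (None = no transactions in that period)
kristin_periods = [362.0, None, -181.0]

# Like Spark's sum(), skip missing values; the return stays in as a negative amount
net_total = sum(value for value in kristin_periods if value is not None)
# Dropping the negative values instead would count the returned unit as sold
gross_total = sum(value for value in kristin_periods if value is not None and value > 0)

print(net_total)    # 181.0
print(gross_total)  # 362.0
```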


# Monthly Overview
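Since Marketing may rerun the promotion next year, the period boundaries used in this section do not have to be typed by hand; they can be derived from the VIP snapshot dates. A minimal sketch, assuming each period starts on a snapshot date, ends on the next one, and the last period runs to the end of that month (`periods_from_snapshots` is a hypothetical helper):

```python
from datetime import date, timedelta


def periods_from_snapshots(snapshot_dates):
    """Turn ISO snapshot dates into (start, exclusive_end) ISO string pairs."""
    starts = sorted(date.fromisoformat(d) for d in snapshot_dates)
    # First day of the month after the last snapshot: day 28 + 4 days is always next month
    month_end = (starts[-1].replace(day=28) + timedelta(days=4)).replace(day=1)
    ends = starts[1:] + [month_end]
    return [(s.isoformat(), e.isoformat()) for s, e in zip(starts, ends)]


periods = periods_from_snapshots(["2020-11-01", "2020-11-15", "2020-11-25"])
print(periods)
# [('2020-11-01', '2020-11-15'), ('2020-11-15', '2020-11-25'), ('2020-11-25', '2020-12-01')]
```

The two lists the notebook expects can then be recovered with `start_dates, end_dates = map(list, zip(*periods))`.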

In [121]:
from functools import reduce

def monthly_overview(df_vips_mapping, df_transactions, start_dates, end_dates):
  # Build one overview per (start, end) period, then stack them, matching columns by name
  period_overviews = [vip_sales_overview(df_vips_mapping, df_transactions, start, end)
                      for start, end in zip(start_dates, end_dates)]
  return reduce(lambda left, right: left.unionByName(right), period_overviews)

In [122]:
start_dates = ['2020-11-01', '2020-11-15', '2020-11-25']
end_dates   = ['2020-11-15', '2020-11-25', '2020-12-01']  # exclusive end dates
df_monthly  = monthly_overview(df_vips_mapping, df_transactions, start_dates, end_dates)
df_monthly.show()

+-------------+----------+--------------------+---------------------------+------------+
|   profile_id|first_name|               email|period_sales_after_discount|period_start|
+-------------+----------+--------------------+---------------------------+------------+
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                      362.0|  2020-11-01|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                       null|  2020-11-01|
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                       null|  2020-11-15|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                    1550.75|  2020-11-15|
|1-59971-953-3|      Seth|        Seth@Lee.com|                       null|  2020-11-15|
|1-9860-3438-0|   Kristin|  Kristin@Brooks.com|                     -181.0|  2020-11-25|
|1-5151-4642-1|   Brandon|Brandon.Ritter@ni...|                      372.0|  2020-11-25|
|1-59971-953-3|      Seth|        Seth@Lee.com|                       null|  2020-11-25|
+-------------+------

In [123]:
# Total sales per VIP over the whole month, highest first
(df_monthly
 .groupby(['first_name', 'email'])
 .sum('period_sales_after_discount')
 .withColumnRenamed('sum(period_sales_after_discount)', 'period_sales_after_discount')
 .sort(['period_sales_after_discount'], ascending=False)
 .show())

+----------+--------------------+---------------------------+
|first_name|               email|period_sales_after_discount|
+----------+--------------------+---------------------------+
|   Brandon|Brandon.Ritter@ni...|                    1922.75|
|   Kristin|  Kristin@Brooks.com|                      181.0|
|      Seth|        Seth@Lee.com|                       null|
+----------+--------------------+---------------------------+
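Seth's total is null because Spark's `sum` over a group that contains only nulls returns null. Since VIPs without purchases are still eligible, Marketing may prefer to see 0 there; in PySpark this is typically done with `coalesce(col('period_sales_after_discount'), lit(0))`, and all three functions are already imported at the top of the notebook. The same rule as a plain-Python sketch, using the monthly totals printed above (None stands in for Spark's null):

```python
# Monthly totals from the table above; None stands in for Spark's null
monthly_totals = [
    ("Brandon", "Brandon.Ritter@nike.com", 1922.75),
    ("Kristin", "Kristin@Brooks.com", 181.0),
    ("Seth", "Seth@Lee.com", None),
]

# Treat "no purchases" as a total of 0.0, then rank by total, highest first
overview = sorted(
    ((name, email, total if total is not None else 0.0) for name, email, total in monthly_totals),
    key=lambda row: row[2],
    reverse=True,
)

for name, email, total in overview:
    print(f"{name:<8} {email:<24} {total:>8.2f}")
```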

