In [65]:
pip install -U kaleido

Note: you may need to restart the kernel to use updated packages.


In [66]:
pip install plotly

Note: you may need to restart the kernel to use updated packages.


In [67]:
pip install psutil

Note: you may need to restart the kernel to use updated packages.


In [68]:
# LIBRARIES
# After installing pyspark in local device, we need to install and import findspark
# !pip install findspark
import findspark
findspark.init()

# Importing pyspark libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col,isnan,when,count
from pyspark.sql.functions import to_date

# Visualization
import plotly.express as px
import plotly.io as pio

In [69]:
# SPARK CONTEXT
# Creating Spark Context using SparkSession
spark_session = SparkSession.builder\
                .appName('spark_task_1')\
                .master('local[2]')\
                .getOrCreate()

spark_context_new = spark_session.sparkContext
spark_context_new

In [70]:
# IMPORT DATA TO SPARK
# Creating schema of data types - calendar.csv
schema_calendar = StructType([
    StructField('_c0', IntegerType(), True),
    StructField('date', DateType(), True),
    StructField('start_of_the_year', DateType(), True),
    StructField('start_of_the_quarter', DateType(), True),
    StructField('start_of_the_month', DateType(), True)
])

# Reading the csv data - calendar
calendar = spark_session.read.csv("D:\Billy's\DE\PySpark\calendar.csv", header=True, schema=schema_calendar)

# Deleting unnecessary column
calendar = calendar.drop('_c0')

# Showing the data type of each column from calendar
calendar.printSchema()

# Showing the first 5 data from calendar 
calendar.show(5)

root
 |-- date: date (nullable = true)
 |-- start_of_the_year: date (nullable = true)
 |-- start_of_the_quarter: date (nullable = true)
 |-- start_of_the_month: date (nullable = true)

+----------+-----------------+--------------------+------------------+
|      date|start_of_the_year|start_of_the_quarter|start_of_the_month|
+----------+-----------------+--------------------+------------------+
|2012-01-01|       2012-01-01|          2012-01-01|        2012-01-01|
|2012-01-02|       2012-01-01|          2012-01-01|        2012-01-01|
|2012-01-03|       2012-01-01|          2012-01-01|        2012-01-01|
|2012-01-04|       2012-01-01|          2012-01-01|        2012-01-01|
|2012-01-05|       2012-01-01|          2012-01-01|        2012-01-01|
+----------+-----------------+--------------------+------------------+
only showing top 5 rows



In [71]:
# Creating schema of data types - customer_flight_activity.csv
schema_cust_flight = StructType([
    StructField('_c0', IntegerType(), True),
    StructField('loyalty_number', IntegerType(), True),
    StructField('year', StringType(), True),
    StructField('month', StringType(), True),
    StructField('total_flights', IntegerType(), True),
    StructField('distance', DoubleType(), True),
    StructField('points_accumulated', DoubleType(), True),
    StructField('points_redeemed', IntegerType(), True),
    StructField('dollar_cost_points_redeemed', StringType(), True)
])

# Reading the csv data - customer_flight_activity
cust_flight = spark_session.read.option("nullValue","").csv("D:\Billy's\DE\PySpark\customer_flight_activity.csv", header=True, schema=schema_cust_flight)

# Deleting unnecessary column
cust_flight = cust_flight.drop('_c0')

# Showing the data type of each column from cust_flight
cust_flight.printSchema()

# Showing the first 5 data from cust_flight
cust_flight.show(5)

root
 |-- loyalty_number: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- total_flights: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- points_accumulated: double (nullable = true)
 |-- points_redeemed: integer (nullable = true)
 |-- dollar_cost_points_redeemed: string (nullable = true)

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                          0|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                          0|
|        100590|2018|    5|            4|  6504.0|           

In [72]:
# Creating schema of data types - customer_loyalty_history.csv
schema_cust_loyalty = StructType([
    StructField('_c0', IntegerType(), True),
    StructField('loyalty_number', IntegerType(), True),
    StructField('country', StringType(), True),
    StructField('province', StringType(), True),
    StructField('city', StringType(), True),
    StructField('postal_code', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('education', StringType(), True),
    StructField('salary', DoubleType(), True),
    StructField('marital_status', StringType(), True),
    StructField('loyalty_card', StringType(), True),
    StructField('customer_lifetime_value', DoubleType(), True),    
    StructField('enrollment_type', StringType(), True),
    StructField('enrollment_year', StringType(), True),
    StructField('enrollment_month', StringType(), True),
    StructField('cancellation_year', StringType(), True),
    StructField('cancellation_month', StringType(), True)
])

# Reading the csv data - customer_loyalty_history
cust_loyalty = spark_session.read.option("nullValue","").csv("D:\Billy's\DE\PySpark\customer_loyalty_history.csv", header=True, schema=schema_cust_loyalty)
# Deleting unnecessary column
cust_loyalty = cust_loyalty.drop('_c0')

# Showing the data type of each column from cust_loyalty
cust_loyalty.printSchema()

# Showing the first 5 data from cust_loyalty
cust_loyalty.show(5)

root
 |-- loyalty_number: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- education: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- loyalty_card: string (nullable = true)
 |-- customer_lifetime_value: double (nullable = true)
 |-- enrollment_type: string (nullable = true)
 |-- enrollment_year: string (nullable = true)
 |-- enrollment_month: string (nullable = true)
 |-- cancellation_year: string (nullable = true)
 |-- cancellation_month: string (nullable = true)

+--------------+-------+----------------+---------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|        province|     city|postal_

In [73]:
# Creating temp tables
cust_loyalty.createOrReplaceTempView('cust_loyalty')
cust_flight.createOrReplaceTempView('cust_flight')
calendar.createOrReplaceTempView('calendar')

In [74]:
cust_flight.show(5)

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                          0|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                          0|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                          0|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                         92|
|        100590|2018|    2|            0|     0.0|               0.0|              0|                          0|
+--------------+----+-----+-------------+--------+------------------+---------------+---

In [75]:
# Checking NULL values
columns_check=["loyalty_number","year","month","total_flights","distance","points_accumulated","points_redeemed",
               "dollar_cost_points_redeemed"]
cust_flight.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columns_check]).show()

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|             0|   0|    0|            0|       0|               473|              0|                          0|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+



In [76]:
cust_flight.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------------+
|summary|    loyalty_number|              year|             month|     total_flights|          distance|points_accumulated|   points_redeemed|dollar_cost_points_redeemed|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------------+
|  count|            392936|            392936|            392936|            392936|            392936|            392463|            392936|                     392936|
|   mean|  550527.519033634| 2017.513661257813| 6.513661257812977| 1.294887717083698|1941.4402014577438|2017.3212430216352| 31.30426328969603|          5.635660769183786|
| stddev|258604.58018676273|0.4998139711893131|3.4454277995069966|1.9626753542128417|3239.9758893518842|3845.3226278582965|126.65377540656851|   

In [77]:
cust_flight.summary().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------------+
|summary|    loyalty_number|              year|             month|     total_flights|          distance|points_accumulated|   points_redeemed|dollar_cost_points_redeemed|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+---------------------------+
|  count|            392936|            392936|            392936|            392936|            392936|            392463|            392936|                     392936|
|   mean|  550527.519033634| 2017.513661257813| 6.513661257812977| 1.294887717083698|1941.4402014577438|2017.3212430216352| 31.30426328969603|          5.635660769183786|
| stddev|258604.58018676273|0.4998139711893131|3.4454277995069966|1.9626753542128417|3239.9758893518842|3845.3226278582965|126.65377540656851|   

In [78]:
# Based on the tables showed above, it can be seen that:
# cust_flight contains event-based information of flight from each person based on loyalty_number
# cust_loyalty contains user-based information with the details of user based on their loyalty_number

# DATA UNDERSTANDING
# Before cleaning, I want to understand the logic of the table

# For cust_flight, 
# The time period of the data
cust_flight.select('year','month').distinct().orderBy('year','month').show(50)

# 2 years of flight data from 2017-2018

+----+-----+
|year|month|
+----+-----+
|2017|    1|
|2017|   10|
|2017|   11|
|2017|   12|
|2017|    2|
|2017|    3|
|2017|    4|
|2017|    5|
|2017|    6|
|2017|    7|
|2017|    8|
|2017|    9|
|2018|    1|
|2018|   10|
|2018|   11|
|2018|   12|
|2018|    2|
|2018|    3|
|2018|    4|
|2018|    5|
|2018|    6|
|2018|    7|
|2018|    8|
|2018|    9|
+----+-----+



In [79]:
# I want to sort the data based on year and month columns for each loyalty number to look at the data better
spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE loyalty_number = 100590
    ORDER BY year, month ASC
""").show()

# Points was purely based on distance. Points = Distance (in km) * 1.5
# Points accumulated also was based on the flights in the given period, not the total accumulated points over time

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|   10|            0|     0.0|               0.0|            512|                         92|
|        100590|2018|   11|            0|     0.0|               0.0|              0|                          0|
|        100590|2018|   12|            0|     0.0|               0.0|              0|                          0|
|        100590|2018|    2|            0|     0.0|               0.0|              0|                          0|
|        100590|2018|    3|            0|     0.0|               0.0|              0|                          0|
|        100590|2018|    4|            0|     0.0|               0.0|              0|   

In [80]:
# Checking the mechanism of dollar_cost_points_redeemed
cust_flight.where(cust_flight['dollar_cost_points_redeemed']!=0).show()

# dollar_cost_points_redeemed = points_redeemed * 0.18

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|   10|            0|     0.0|               0.0|            512|                         92|
|        102376|2018|   12|           15| 16500.0|           24750.0|            556|                        100|
|        105841|2018|    7|           20| 29400.0|           44100.0|            522|                         94|
|        106046|2018|   12|            9| 14337.0|              NULL|            396|                         71|
|        106046|2018|    4|            1|  1490.0|            2235.0|            547|                         98|
|        107519|2018|    3|            1|  1863.0|              NULL|            501|   

In [81]:
# Several conditions to clean this dataset:
# 1. Distance should always be filled everytime there is total_flights, vice versa
# 2. Points accumulated should always be filled everytime there is distance, vice versa
# 3. Dollar cost points redeemed should always be filled everytime there is points redeemed, vice versa

In [82]:
# 1. There's no chance that total_flights is 0 while distance was more than 0

spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE total_flights = 0 AND distance > 0
""").show()

spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE total_flights > 0 AND distance = 0
""").show()

# However, since we didn't know the distance, even with anomalies, we can't recover the data
# Except we have way to calculate distance from the departure airport to arrival airport
# Thus, this will be skipped

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+



In [83]:
# 2. Points accumulated should always be filled everytime there is distance, vice versa

spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE distance > 0 AND points_accumulated = 0
""").show()

spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE distance = 0 AND points_accumulated > 0
""").show()

spark_session.sql("""
    SELECT * FROM cust_flight
    WHERE distance > 0 AND points_accumulated IS NULL
""").show()

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+

+--------------+----+-----+-------------+--------+------------------+---------------+-

In [84]:
# Establishing condition based on data understanding
cust_flight = cust_flight.withColumn('points_accumulated', F.round(F.col('distance')*1.5,2))
cust_flight = cust_flight.withColumn('dollar_cost_points_redeemed', F.round(F.col('points_redeemed')*0.18,0))
cust_flight.show()

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                        0.0|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                        0.0|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                        0.0|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                       92.0|
|        100590|2018|    2|            0|     0.0|               0.0|              0|                        0.0|
|        100590|2018|    4|            0|     0.0|               0.0|              0|   

In [85]:
# Deleting unnecessary data
cust_flight = cust_flight.replace(0.0,None)
cust_flight = cust_flight.na.drop(subset=['total_flights','distance','points_accumulated','points_redeemed','dollar_cost_points_redeemed'], how='all')
cust_flight = cust_flight.na.fill(value=0.0)
cust_flight.show()

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                        0.0|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                        0.0|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                        0.0|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                       92.0|
|        102376|2018|    6|           24| 21216.0|           31824.0|              0|                        0.0|
|        102376|2018|   12|           15| 16500.0|           24750.0|            556|   

In [86]:
# Getting flight period
cust_flight = cust_flight.withColumn('flight_period', F.concat_ws("-",F.col("year"),F.col("month")))
cust_flight = cust_flight.withColumn('flight_period', F.to_date(F.col("flight_period"), 'yyyy-M'))
cust_flight.show()

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|flight_period|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                        0.0|   2018-06-01|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                        0.0|   2018-07-01|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                        0.0|   2018-05-01|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                       92.0|   2018-10-01|
|        102376|2018|    6|           24| 21216.0|           31824.0|              0|                   

In [87]:
cust_loyalty.show(5)

+--------------+-------+----------------+---------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|        province|     city|postal_code|gender|education|  salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_year|cancellation_month|
+--------------+-------+----------------+---------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|        480934| Canada|         Ontario|  Toronto|    M2Z 4K1|Female| Bachelor| 83236.0|       Married|        Star|                   NULL|       Standard|           NULL|            NULL|             NULL|              NULL|
|        549612| Canada|         Alberta| Edmonton|    T3G 6Y6|  Male|  College|    NULL

In [88]:
# Checking NULL values
columns_check=["loyalty_number","country","province","city","postal_code","gender","education",
                "salary","marital_status","loyalty_card","customer_lifetime_value","enrollment_type","enrollment_year",
                "enrollment_month","cancellation_year","cancellation_month"]
cust_loyalty.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columns_check]).show()

+--------------+-------+--------+----+-----------+------+---------+------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|province|city|postal_code|gender|education|salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_year|cancellation_month|
+--------------+-------+--------+----+-----------+------+---------+------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|             0|      0|       0|   0|          0|     0|        0|  4238|             0|           0|                  16737|              0|          16737|           16737|            14670|             16483|
+--------------+-------+--------+----+-----------+------+---------+------+--------------+------------+-----------------------+---------------+------

In [89]:
# Trying to recover salary information based on the frequency of flights and membership status
# Checking whether membership have correlation with total_distance, total_flights
spark_session.sql("""
    WITH calc AS (
    SELECT
    cl.loyalty_card AS membership,
    COUNT(DISTINCT(cl.loyalty_number)) AS n_user,
    SUM(cf.distance) AS total_distance, 
    SUM(cf.total_flights) AS total_flights,
    MEDIAN(cl.salary) AS median_salary
    FROM cust_flight cf
    RIGHT JOIN cust_loyalty cl ON cf.loyalty_number = cl.loyalty_number
    GROUP BY membership
    )

    SELECT *, total_flights/n_user AS avg_flights, total_distance/n_user AS avg_distance FROM calc
""").show()

# No significant difference in average salary, average flights, average distance between memberships
# However, it can be seen that the order of membership from lowest to highest: Star > Nova > Aurora
# I don't think it is wise to change the NULL values with avg_salary based on membership

+----------+------+--------------+-------------+-------------+------------------+------------------+
|membership|n_user|total_distance|total_flights|median_salary|       avg_flights|      avg_distance|
+----------+------+--------------+-------------+-------------+------------------+------------------+
|    Aurora|  3429|  1.58509592E8|       104940|      74293.0|30.603674540682416| 46226.18606007582|
|      Nova|  5671|  2.59517873E8|       172817|      73602.0| 30.47381414212661|45762.277023452654|
|      Star|  7637|  3.44834282E8|       231051|      73336.0|30.254157391645933|45153.107502946186|
+----------+------+--------------+-------------+-------------+------------------+------------------+



In [90]:
# Checking unique values of categorical columns
cust_loyalty.select('gender').distinct().show()
cust_loyalty.select('education').distinct().show()
cust_loyalty.select('marital_status').distinct().show()

+------+
|gender|
+------+
|Female|
|  Male|
+------+

+--------------------+
|           education|
+--------------------+
|              Master|
|High School or Below|
|              Doctor|
|            Bachelor|
|             College|
+--------------------+

+--------------+
|marital_status|
+--------------+
|       Married|
|      Divorced|
|        Single|
+--------------+



In [91]:
cust_loyalty.select('loyalty_card').distinct().show()
cust_loyalty.select('education').distinct().show()
cust_loyalty.select('enrollment_type').distinct().show()

+------------+
|loyalty_card|
+------------+
|      Aurora|
|        Nova|
|        Star|
+------------+

+--------------------+
|           education|
+--------------------+
|              Master|
|High School or Below|
|              Doctor|
|            Bachelor|
|             College|
+--------------------+

+---------------+
|enrollment_type|
+---------------+
| 2018 Promotion|
|       Standard|
+---------------+



In [92]:
cust_loyalty.describe().show()

+-------+------------------+-------+--------+--------+-----------+------+---------+-----------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|summary|    loyalty_number|country|province|    city|postal_code|gender|education|           salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month| cancellation_year|cancellation_month|
+-------+------------------+-------+--------+--------+-----------+------+---------+-----------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|  count|             16737|  16737|   16737|   16737|      16737| 16737|    16737|            12499|         16737|       16737|                      0|          16737|              0|               0|              2067|               254|
|   mean| 549735.8804445241|   NULL|

In [93]:
cust_loyalty.summary().show()
# Negative Salary

+-------+------------------+-------+--------+--------+-----------+------+---------+-----------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|summary|    loyalty_number|country|province|    city|postal_code|gender|education|           salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month| cancellation_year|cancellation_month|
+-------+------------------+-------+--------+--------+-----------+------+---------+-----------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|  count|             16737|  16737|   16737|   16737|      16737| 16737|    16737|            12499|         16737|       16737|                      0|          16737|              0|               0|              2067|               254|
|   mean| 549735.8804445241|   NULL|

In [94]:
cust_loyalty = cust_loyalty.withColumn('salary', F.abs(F.col('salary')))

In [95]:
cust_loyalty.summary().show()
# Done

+-------+------------------+-------+--------+--------+-----------+------+---------+------------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|summary|    loyalty_number|country|province|    city|postal_code|gender|education|            salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month| cancellation_year|cancellation_month|
+-------+------------------+-------+--------+--------+-----------+------+---------+------------------+--------------+------------+-----------------------+---------------+---------------+----------------+------------------+------------------+
|  count|             16737|  16737|   16737|   16737|      16737| 16737|    16737|             12499|         16737|       16737|                      0|          16737|              0|               0|              2067|               254|
|   mean| 549735.8804445241|   N

In [96]:
# CLTV: total invoice value for all flights ever booked by member
# Based on the definition, it should be the correlated with transaction value of flights from users
# No information regarding that, so it can't be recovered

In [97]:
cust_flight.show(5)

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|flight_period|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                        0.0|   2018-06-01|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                        0.0|   2018-07-01|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                        0.0|   2018-05-01|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                       92.0|   2018-10-01|
|        102376|2018|    6|           24| 21216.0|           31824.0|              0|                   

In [98]:
spark_session.sql("""
    SELECT COUNT(DISTINCT(loyalty_number)) AS n_user_flight
    FROM cust_flight
""").show()

spark_session.sql("""
    SELECT COUNT(DISTINCT(loyalty_number)) AS n_user_loyalty
    FROM cust_loyalty
""").show()

# The number loyalty number of flight and loyalty data is the same

+-------------+
|n_user_flight|
+-------------+
|        16737|
+-------------+

+--------------+
|n_user_loyalty|
+--------------+
|         16737|
+--------------+



In [99]:
spark_session.sql("""
    WITH 
    data_flight AS (
    SELECT loyalty_number, MIN(year) AS first_flight 
    FROM cust_flight
    GROUP BY loyalty_number
    ),
    joined AS (
    SELECT cl.loyalty_number, cl.enrollment_year, cl.cancellation_year, df.first_flight 
    FROM cust_loyalty cl
    LEFT JOIN data_flight df ON cl.loyalty_number = df.loyalty_number
    )
    SELECT * FROM joined
    WHERE cancellation_year < first_flight
""").show()
# From this result, it can be concluded that enrollment date didn't have any correlation with flight date
# Thus, the data can't be recovered

+--------------+---------------+-----------------+------------+
|loyalty_number|enrollment_year|cancellation_year|first_flight|
+--------------+---------------+-----------------+------------+
|        103352|           NULL|           2015.0|        2017|
|        103383|           NULL|           2016.0|        2017|
|        104739|           NULL|           2014.0|        2017|
|        105352|           NULL|           2014.0|        2017|
|        105700|           NULL|           2015.0|        2017|
|        106589|           NULL|           2013.0|        2017|
|        110073|           NULL|           2015.0|        2017|
|        111387|           NULL|           2015.0|        2017|
|        112797|           NULL|           2015.0|        2017|
|        113656|           NULL|           2015.0|        2017|
|        117725|           NULL|           2014.0|        2017|
|        117832|           NULL|           2015.0|        2017|
|        118298|           NULL|        

In [100]:
# Drop duplicates
dedup_cust_loyalty = cust_loyalty.dropDuplicates(subset=['loyalty_number'])
dedup_cust_loyalty.show()

+--------------+-------+----------------+------------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|        province|        city|postal_code|gender|education|  salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_year|cancellation_month|
+--------------+-------+----------------+------------+-----------+------+---------+--------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|        100018| Canada|         Alberta|    Edmonton|    T9G 1W3|Female| Bachelor| 92552.0|       Married|      Aurora|                   NULL|       Standard|           NULL|            NULL|             NULL|              NULL|
|        100102| Canada|         Ontario|     Toronto|    M1R 4K3|  Male|  C

In [101]:
# VISUALIZATION
# Creating temp tables for SQL querying
dedup_cust_loyalty.createOrReplaceTempView('dedup_cust_loyalty')
cust_flight.createOrReplaceTempView('cust_flight')
calendar.createOrReplaceTempView('calendar')

In [102]:
# When is the peak season of flights?
df1 = spark_session.sql("""
    SELECT 
    cf.flight_period AS period,
    cal.start_of_the_quarter AS quarter_cat,
    cal.start_of_the_year AS year_cat,
    SUM(cf.total_flights) AS total_flights,
    SUM(cf.points_redeemed) AS redeemed_points
    FROM cust_flight cf
    LEFT JOIN calendar cal ON cal.date = cf.flight_period
    GROUP BY period, quarter_cat, year_cat
    ORDER BY period ASC
""")

pandas_df1 = df1.toPandas()
pandas_df1

Unnamed: 0,period,quarter_cat,year_cat,total_flights,redeemed_points
0,2017-01-01,2017-01-01,2017-01-01,13059,351520
1,2017-02-01,2017-01-01,2017-01-01,13368,357630
2,2017-03-01,2017-01-01,2017-01-01,18391,445791
3,2017-04-01,2017-04-01,2017-01-01,15449,417694
4,2017-05-01,2017-04-01,2017-01-01,18690,468352
5,2017-06-01,2017-04-01,2017-01-01,23504,578368
6,2017-07-01,2017-07-01,2017-01-01,26312,589656
7,2017-08-01,2017-07-01,2017-01-01,22976,545503
8,2017-09-01,2017-07-01,2017-01-01,17439,477650
9,2017-10-01,2017-10-01,2017-01-01,16458,440584


In [103]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots


fig = make_subplots(specs=[[{"secondary_y": True}]])

fig.add_trace(go.Scatter(x=pandas_df1['period'], y=pandas_df1['total_flights'],
                    mode='lines',
                    name='total_flights'), secondary_y=False)
fig.add_trace(go.Scatter(x=pandas_df1['period'], y=pandas_df1['redeemed_points'],
                    mode='lines',
                    name='redeemed_points'), secondary_y=True)


fig.update_xaxes(title_text="Period")
fig.update_yaxes(title_text="Total Flights", secondary_y=False)
fig.update_yaxes(title_text="Redeemed Points", secondary_y=True)

fig.show()
# pio.write_image(fig, "line.jpeg")
# px.line(pandas_df1, x='period', y='total_flights')
# The seasonal trend was observed with peaks on March, July, and December.
# It is surprising that the peak of flights season of the dataset located mostly in July.

In [104]:
cust_loyalty.select('country').distinct().show()
# Considering the location of the dataset, the peak in July was caused by Summer Break Holiday.
# Reference: https://www.edarabia.com/school-holidays-canada/#:~:text=Summer%20break%20in%20Canada%20typically,and%20ends%20in%20New%20Year).

+-------+
|country|
+-------+
| Canada|
+-------+



In [105]:
dedup_cust_loyalty.show(5)

+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|        province|        city|postal_code|gender|education| salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_year|cancellation_month|
+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|        100018| Canada|         Alberta|    Edmonton|    T9G 1W3|Female| Bachelor|92552.0|       Married|      Aurora|                   NULL|       Standard|           NULL|            NULL|             NULL|              NULL|
|        100102| Canada|         Ontario|     Toronto|    M1R 4K3|  Male|  Colle

In [106]:
cust_flight.show(5)

+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|loyalty_number|year|month|total_flights|distance|points_accumulated|points_redeemed|dollar_cost_points_redeemed|flight_period|
+--------------+----+-----+-------------+--------+------------------+---------------+---------------------------+-------------+
|        100590|2018|    6|           12| 15276.0|           22914.0|              0|                        0.0|   2018-06-01|
|        100590|2018|    7|           12|  9168.0|           13752.0|              0|                        0.0|   2018-07-01|
|        100590|2018|    5|            4|  6504.0|            9756.0|              0|                        0.0|   2018-05-01|
|        100590|2018|   10|            0|     0.0|               0.0|            512|                       92.0|   2018-10-01|
|        102376|2018|    6|           24| 21216.0|           31824.0|              0|                   

In [107]:
# Is there correlation between salary and the frequency or distance of flights?
df2 = spark_session.sql("""
    SELECT 
    cl.loyalty_number,
    cl.loyalty_card,
    SUM(cf.total_flights) AS total_flights,
    SUM(cf.distance) AS total_distance,
    AVG(cl.salary) AS salary
    FROM dedup_cust_loyalty cl
    LEFT JOIN cust_flight cf ON cf.loyalty_number = cl.loyalty_number
    GROUP BY cl.loyalty_number, cl.loyalty_card
    HAVING salary IS NOT NULL
""")

pandas_df2 = df2.toPandas()
pandas_df2

Unnamed: 0,loyalty_number,loyalty_card,total_flights,total_distance,salary
0,100018,Aurora,46.0,81190.0,92552.0
1,100214,Star,22.0,38236.0,63253.0
2,100272,Star,37.0,54997.0,91163.0
3,100301,Nova,41.0,62849.0,70323.0
4,100364,Nova,33.0,45955.0,76849.0
...,...,...,...,...,...
12494,975846,Aurora,41.0,62901.0,96817.0
12495,976004,Star,9.0,14046.0,81510.0
12496,979115,Aurora,32.0,42893.0,66784.0
12497,984115,Star,28.0,33574.0,92551.0


In [108]:
fig = px.scatter(pandas_df2,x='total_flights',y='total_distance',color='loyalty_card')
fig.show()
# pio.write_image(fig, "total_flights_vs_distance.jpeg")
# Since total flights and total distance was highly correlated, let's just pick one to compare with salary
# Add loyalty card information to see whether there are differences between groups

In [109]:
fig = px.scatter(pandas_df2,x='total_flights',y='salary',color='loyalty_card')
fig.show()
# pio.write_image(fig, "total_flights_vs_salary.jpeg")
# No pattern was seen from the graph. 
# Even without statistics, we can assume that no significant correlation between total_flights and salary

In [110]:
# Is there any difference of behaviour between loyalty card?
fig = px.histogram(pandas_df2,x='total_flights',color='loyalty_card')
fig.show()
# pio.write_image(fig, "hist_loyalty1.jpeg")
# No difference of behaviour between groups

In [111]:
fig = px.histogram(pandas_df2,x='salary',color='loyalty_card')
fig.show()
# pio.write_image(fig, "hist_loyalty2.jpeg")
# No difference of behaviour between groups

In [112]:
dedup_cust_loyalty.show(5)

+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|loyalty_number|country|        province|        city|postal_code|gender|education| salary|marital_status|loyalty_card|customer_lifetime_value|enrollment_type|enrollment_year|enrollment_month|cancellation_year|cancellation_month|
+--------------+-------+----------------+------------+-----------+------+---------+-------+--------------+------------+-----------------------+---------------+---------------+----------------+-----------------+------------------+
|        100018| Canada|         Alberta|    Edmonton|    T9G 1W3|Female| Bachelor|92552.0|       Married|      Aurora|                   NULL|       Standard|           NULL|            NULL|             NULL|              NULL|
|        100102| Canada|         Ontario|     Toronto|    M1R 4K3|  Male|  Colle

In [113]:
# Gender, Education

df3 = spark_session.sql("""
    SELECT 
    cl.gender,
    cl.education,
    SUM(cf.total_flights) AS total_flights,
    SUM(cf.distance) AS total_distance
    FROM dedup_cust_loyalty cl
    LEFT JOIN cust_flight cf ON cf.loyalty_number = cl.loyalty_number
    GROUP BY cl.gender, cl.education
""")

pandas_df3 = df3.toPandas()
pandas_df3

Unnamed: 0,gender,education,total_flights,total_distance
0,Male,Master,7448,11240949.0
1,Female,Bachelor,158977,238622785.0
2,Female,Doctor,10944,16302635.0
3,Male,Bachelor,158025,237184483.0
4,Male,High School or Below,11628,17569994.0
5,Male,Doctor,11318,16855251.0
6,Female,High School or Below,12612,18747053.0
7,Female,Master,8127,12358796.0
8,Female,College,63412,94735859.0
9,Male,College,66317,99243942.0


In [114]:
fig = px.bar(pandas_df3,x='gender',y='total_flights',color='education',title='Flights based on Gender and Education')
fig.show()
# pio.write_image(fig, "bar_gender_edu.jpeg")

In [115]:
# Province

df4 = spark_session.sql("""
    SELECT 
    cl.province,
    SUM(cf.total_flights) AS total_flights,
    SUM(cf.distance) AS total_distance
    FROM dedup_cust_loyalty cl
    LEFT JOIN cust_flight cf ON cf.loyalty_number = cl.loyalty_number
    GROUP BY cl.province
    ORDER BY total_flights DESC
""")

pandas_df4 = df4.toPandas()
pandas_df4

Unnamed: 0,province,total_flights,total_distance
0,Ontario,164105,246414270.0
1,British Columbia,135178,202238759.0
2,Quebec,99973,149695312.0
3,Alberta,29468,44212860.0
4,New Brunswick,19960,29888519.0
5,Manitoba,19175,28712662.0
6,Nova Scotia,15514,23422689.0
7,Saskatchewan,12278,18324350.0
8,Newfoundland,7635,11601707.0
9,Yukon,3699,5598097.0


In [116]:
fig = px.bar(pandas_df4,x='province',y='total_flights',title='Top Cities with Most Flights')
fig.show()
# pio.write_image(fig, "bar_cities.jpeg")

In [117]:
# Province and Salary

df5 = spark_session.sql("""
    SELECT 
    cl.province,
    MEDIAN(cl.salary) AS salary
    FROM dedup_cust_loyalty cl
    LEFT JOIN cust_flight cf ON cf.loyalty_number = cl.loyalty_number
    GROUP BY cl.province
    ORDER BY salary DESC
""")

pandas_df5 = df5.toPandas()
pandas_df5

Unnamed: 0,province,salary
0,Prince Edward Island,79325.0
1,Nova Scotia,75586.0
2,Saskatchewan,75318.0
3,British Columbia,74353.0
4,Ontario,74046.0
5,Quebec,73715.0
6,New Brunswick,73135.0
7,Alberta,71992.0
8,Newfoundland,71952.0
9,Manitoba,71411.0
