# Analyzing customer flight data using PySpark

By: Cory Morris

In [None]:
from pyspark import SQLContext, SparkContext
from pyspark.sql.types import DateType, FloatType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import numpy as np
import pandas as pd

"""The Notebook that I am using automatically intializes SparkContext.
Therefore, usual you would need to run the following line of code to Create a SparkContext.
"""
# sc = SparkContext()

## initialize SQL Contexts 
#sqlContext = SQLContext(sc)

## Data Cleaning & Munging

Load Data

In [None]:
# Path to the raw flight CSV (placeholder: point this at your copy of the data).
file_name = "enter_your_path_here"

# Read the CSV with a header row and let Spark infer each column's type.
csv_reader = sqlContext.read.format("com.databricks.spark.csv")
csv_reader = csv_reader.options(header="true", inferSchema="true")
orig_data = csv_reader.load(file_name)

# Keep the raw data in memory; the inspection and cleaning cells below reuse it.
orig_data.cache()

Take a first look at the data with summary statistics for each column.

In [None]:
# Per-column summary statistics, transposed so each data column becomes a row.
orig_data.describe().toPandas().transpose()

Remove rows with a missing BirthdateID or an empty GenderCode.

In [None]:
# Keep only rows with a birth-date ID and a non-empty gender code. A null
# GenderCode is dropped too, because comparing null to "" yields null.
has_birthdate = col('birthdateid').isNotNull()
has_gender = col('GenderCode') != ""
orig_data = orig_data.where(has_birthdate & has_gender)

Replace missing or out-of-range values (below 0 or above 120) in the Age column with the median age, 40.

In [None]:
# Value substituted for missing or implausible ages (treated as the median age).
AGE_FILL_VALUE = 40

# fillna returns a new DataFrame, so its result must be assigned back;
# otherwise null ages are never filled.
orig_data = orig_data.fillna(AGE_FILL_VALUE, subset=['Age'])

# Ages outside [0, 120] are treated as data-entry errors.
age_out_of_range = (col('Age') < 0) | (col('Age') > 120)
orig_data = orig_data.withColumn("Age", when(age_out_of_range, AGE_FILL_VALUE).otherwise(col('Age')))

Replace missing values in UFlyRewardsNumber with 0.

In [None]:
# Customers without a rewards number get 0 instead of null.
orig_data = orig_data.na.fill(0, subset=['UFlyRewardsNumber'])

Replace missing values in UflyMemberStatus with "non-ufly".

In [None]:
# Label customers without a UFly membership status as "non-ufly". A missing
# status can arrive as an empty string or as null (the CSV reader can load
# empty fields as null), so both cases are handled.
member_status = col("UflyMemberStatus")
orig_data = orig_data.withColumn(
    'UflyMemberStatus',
    when(member_status.isNull() | (member_status == ""), "non-ufly").otherwise(member_status),
)

Retain only the rows whose combination of PNRLocatorID, CouponSeqNbr, PaxName, ServiceStartCity, ServiceEndCity and ServiceStartDate occurs exactly once; every copy of a duplicated combination is dropped.

In [None]:
# Columns that together identify one flown coupon for one passenger.
dedup_keys = ["PNRLocatorID",
              "CouponSeqNbr",
              "PaxName",
              "ServiceStartCity",
              "ServiceEndCity",
              "ServiceStartDate"]

# Count how many rows share each key, then keep only keys seen exactly once
# (every copy of a duplicated key is discarded).
df2 = orig_data.groupBy(*dedup_keys).agg(count(lit(1)).alias("num_records"))
orig_data = orig_data.join(df2, dedup_keys).filter(col("num_records") == 1)

# DataFrame.drop returns a new DataFrame, so the result must be assigned back.
# Dropping by name is a no-op if the column does not exist.
orig_data = orig_data.drop("test_id").drop("num_records")

Relabel faulty BookingChannel values as "Other". Some rows contain city names instead of a booking channel, so any value other than the five known channels is replaced.

In [None]:
# The only booking channels considered valid. Any other value (for example a
# city name entered in this field) is replaced with "Other"; null stays null.
KNOWN_BOOKING_CHANNELS = ["Outside Booking",
                          "SCA Website Booking",
                          "Tour Operator Portal",
                          "Reservations Booking",
                          "SY Vacation"]

is_unknown_channel = ~col("BookingChannel").isin(*KNOWN_BOOKING_CHANNELS)
orig_data = orig_data.withColumn(
    "BookingChannel",
    when(is_unknown_channel, "Other").otherwise(col("BookingChannel")),
)

Keep only rows whose MarketingAirlineCode is "SY" *(the airline code for the airline of interest)*.

In [None]:
# Keep only flights marketed by SY, the airline being analysed.
orig_data = orig_data.filter(col("MarketingAirlineCode") == "SY")

Create a new column called PNR_error which contains 1 if the PNR is errored and 0 otherwise. (An errored PNR is one that does not start with coupon sequence number 1.)

In [None]:
# Per booking: 1 if its lowest coupon sequence number is not 1 (errored PNR), else 0.
lowest_coupon = min(col("CouponSeqNbr"))
pnr_error_flag = when(lowest_coupon != 1, 1).otherwise(0).alias("PNR_error")
df3 = orig_data.groupBy("PNRLocatorID").agg(pnr_error_flag)

# Attach the flag to every row of the booking.
orig_data = orig_data.join(df3, ["PNRLocatorID"])

Retain only the non-errored rows (run `clean_data.count()` to check how many remain).

In [None]:
# Discard every row belonging to an errored PNR.
clean_data = orig_data.filter(col("PNR_error") == 0)

## Data Transformation and Aggregation
This section will aggregate records for each unique customer using the UID. This is needed since we want to cluster and therefore need one record of data for each customer. 

Create Age Group Bins

In [None]:
# Bucket Age into labelled bands. A row matching no band (e.g. a null Age)
# falls through to the literal 0.
age = col('Age')
age_band = (
    when((age >= 0) & (age < 18), "0-17")
    .when((age >= 18) & (age < 25), "18-24")
    .when((age >= 25) & (age < 35), "25-34")
    .when((age >= 35) & (age < 55), "35-54")
    .when(age >= 55, "55+")
    .otherwise(0)
)
clean_data = clean_data.withColumn("age_group", age_band)

In [None]:
# Persist the cleaned data: the true-origin and final-destination cells below both read clean_data.
clean_data.cache()

Find the true origin (the start city of the first coupon) of each passenger's trip.

In [None]:
# True origin = the start city of each passenger's lowest-numbered coupon.
# min() over a (CouponSeqNbr, ServiceStartCity) struct compares CouponSeqNbr
# first, so it picks that coupon's city deterministically. Sorting and then
# calling first() inside a groupBy does not work: chained sort() calls replace
# one another, and the aggregation's shuffle does not preserve row order.
true_origins = (clean_data
                .groupBy('PNRLocatorID', 'PaxName')
                .agg(min(struct('CouponSeqNbr', 'ServiceStartCity')).alias('first_coupon'))
                .select('PNRLocatorID', 'PaxName',
                        col('first_coupon.ServiceStartCity').alias('true_origin')))

aggregate_data = clean_data.join(true_origins, ["PNRLocatorID", "PaxName"])

Find the final destination (the end city of the last coupon) of each passenger's trip.

In [None]:
# Final destination = the end city of each passenger's highest-numbered coupon.
# max() over a (CouponSeqNbr, ServiceEndCity) struct compares CouponSeqNbr
# first, so it picks that coupon's city deterministically. Sorting and then
# calling last() inside a groupBy does not work: chained sort() calls replace
# one another, and the aggregation's shuffle does not preserve row order.
final_destination = (clean_data
                     .groupBy('PNRLocatorID', 'PaxName')
                     .agg(max(struct('CouponSeqNbr', 'ServiceEndCity')).alias('last_coupon'))
                     .select('PNRLocatorID', 'PaxName',
                             col('last_coupon.ServiceEndCity').alias('final_destination')))

aggregate_data = aggregate_data.join(final_destination, ["PNRLocatorID", "PaxName"])

Convert Service Start date to Date type

In [None]:
# Parse ServiceStartDate into a DateType column so date arithmetic works below.
aggregate_data = aggregate_data.withColumn("ServiceStartDate", col("ServiceStartDate").cast(DateType()))

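Find the place of maximum stay during the trip (the true destination): the end city of the leg followed by the longest gap before the next departure.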

In [None]:
# For each passenger on a booking, order their legs by departure date and look
# up the start date of the following leg (null for the last leg).
# CouponSeqNbr breaks ties between legs departing on the same day, so the
# ordering, and therefore the computed stays, is deterministic.
windowSpec = (Window.partitionBy("PNRLocatorID", "PaxName")
              .orderBy("ServiceStartDate", "CouponSeqNbr"))
aggregate_data = aggregate_data.withColumn("lead_Date", lead(col("ServiceStartDate"), 1).over(windowSpec))

In [None]:
# Days between this leg's departure and the next leg's departure. When there
# is no next leg (or a date is missing) the difference is null and becomes 0.
days_until_next_leg = datediff(col("lead_Date"), col("ServiceStartDate"))
aggregate_data = aggregate_data.withColumn("stay", coalesce(days_until_next_leg, lit(0)))

In [None]:
# Longest stay (in days) at any stop of each passenger's trip.
df_stay = (aggregate_data
           .groupBy("PNRLocatorID", "PaxName")
           .agg(max("stay").alias("max_stay")))

# merge back to aggregate_data dataframe
aggregate_data = aggregate_data.join(df_stay, on=["PNRLocatorID", "PaxName"])

In [None]:
# True destination = the ServiceEndCity of the leg followed by the longest stay.
# when() yields null for every other leg, so first() must skip nulls; without
# ignorenulls=True it can return one of those nulls instead of the city.
df = (aggregate_data
      .groupBy("PNRLocatorID", "PaxName")
      .agg(first(when(col("stay") == col("max_stay"), col("ServiceEndCity")),
                 ignorenulls=True).alias("true_destination")))
aggregate_data = aggregate_data.join(df, ["PNRLocatorID", "PaxName"])

Next, determine whether the trip was one-way or a round trip. A trip is considered a round trip if its final destination (the service end city of the last leg) is the same as its true origin (the service start city of the first leg).

In [None]:
# 1 when the trip ends where it started, otherwise 0 (including unknown cities).
returns_to_origin = col("true_origin") == col("final_destination")
aggregate_data = aggregate_data.withColumn("round_trip", when(returns_to_origin, 1).otherwise(0))

Next, determine the group size, the number of people who traveled together in each trip.

In [None]:
# Number of distinct customers (uid) travelling under each booking.
grp_size = (aggregate_data
            .groupBy("PNRLocatorID")
            .agg(countDistinct("uid").alias("group_size")))

In [None]:
# Attach each booking's group size to all of its rows.
aggregate_data = aggregate_data.join(grp_size, on=["PNRLocatorID"])

Next, create a binary indicator for whether the booking had more than one traveler.
*(0 = group size of 1 customer,
1 = group size of more than 1 customer)*

In [None]:
# 1 when more than one customer shares the booking, else 0.
travels_in_group = col("group_size") > 1
aggregate_data = aggregate_data.withColumn("group", when(travels_in_group, 1).otherwise(0))

Next, handle seasonality in terms of quarters. Assign Q1 to Q4 based on the quarter of the year in which the trip was made.

In [None]:
# Calendar quarter of the departure date as "Q1".."Q4"; null when the date is null.
departure_quarter = quarter(col('ServiceStartDate')).cast("string")
aggregate_data = aggregate_data.withColumn("seasonality", concat(lit("Q"), departure_quarter))

Finally, calculate the number of days the ticket was booked in advance. It is the difference between PNRCreateDate and ServiceStartDate.

In [None]:
# Booking lead time: whole days between creating the reservation (PNR) and the
# leg's service start date.
aggregate_data = aggregate_data.withColumn("PNRCreateDate", col("PNRCreateDate").cast('date'))
booking_lead_days = datediff(col("ServiceStartDate"), col("PNRCreateDate"))
aggregate_data = aggregate_data.withColumn("days_pre_booked", floor(booking_lead_days))

# Give the remaining columns their proper types.
for column_name, spark_type in (("PostalCode", 'int'),
                                ("EnrollDate", 'date'),
                                ("MarketingFlightNbr", 'int')):
    aggregate_data = aggregate_data.withColumn(column_name, col(column_name).cast(spark_type))

## Group by PNR, UID and PaxName to get Final Dataset to use in Spark ML

In [None]:
# Collapse the per-leg rows into one feature record per (booking, customer,
# passenger name). The order of the aggregates below fixes the positional
# layout the RDD cells index into (5 = Avg_Amount, 12 = Group_Size,
# 15 = Days_pre_Booked).
# NOTE(review): first()/last() are order-dependent after the groupBy shuffle.
# For columns that can differ between legs (ServiceStartDate, age_group,
# seasonality) the chosen value may not be stable across runs.
customer_feature_aggs = [
    first('ServiceStartDate').alias("ServiceStartDate"),
    first("BookingChannel").alias("BookingChannel"),
    mean("TotalDocAmt").alias("Avg_Amount"),
    first("UFlyRewardsNumber").alias("UFlyRewardsNum"),
    first("UflyMemberStatus").alias("UFly_Status"),
    last("age_group").alias("Age_Group"),
    first("true_origin").alias("True_Origin"),
    first("true_destination").alias("True_Destination"),
    first("round_trip").alias("Round_Trip"),
    first("group_size").alias("Group_Size"),
    first("group").alias("Group"),
    last("seasonality").alias("Seasonality"),
    max("days_pre_booked").alias("Days_pre_Booked"),
]
final_agg_df = aggregate_data.groupBy('PNRLocatorID', 'uid', 'PaxName').agg(*customer_feature_aggs)

# Cached because the clustering cells below read final_agg_df.rdd more than once.
final_agg_df.cache()

## Convert Aggregated Data into RDD

This is needed because the RDD-based Spark MLlib API offers more options for clustering algorithms than the DataFrame-based Spark ML API.

*Other clustering algorithms were used but are not shown here.*

In [None]:
# Full 16-column record per customer, kept so cluster labels can be attached later.
sunRDD = final_agg_df.rdd.map(lambda record: record[:16])

# Only the three numeric features used for clustering.
sunRDD_req = final_agg_df.rdd.map(
    lambda record: (record.Avg_Amount, record.Group_Size, record.Days_pre_Booked)
)

## Normalize the Data using Min-Max Scaling

In [None]:
# Each .min()/.max() below is a separate Spark action; cache the feature RDD
# so the upstream pipeline is not recomputed for every one of them.
sunRDD_req.cache()

avg_amount_spent = sunRDD_req.map(lambda x: x[0])
group_size = sunRDD_req.map(lambda x: x[1])
booking_days = sunRDD_req.map(lambda x: x[2])

avg_amount_min = avg_amount_spent.min()
avg_amount_max = avg_amount_spent.max()
group_size_min = group_size.min()
group_size_max = group_size.max()
booking_days_min = booking_days.min()
booking_days_max = booking_days.max()


def _min_max_scale(value, lo, hi):
    """Scale ``value`` linearly into [0, 1] using the observed bounds ``lo``/``hi``.

    A constant feature (``hi == lo``) would otherwise divide by zero; it maps to 0.0.
    """
    span = hi - lo
    if span == 0:
        return 0.0
    return (float(value) - lo) / span


sunRDD_norm = sunRDD_req.map(lambda row: (_min_max_scale(row[0], avg_amount_min, avg_amount_max),
                                          _min_max_scale(row[1], group_size_min, group_size_max),
                                          _min_max_scale(row[2], booking_days_min, booking_days_max)))

## Run K-Means Clustering

In [None]:
from pyspark.mllib.clustering import KMeans

# Number of customer segments to build.
NUM_CLUSTERS = 4
# Fixed seed so the random initialisation, and therefore the cluster labels,
# are reproducible between runs.
KMEANS_SEED = 42

# K-means makes many passes over its input; cache it so the upstream lineage
# is not recomputed on every iteration.
sunRDD_norm.cache()
clusters = KMeans.train(sunRDD_norm, NUM_CLUSTERS, maxIterations=100,
                        initializationMode='random', seed=KMEANS_SEED)

Append the cluster label to each record in the RDD.

In [None]:
# Predict a cluster index for every normalized feature tuple.
cluster_id = clusters.predict(sunRDD_norm)
# Pair each full customer record with its predicted cluster: (record, cluster).
# NOTE(review): zip() pairs elements by position within each partition, so this
# assumes sunRDD and sunRDD_norm (both mapped from final_agg_df.rdd) yield rows
# in the same order. Verify this holds if final_agg_df is ever recomputed rather
# than read from cache.
sunRDD_final = sunRDD.zip(cluster_id)

In [None]:
# Flatten each (16-field record, cluster) pair into one 17-element tuple whose
# last element is the cluster label.
sunRDD_final = sunRDD_final.map(lambda pair: pair[0][:16] + (pair[1],))

## Convert the RDD back to a DataFrame

I converted the RDD back to a DataFrame so the team could visualize the clusters.

In [None]:
from pyspark.sql import Row


def _to_output_row(p):
    """Map a (16 customer fields..., cluster label) tuple to a named Row.

    Positions follow the column order of final_agg_df, with the cluster label
    appended at index 16.
    """
    return Row(PNRLocatorID = (p[0]),
               uid = (p[1]),
               PaxName = (p[2]),
               ServiceStartDate = (p[3]),
               BookingChannel = (p[4]),
               Avg_Amount_Spent = float(p[5]),
               UFlyRewardsNum = (p[6]),
               UFly_Status = (p[7]),
               Age_Group = (p[8]),
               True_Origin = (p[9]),
               True_Destination = (p[10]),
               Round_Trip = (p[11]),
               Group_Size = int(p[12]),
               Group_Flag = (p[13]),
               Season = (p[14]),
               Days_pre_Booked = int(p[15]),
               Cluster = (p[16]))


# map() alone produces an RDD of Rows, which has no limit()/toPandas().
# Build the DataFrame explicitly so it can be previewed and visualized.
sun_final_df = sqlContext.createDataFrame(sunRDD_final.map(_to_output_row))

In [None]:
# Preview the first five clustered customers as a pandas DataFrame.
preview_df = sun_final_df.limit(5)
preview_df.toPandas()