In [1]:
import findspark
findspark.init()

In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import * 
import pyspark.sql.functions as F
from pyspark.sql.functions import col, asc,desc
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from pyspark.sql import SQLContext
from pyspark.mllib.stat import Statistics
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

spark=SparkSession.builder \
.master ("local[*]")\
.appName("week3")\
.getOrCreate()
sc=spark.sparkContext
sqlContext=SQLContext(sc)



In [None]:
# This is a kaggle travel dataset that contains information about business travel. 
# You will have 3 different csvs namely flights,hotels and users

## 1. Read all the available csvs into 3 different dataframes

In [3]:
flights= spark.read.option('header','true').option('inferschema','true')\
.csv('flights.csv')
flights.show(3)

+----------+--------+------------------+------------------+----------+-------+----+--------+-----------+----------+
|travelCode|userCode|              from|                to|flightType|  price|time|distance|     agency|      date|
+----------+--------+------------------+------------------+----------+-------+----+--------+-----------+----------+
|         0|       0|       Recife (PE)|Florianopolis (SC)|firstClass|1434.38|1.76|  676.53|FlyingDrops|09/26/2019|
|         0|       0|Florianopolis (SC)|       Recife (PE)|firstClass|1292.29|1.76|  676.53|FlyingDrops|09/30/2019|
|         1|       0|     Brasilia (DF)|Florianopolis (SC)|firstClass|1487.52|1.66|  637.56|    CloudFy|10/03/2019|
+----------+--------+------------------+------------------+----------+-------+----+--------+-----------+----------+
only showing top 3 rows



In [4]:
hotels= spark.read.option('header','true').option('inferschema','true')\
.csv('hotels.csv')
hotels.show(3)

+----------+--------+-------+------------------+----+------+-------+----------+
|travelCode|userCode|   name|             place|days| price|  total|      date|
+----------+--------+-------+------------------+----+------+-------+----------+
|         0|       0|Hotel A|Florianopolis (SC)|   4|313.02|1252.08|09/26/2019|
|         2|       0|Hotel K|     Salvador (BH)|   2|263.41| 526.82|10/10/2019|
|         7|       0|Hotel K|     Salvador (BH)|   3|263.41| 790.23|11/14/2019|
+----------+--------+-------+------------------+----+------+-------+----------+
only showing top 3 rows



In [5]:
users= spark.read.option('header','true').option('inferschema','true')\
.csv('users.csv')
users.show(3)

+----+-------+--------------+------+---+
|code|company|          name|gender|age|
+----+-------+--------------+------+---+
|   0|   4You|     Roy Braun|  male| 21|
|   1|   4You|Joseph Holsten|  male| 37|
|   2|   4You| Wilma Mcinnis|female| 48|
+----+-------+--------------+------+---+
only showing top 3 rows



## 2. Find the max distance travelled by any business traveller

In [6]:
flights = flights.withColumn("distance", flights["distance"].cast("float"))

In [7]:
distance_sum = flights.groupBy("userCode").agg(F.sum("distance").alias("total_distance"))
max_distance = distance_sum.agg(F.max("total_distance").alias("max_distance"))

In [8]:
result = max_distance.join(distance_sum, max_distance["max_distance"]==distance_sum["total_distance"],"inner").show()

+------------------+--------+------------------+
|      max_distance|userCode|    total_distance|
+------------------+--------+------------------+
|271864.20068359375|     925|271864.20068359375|
+------------------+--------+------------------+



In [9]:
result = max_distance.join(distance_sum, max_distance["max_distance"]==distance_sum["total_distance"],"inner").show()

+------------------+--------+------------------+
|      max_distance|userCode|    total_distance|
+------------------+--------+------------------+
|271864.20068359375|     925|271864.20068359375|
+------------------+--------+------------------+



## 3. Find the to and from locations of the longest business travel

In [10]:
longest_travel = flights.agg(F.max("distance").alias("distance"))

In [11]:
route = flights.select("from","to","distance")

In [12]:
longest_travel.join(route,"distance","inner").show(1)
# Only 1 was showed because since there were multiple users who followed the same route, there were many records with those
# distance, from and to values. 

+--------+------------------+-------------+
|distance|              from|           to|
+--------+------------------+-------------+
|  937.77|Florianopolis (SC)|Salvador (BH)|
+--------+------------------+-------------+
only showing top 1 row



## 4. What is the most popular hotel by number of times booked

In [13]:
count_hotels = hotels.groupBy("name").count()
count_hotels.orderBy(desc("count")).show()

+--------+-----+
|    name|count|
+--------+-----+
| Hotel K| 5094|
|Hotel CB| 5029|
|Hotel BD| 4829|
|Hotel AF| 4828|
|Hotel AU| 4467|
|Hotel BP| 4437|
|Hotel BW| 4333|
| Hotel Z| 4205|
| Hotel A| 3330|
+--------+-----+



## 5. What is the most expensive hotel by average price

In [14]:
hotel_avg_price = hotels.groupBy("name").agg(F.avg("price").alias("price"))

In [15]:
hotel_avg_price.orderBy(desc("price")).show()

+--------+------------------+
|    name|             price|
+--------+------------------+
| Hotel A| 313.0200000000128|
|Hotel AU| 312.8300000000049|
| Hotel K|   263.40999999999|
|Hotel BP|247.62000000000103|
|Hotel BD|  242.879999999992|
| Hotel Z|208.04000000000943|
|Hotel CB| 165.9899999999925|
|Hotel AF|139.09999999998743|
|Hotel BW|  60.3900000000067|
+--------+------------------+



## 6. What is the most least hotel by average price

In [16]:
hotel_avg_price.orderBy("price").show()

+--------+------------------+
|    name|             price|
+--------+------------------+
|Hotel BW|  60.3900000000067|
|Hotel AF|139.09999999998743|
|Hotel CB| 165.9899999999925|
| Hotel Z|208.04000000000943|
|Hotel BD|  242.879999999992|
|Hotel BP|247.62000000000103|
| Hotel K|   263.40999999999|
|Hotel AU| 312.8300000000049|
| Hotel A| 313.0200000000128|
+--------+------------------+



## 7. How many hotels are pricier than overall average price (across all hotels)

In [17]:
avg_hotel_price = hotels.agg(F.avg("price").alias("avg_price")).first()["avg_price"]

In [18]:
avg_hotel_price

214.43955390609025

In [19]:
result7 = hotels.filter(hotels["price"]>avg_hotel_price).count()
result7

22157

In [20]:
hotels_avg = hotels.groupBy("name").agg(F.avg("price").alias("price"))

In [21]:
hotels_avg.show()

+--------+------------------+
|    name|             price|
+--------+------------------+
| Hotel K|   263.40999999999|
|Hotel BW|  60.3900000000067|
| Hotel A| 313.0200000000128|
|Hotel AU| 312.8300000000049|
|Hotel BD|  242.879999999992|
|Hotel AF|139.09999999998743|
|Hotel BP|247.62000000000103|
|Hotel CB| 165.9899999999925|
| Hotel Z|208.04000000000943|
+--------+------------------+



In [22]:
result7100 = hotels_avg.filter(hotels_avg["price"]>avg_hotel_price).count()

In [23]:
result7100

5

## 8. Identify the name(s) of user who travelled the most

In [46]:
users = users.withColumnRenamed("code","userCode")

In [51]:
joined_df = flights.join(users,"userCode","left")

In [72]:
count_users = joined_df.groupBy("name").count().alias("times_travelled")

In [74]:
# This line of code was obtained with the help of chat gpt because I was getting an error. 
joined_df.createOrReplaceTempView("joined_data")

In [87]:
# This cell was also generated by chat GPT after my attempt to use the .agg(max) method failed. 
name_counts = spark.sql("""SELECT name, COUNT(*) as name_count FROM joined_data
    GROUP BY name""")

In [105]:
most_travelled = name_counts.orderBy(name_counts.name_count.desc()).first()

In [106]:
most_travelled

Row(name='Sonia Malaspina', name_count=400)

## 9. What is the average spending on hotels categorized by gender

In [97]:
joined_df2 = hotels.join(users,"userCode","inner")

In [103]:
joined_df2.groupBy("Gender").agg(F.avg(joined_df2.total).alias("average spending")).show()

+------+-----------------+
|Gender| average spending|
+------+-----------------+
|  none|535.7719371846295|
|female| 535.570358655982|
|  male|537.3409635627846|
+------+-----------------+



## 10. What is the average spending on flights of users who stayed atleast once in the "Hotel A" categorized by gender

In [118]:
hotels = hotels.withColumnRenamed("name","Hotel Name")

In [127]:
flights = flights.withColumnRenamed("price","Flight Price")

In [128]:
joined_df3 = hotels.join(users,"userCode","inner")

In [129]:
joined_df4 = joined_df3.join(flights,"userCode","inner")

In [130]:
filtered_df4 = joined_df4.filter(joined_df3["Hotel Name"]== "Hotel A")

In [141]:
#These people stayed at least once in Hotel A
user_code_column = filtered_df4.select("userCode","Gender")

In [142]:
df5 = flights.join(user_code_column,"userCode","left")

In [143]:
df5.groupBy("Gender").agg(F.avg(df5["Flight Price"]).alias("average flight spending")).show()

+------+-----------------------+
|Gender|average flight spending|
+------+-----------------------+
|  none|      909.7551104689924|
|  NULL|     1044.0291359370394|
|female|      908.4486398492921|
|  male|      908.8239856333097|
+------+-----------------------+

