In [1]:
import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.113:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1621948633552)
SparkSession available as 'spark'


import org.apache.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession


In [2]:
//Load flightData.csv into dataframe
val flightDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./flightData.csv")

flightDF: org.apache.spark.sql.DataFrame = [passengerId: int, flightId: int ... 3 more fields]


In [3]:
//Check if schema is properly ingested according to the defined schema
flightDF.printSchema

root
 |-- passengerId: integer (nullable = true)
 |-- flightId: integer (nullable = true)
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- date: string (nullable = true)



In [4]:
//display top/bottom 10 records from flightDF to check the data ingested
flightDF.tail(10)

res1: Array[org.apache.spark.sql.Row] = Array([12704,999,co,cg,2017-12-31], [12078,999,co,cg,2017-12-31], [14582,999,co,cg,2017-12-31], [2689,999,co,cg,2017-12-31], [2244,999,co,cg,2017-12-31], [14100,999,co,cg,2017-12-31], [9397,999,co,cg,2017-12-31], [7682,999,co,cg,2017-12-31], [12429,999,co,cg,2017-12-31], [9217,999,co,cg,2017-12-31])


Question 1 - Find the total number of flights for each month

In [5]:
//Extract month from the date field and name the field as month, and get distinct count of flightId grouped by month

val FlightsPerMonth = flightDF.withColumn("Month", month(to_date($"date"))).groupBy($"Month")
    .agg(countDistinct($"flightId").alias("Number of Flights"))
    .orderBy($"Month")

FlightsPerMonth: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Month: int, Number of Flights: bigint]


In [6]:
//Dispaly the result to check if the business logic is correctly implemented on the flightDF
FlightsPerMonth.show()

+-----+-----------------+
|Month|Number of Flights|
+-----+-----------------+
|    1|               97|
|    2|               73|
|    3|               82|
|    4|               92|
|    5|               92|
|    6|               71|
|    7|               87|
|    8|               76|
|    9|               85|
|   10|               76|
|   11|               75|
|   12|               94|
+-----+-----------------+



In [7]:
//Export the result as csv
FlightsPerMonth.coalesce(1)//to create one partition
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("./Flights Per Month")

Question 2 - Find the names of the 100 most frequent flyers

In [8]:
//Load passengers.csv into dataframe

val passengerDF = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./passengers.csv")

passengerDF: org.apache.spark.sql.DataFrame = [passengerId: int, firstName: string ... 1 more field]


In [9]:
//Check the schema
passengerDF.printSchema

root
 |-- passengerId: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)



In [10]:
//display top/bottom 20 records to check the data ingested

passengerDF.show()

+-----------+---------+--------+
|passengerId|firstName|lastName|
+-----------+---------+--------+
|      14751| Napoleon| Gaylene|
|       2359| Katherin| Shanell|
|       5872|   Stevie|  Steven|
|       3346|Margarita|   Gerri|
|       3704|    Earle|  Candis|
|       1226|    Trent|    Omer|
|       2677|    Janee|  Lillia|
|        179|     Gita|Chastity|
|       9763|   Hilton|Jaquelyn|
|      11414|      Leo|Margaret|
|       6870|     Tama|     Bok|
|       3290|    Logan|    Anya|
|      13264|   Lowell|Kathryne|
|        455|  Maritza|  Maxima|
|      13006|     Yuri|   Joyce|
|      10323|  Latasha|  Estell|
|       7376|   Kaycee|Kiersten|
|      15015|   Curtis| Abraham|
|       9217|   Verena|Josefine|
|       5183|     Loan| Latonya|
+-----------+---------+--------+
only showing top 20 rows



In [11]:
//Count the flights for each passengers in descending order

val frequentFlyersDF = flightDF
  .join(passengerDF, flightDF("passengerId") ===  passengerDF("passengerId"),"inner")
  .groupBy(flightDF("passengerId").as("Passenger ID"), $"firstName".as("First name"), $"lastName".as("Last name"))
  .agg(count($"flightId")
  .as("Number of Flights")).orderBy($"Number of Flights".desc).limit(100)

frequentFlyersDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Passenger ID: int, First name: string ... 2 more fields]


In [12]:
//Show the top 20 frequent flyers to check if the logic has been implemented properly

frequentFlyersDF.show()

+------------+----------+---------+-----------------+
|Passenger ID|First name|Last name|Number of Flights|
+------------+----------+---------+-----------------+
|        2068|   Yolande|     Pete|               32|
|        1677| Katherina| Vasiliki|               27|
|        4827|     Jaime|    Renay|               27|
|        3173|  Sunshine|    Scott|               26|
|        8961|     Ginny|    Clara|               26|
|         288|    Pamila|    Mavis|               25|
|         917|    Anisha|   Alaine|               25|
|        5096|    Blythe|     Hyon|               25|
|         760|    Vernia|      Mui|               25|
|        6084|      Cole|   Sharyl|               25|
|        8363|    Branda|   Kimiko|               25|
|        5867|     Luise|  Raymond|               25|
|        2857|       Son|  Ginette|               25|
|        3367| Priscilla|    Corie|               24|
|        1343|   Bennett|    Staci|               24|
|        1240| Catherine|   

In [13]:
//Export the result as csv
frequentFlyersDF.coalesce(1)//to create one partition
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("./Top 100 Frequent Flyers")

Question 3 - Find the greatest number of countries a passenger has been in without being in the UK

In [14]:
//Get the last flight for each passenger to be unioned with the original flight dataframe

val w = Window.partitionBy($"passengerId")

val lastFlightDF = flightDF.withColumn("seq", row_number().over(w.orderBy($"date".desc)))
  .filter($"seq" === "1").drop("seq","from").withColumnRenamed("to", "itinerary")


w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@565c7d8f
lastFlightDF: org.apache.spark.sql.DataFrame = [passengerId: int, flightId: int ... 2 more fields]


In [15]:
//Append the last "to" country to "from" countries for each passenger with a renamed column "itinerary"

val itineraryDF = flightDF
    .drop("to")
    .withColumnRenamed("from", "itinerary")
    .union(lastFlightDF)

itineraryDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [passengerId: int, flightId: int ... 2 more fields]


In [16]:
//Convert the itinerary into a single array for each passenger 

val itineraryArrayDF = itineraryDF
  .groupBy("passengerId")
  .agg(collect_list("itinerary") as "itinerary")

itineraryArrayDF: org.apache.spark.sql.DataFrame = [passengerId: int, itinerary: array<string>]


In [17]:
//Create a udf that will split each array of itinerary by uk and get the max number of the splitted arrays of the
//distinct countires 

val longestRunUDF = udf((ar: Array[String]) => ar.mkString(" ")
      .split("uk") 
      .filter(_.nonEmpty)
      .map(_.trim)
      .map(s => s.split(" ").distinct.length)
      .max)

longestRunUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4419/571972504@775e7bda,IntegerType,List(Some(class[value[0]: array<string>])),Some(class[value[0]: int]),None,false,true)


In [18]:
//Call the udf on the itineraryArrayDF
val longestRunDF = itineraryArrayDF.withColumn("Longest Run", longestRunUDF($"itinerary")).drop($"itinerary")

longestRunDF: org.apache.spark.sql.DataFrame = [passengerId: int, Longest Run: int]


In [19]:
//Disply the top 20 result and compare it with the orginal flight csv file for testing
longestRunDF.orderBy($"Longest Run".desc).show()

+-----------+-----------+
|passengerId|Longest Run|
+-----------+-----------+
|       9441|         19|
|        288|         19|
|       3608|         18|
|       2437|         18|
|       2867|         18|
|       2068|         18|
|       3117|         17|
|        888|         17|
|       3309|         17|
|       2378|         17|
|        798|         17|
|       1337|         17|
|       5668|         17|
|       1651|         17|
|       1074|         17|
|         92|         17|
|        950|         17|
|       2857|         17|
|       3173|         17|
|        525|         17|
+-----------+-----------+
only showing top 20 rows



In [20]:
//Export the result as csv
longestRunDF.coalesce(1)//to create one partition
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("./Longest Run Per Passenger")

Question 4 - Find the passengers who have been on more than 3 flights together.

In [21]:
//Count the flights grouped by the passengers who are on the same flight & on the same date where the number of
// flights > 3

val flightTogetherDF = flightDF.as("df1").join(flightDF.as("df2"),
    $"df1.passengerId" < $"df2.passengerId" &&
    $"df1.flightId" === $"df2.flightId" &&
    $"df1.date" === $"df2.date",
    "inner"
  ).
  groupBy($"df1.passengerId".as("Passenger 1 ID"), $"df2.passengerId".as("Passenger 2 ID")).
  agg(count("*").as("Number of flights together")).
  where($"Number of flights together" >= 3)

flightTogetherDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Passenger 1 ID: int, Passenger 2 ID: int ... 1 more field]


In [22]:
// disply the top 20 results to compare it with the orginal flight csv for testing
flightTogetherDF.orderBy($"Number of flights together".desc)show()

+--------------+--------------+--------------------------+
|Passenger 1 ID|Passenger 2 ID|Number of flights together|
+--------------+--------------+--------------------------+
|           701|           760|                        15|
|          2717|          2759|                        14|
|          3503|          3590|                        14|
|          2939|          5490|                        13|
|          3278|          5423|                        12|
|          2550|          4441|                        12|
|           382|           392|                        12|
|           760|           763|                        12|
|          4316|          4373|                        12|
|          3021|          9522|                        12|
|           975|          1371|                        12|
|          2759|          4316|                        12|
|           366|           374|                        12|
|          4395|          4399|                        1

In [23]:
//Export the result as csv
flightTogetherDF.coalesce(1)//to create one partition
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("./More than Three Flights Together")

In [24]:
import java.text.SimpleDateFormat
import java.util.Date

import java.text.SimpleDateFormat
import java.util.Date


In [25]:
//Create function that return the passengers who have been on more than N flights together within the range (from,to)
//Save the reult to a csv file

def flownTogether(atLeastNTimes: Int, from: Date, to: Date) = {
  val flightNTogetherDF = flightDF.as("df1").join(flightDF.as("df2"),
    $"df1.passengerId" < $"df2.passengerId" &&
    $"df1.flightId" === $"df2.flightId" &&
    $"df1.date" === $"df2.date" &&
    $"df1.date" >= from &&
    $"df1.date" <= to,
    "inner"
  ).
  groupBy($"df1.passengerId".as("Passenger 1 ID"), $"df2.passengerId".as("Passenger 2 ID")).
  agg(count("*").as("Number of flights together"), min($"df1.date").as("from"), max($"df1.date").as("to")).
  where($"Number of flights together" >= atLeastNTimes) 
  
   flightNTogetherDF.coalesce(1)//to create one partition
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("./Flight N Together")
    
    println("The Flight N Together result has been saved as csv")
}


flownTogether: (atLeastNTimes: Int, from: java.util.Date, to: java.util.Date)Unit


In [26]:
//Create a function to generate from and to date to test the function flownTogether

object Utils {

    val DATE_FORMAT = "yyyy-MMM-dd"

    def convertStringToDate(s: String): Date = {
        val dateFormat = new SimpleDateFormat(DATE_FORMAT)
        dateFormat.parse(s)
    }

}


def convertDateStringToLong(dateAsString: String): Long = {
    Utils.convertStringToDate(dateAsString).getTime
}


defined object Utils
convertDateStringToLong: (dateAsString: String)Long


In [27]:
// Testing the function flownTogether and compare the result to the orginal flightData.csv to check the logic
val from = new java.sql.Date(convertDateStringToLong("2017-Dec-18"))
val to = new java.sql.Date(convertDateStringToLong("2017-Dec-28"))

flownTogether(2, from, to)

The Flight N Together result has been saved as csv


from: java.sql.Date = 2017-12-18
to: java.sql.Date = 2017-12-28
