## Spark examples using Airport Data
I started from the tutorial linked here, but it is no longer available: [This intro](http://sparktutorials.net/analyzing-flight-data:-a-gentle-introduction-to-graphx-in-spark)

For more info on the dataset, see: [This page](http://stat-computing.org/dataexpo/2009/the-data.html)

### Getting started
Read the data into a Dataframe

In [None]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

In [None]:
import org.apache.spark.sql.functions._

In [None]:
val df_1 = spark.read.format("com.databricks.spark.csv").option("header", "true").load("/Users/Ethan/notebooks/2008.csv")

### How many items are in the dataframe?

In [None]:
df_1.count

### What are all the Column names in the Dataframe?

In [None]:
df_1.printSchema

### Select out the Origin and Dest columns and show the top 20 results

In [None]:
df_1.select(col("Origin"), col("Dest")).show

### Save off the unique airport codes into their own Dataframe. 

In [None]:
val airport_codes = df_1.select(col("Origin")).distinct.union(df_1.select(col("Dest"))).distinct
airport_codes.cache
airport_codes.show

## Make a GraphX graph with vertices that carry properties
The vertices will have the form (id:Long,airportCode:String)

However, we aren't going to use a hash function to create the IDs, so we will build two lookup Maps along the way:

idToCode and codeToId

In [None]:
val vertices = airport_codes.rdd.map(r => r.getString(0)).zipWithUniqueId.map(_.swap)
vertices.take(5).foreach(println)

In [None]:
val tmp = vertices.collect
val idToCode = Map(tmp:_*)
idToCode(0)

In [None]:
val codeToId = Map(tmp.map(_.swap) :_*)
codeToId("BGM")
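The two lookup maps are inverses of each other. A minimal sketch of that relationship on a plain Scala collection, where the (id, code) pairs are made-up stand-ins for what zipWithUniqueId would return (the sample values and the sample* names are assumptions, not taken from the data):

```scala
// Hypothetical (id, code) pairs; IDs from zipWithUniqueId are unique but may not be contiguous
val samplePairs: Seq[(Long, String)] = Seq((0L, "BGM"), (7L, "ORD"), (14L, "ATL"))

// id -> code, built the same way as idToCode
val sampleIdToCode: Map[Long, String] = Map(samplePairs: _*)

// code -> id, built by swapping each pair, the same way as codeToId
val sampleCodeToId: Map[String, Long] = Map(samplePairs.map(_.swap): _*)

// Round trip: ID -> code -> ID returns the original ID
val roundTrip: Long = sampleCodeToId(sampleIdToCode(7L))
println(roundTrip)
```

Because the IDs are not guaranteed to be contiguous, a lookup such as idToCode(0) only works if some vertex actually received ID 0.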

## Now we can create our Edges from the data
Each Edge will connect the ID of each airport code (which is why we have the codeToId Map)

In [None]:
val flightsFromTo = df_1.select(col("Origin"),col("Dest"))
flightsFromTo.show

## Create our graph
*Note that we only want to analyze the distinct edges*

In [None]:
val edges = flightsFromTo.rdd.distinct.map(r => Edge(codeToId(r.getString(0)),codeToId(r.getString(1)), "route"))
val graph = Graph(vertices,edges)

## How many vertices and edges does the graph have?

In [None]:
graph.numVertices

In [None]:
graph.numEdges

## Use PageRank to find the most important airport

In [None]:
val ranks = graph.pageRank(0.001).vertices
ranks.cache
ranks.take(10).foreach(println)

In [None]:
ranks.sortBy(r => r._2, false).take(10).foreach(r => println(idToCode(r._1) + "\t" + r._2.toString))

## Verify that all of the airports are connected

In [None]:
val components = graph.connectedComponents
components.cache
components.vertices.take(10).foreach(println)

In [None]:
components.vertices.map(r => r._2).distinct.count
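connectedComponents labels every vertex with the lowest vertex ID in its component, so the graph is fully connected exactly when the distinct count above is 1. A minimal sketch of that idea on plain Scala collections, using made-up edges and a simple min-label propagation (an illustration only, not GraphX's actual implementation; all names here are hypothetical):

```scala
// Hypothetical undirected edges between made-up vertex IDs
val sampleEdges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (4L, 5L))
val vertexIds: Seq[Long] = sampleEdges.flatMap { case (a, b) => Seq(a, b) }.distinct

// Give both ends of every edge the smaller of their two labels,
// and repeat until a full pass changes nothing
def propagate(labels: Map[Long, Long]): Map[Long, Long] = {
  val next = sampleEdges.foldLeft(labels) { case (acc, (a, b)) =>
    val smallest = math.min(acc(a), acc(b))
    acc.updated(a, smallest).updated(b, smallest)
  }
  if (next == labels) labels else propagate(next)
}

// Every vertex starts out labelled with its own ID
val componentOf: Map[Long, Long] = propagate(vertexIds.map(v => v -> v).toMap)

// Number of distinct labels = number of connected components
val componentCount: Int = componentOf.values.toSet.size
println(componentCount)
```

In this sample the count is greater than 1 because vertices 4 and 5 are not reachable from the others.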

## Do some standard dataframe calculations
1. What is the most common origin?
1. What is the most common dest?
1. What is the most common route?

In [None]:
df_1.groupBy("Origin").agg(count("*") as "times_origin").orderBy(desc("times_origin")).show

In [None]:
df_1.groupBy("Dest").agg(count("*") as "times_dest").orderBy(desc("times_dest")).show

In [None]:
df_1.groupBy("Origin", "Dest").agg(count("*") as "route_count").orderBy(desc("route_count")).show

## Calculate the average carrier delay throughout the day
For now, let's aggregate by hour and by airport. This only requires standard DataFrame functions.

In [None]:
val df_2 = df_1.withColumn("depHour", col("depTime").substr(0,2))
df_2.select("depTime", "depHour").show

### Oops - we need leading 0's on our times
Let's fix our depTime Column first. Write a Scala function called fixTime that formats a String so that it always has 4 digits, padding with leading 0's if needed.

In [None]:
def fixTime(timeStr: String) :String = {
   "%04d".format(timeStr.toInt)
}
fixTime("617")
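fixTime throws a NumberFormatException on any input that is not an integer. If the depTime column contains non-numeric markers (the notebook later filters NA out of DepDelay, so this seems plausible, but it is an assumption), a defensive variant can return None instead of throwing. safeFixTime is a hypothetical name and not part of the original notebook:

```scala
import scala.util.Try

// Hypothetical defensive variant of fixTime:
// Some(zero-padded 4-digit string) for integer input, None for anything else (including null)
def safeFixTime(timeStr: String): Option[String] =
  Try(timeStr.trim.toInt).toOption.map(n => "%04d".format(n))

println(safeFixTime("617"))
println(safeFixTime("NA"))
```

Spark's Scala UDFs generally map an Option result to a nullable column, but that behavior is worth checking against the Spark version in use before swapping this in for fixTime.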

### Convert the fixTime function into a User Defined function (udf)
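*Note: udf is defined in org.apache.spark.sql.functions, which was imported earlier; the org.apache.spark.sql._ import below brings in types such as Encoders that are used later.*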

In [None]:
import org.apache.spark.sql._
val fixTimeUdf = udf(fixTime(_:String))

### Create a new Dataframe and fix the depTime Column using the fixTimeUdf that was just created
* replacing the old column will require creating a new one, dropping the old, then renaming

In [None]:
val df_3 = df_1.withColumn("fixedDepTime",fixTimeUdf(col("depTime"))).drop("depTime").withColumnRenamed("fixedDepTime","depTime")
df_3.select("depTime").show

### Create a new Column called depHour that just contains the hour from depTime

In [None]:
val df_4 = df_3.withColumn("depHour",col("depTime").substr(0,2))
df_4.select("Origin","depHour").show

### Create a Dataframe that contains the total delay from the DepDelay column, grouping by Origin and depHour
*Use the .cast Column operator to convert DepDelay to an Int.*

*This will fail - can you figure out why?* 

In [None]:
val total_delays = df_4.groupBy("Origin","depHour").agg(sum(col("DepDelay").cast("Int")) as "total_delay")
total_delays.show

### Create a Scala method that converts a String to an Integer called failSafeToInt that returns 0 if the String is not a number

In [None]:
import scala.util.matching.Regex._
def failSafeToInt(input: String): Int = {
   // Optional leading minus sign, so that negative values are parsed rather than mapped to 0
   val num = """^-?\d+$""".r
   input match {
      case num() => input.toInt
      case _ => 0
   }
}
failSafeToInt("1")

In [None]:
failSafeToInt("NA")

### I don't know why this doesn't work (bonus points for fixing?) : 

In [None]:
val toIntUdf = udf(failSafeToInt(_:String))
val total_delays = df_4.groupBy("Origin","depHour").agg(sum(toIntUdf(col("DepDelay"))) as "total_delay")
total_delays.show

### So ... create a Dataframe after filtering out the NA's in the DepDelay column and calculate totals from that

In [None]:
val total_delays = df_4.filter(col("DepDelay") =!= "NA").groupBy("Origin","depHour").agg(sum(toIntUdf(col("DepDelay"))) as "total_delay")
total_delays.orderBy("Origin","depHour").show

## Now for some Windowing functions
1. Find the top 2 worst carriers in terms of delays at each airport
1. Calculate the difference in delays between the most delayed carrier and every other carrier at each airport

### Start by creating a Dataframe that contains the delays of each uniqueCarrier at each origin

In [None]:
val carrier_delays = df_4.groupBy("Origin","uniqueCarrier").agg(sum("CarrierDelay") as "totalCarrierDelay")
carrier_delays.orderBy(asc("Origin"), desc("totalCarrierDelay")).show

### Find the worst two carriers at each airport
The window specifies how we partition the data when later ranking. In our case, we partition by Origin. 

The window also specifies the ordering so that the rank function knows how to do its job

In [None]:
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("Origin").orderBy(desc("totalCarrierDelay"))
val ranked_df = carrier_delays.withColumn("rank", dense_rank().over(windowSpec)).filter(col("rank") <= 2)
ranked_df.orderBy("Origin","rank").show
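To make the ranking concrete, the same idea can be sketched on a plain Scala collection: group by origin, order the distinct delay totals from largest to smallest, and give tied totals the same rank, which is what dense_rank does. The rows and the CarrierDelayRow name are made up for illustration:

```scala
// Hypothetical stand-in for one row of carrier_delays
case class CarrierDelayRow(origin: String, carrier: String, totalDelay: Int)

val sampleRows = Seq(
  CarrierDelayRow("ORD", "AA", 500),
  CarrierDelayRow("ORD", "UA", 500),
  CarrierDelayRow("ORD", "MQ", 300),
  CarrierDelayRow("ORD", "OO", 100),
  CarrierDelayRow("ATL", "DL", 900),
  CarrierDelayRow("ATL", "FL", 200)
)

// Dense rank within each origin: ties share a rank and no rank numbers are skipped
val denseRanked: Seq[(CarrierDelayRow, Int)] =
  sampleRows.groupBy(_.origin).toSeq.flatMap { case (_, rows) =>
    // Distinct totals sorted descending; position + 1 is the dense rank
    val rankOf: Map[Int, Int] =
      rows.map(_.totalDelay).distinct.sorted(Ordering[Int].reverse)
        .zipWithIndex.map { case (delay, i) => delay -> (i + 1) }.toMap
    rows.map(r => (r, rankOf(r.totalDelay)))
  }

// Keep ranks 1 and 2, mirroring filter(col("rank") <= 2)
val worstTwo: Seq[(CarrierDelayRow, Int)] = denseRanked.filter(_._2 <= 2)
worstTwo.sortBy { case (r, rank) => (r.origin, rank, r.carrier) }.foreach(println)
```

Because ties share a rank, filtering on rank <= 2 can return more than two carriers per airport, as it does for ORD in this sample.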

### Now we use the window to find diffs between the worst and all the rest of the carriers in terms of delays
delay_diff is a Column expression built by calling the max function on the totalCarrierDelay column of the carrier_delays dataframe.

The .over() call turns the max into a window function.

So max operates within the partition specified by the window spec. In our example we partitioned by Origin, so max finds the largest totalCarrierDelay at each airport, and the expression subtracts each carrier's totalCarrierDelay from it.

In [None]:
val delay_diff = max(carrier_delays("totalCarrierDelay")).over(windowSpec) - carrier_delays("totalCarrierDelay")
val delay_diffs_df = carrier_delays.withColumn("delayDiff", delay_diff)
delay_diffs_df.orderBy("Origin","delayDiff").show
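The per-partition subtraction can be pictured on a plain Scala collection. This is only a sketch with made-up (origin, carrier, totalCarrierDelay) rows, not the Spark execution path:

```scala
// Hypothetical (origin, carrier, totalCarrierDelay) rows
val sampleDelays: Seq[(String, String, Int)] = Seq(
  ("ORD", "AA", 500),
  ("ORD", "MQ", 300),
  ("ATL", "DL", 900),
  ("ATL", "FL", 200)
)

// For each origin, subtract every carrier's total from the largest total at that origin
val sampleDiffs: Seq[(String, String, Int, Int)] =
  sampleDelays.groupBy(_._1).toSeq.flatMap { case (_, rows) =>
    val worst = rows.map(_._3).max
    rows.map { case (origin, carrier, total) => (origin, carrier, total, worst - total) }
  }

// Worst carrier at each origin comes first, with a difference of 0
sampleDiffs.sortBy(r => (r._1, r._4)).foreach(println)
```

One detail worth keeping in mind: windowSpec also has an ordering, and an ordered window without an explicit frame defaults to the rows from the start of the partition up to the current row. Since the ordering here is by descending totalCarrierDelay, that running max should still equal the partition max, but a window defined with only partitionBy would make the intent more obvious.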

In [None]:
println(delay_diff)

## Now for some Dataset functions ...
1. Create a case class called AirportCarriers that contains Origin and UniqueCarrier.
1. Create a DataFrame called carrier_df that has just the Origin and UniqueCarrier columns.
1. Create a Dataset from carrier_df using the .as[] method.

In [None]:
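case class AirportCarriers(Origin: String, UniqueCarrier: String)
// Keep only the two columns named in the case class, one row per distinct pair
val carrier_df = df_1.select("Origin","UniqueCarrier").dropDuplicates("Origin","UniqueCarrier")
carrier_df.show
// .as[...] needs an implicit Encoder in scope (see the implicits import in a later cell)
val carrier_ds = carrier_df.as[AirportCarriers]
carrier_ds.show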

### Bonus points for fixing the next couple cells ...

In [None]:
val x = spark
import x.implicits._

In [None]:
// Importing spark.implicits._ directly did not work here, hence the `val x = spark` workaround in the previous cell.
// With those implicits in scope the explicit encoder below is no longer needed; it is kept as an example of building a tuple encoder by hand.
// For a case class made up of primitives, Encoders.product[AirportCarriers] is one way to construct an encoder explicitly.
val ds_1 = delay_diffs_df.as[(String,String,Double,Double)](Encoders.tuple(Encoders.STRING, Encoders.STRING, Encoders.DOUBLE, Encoders.DOUBLE))

### Create a Dataset from the delay_diffs_df Dataframe using the as[AirportCarriers] method and show the top 20 items

In [None]:
val ds_1 = delay_diffs_df.as[AirportCarriers]
ds_1.show

### Use groupByKey Dataset function to group ds_1 by the origin

In [None]:
ds_1.groupByKey(r => r.Origin)

### Use the following in the groupByKey and reduceGroups examples

In [None]:
case class AirportCarrierSets(Origin:String, UniqueCarrierSet: Set[String])
ds_1.map(r => AirportCarrierSets(r.Origin, Set(r.UniqueCarrier)))

## groupByKey / reduceGroups

If T is the type of your Dataset rows (AirportCarriers)
and K is the type of the key that you extract from each row to aggregate on:

groupByKey takes a function [T => K] and maps a Dataset[T] to a KeyValueGroupedDataset[K,T]

then reduceGroups takes a function [(T,T) => T] and returns a Dataset[(K,T)]

In other words, group by your key, and specify a function that takes two AirportCarriers and returns a single AirportCarriers

In [None]:
val ds_2 = ds_1.groupByKey(_.Origin).reduceGroups((a,b) => AirportCarriers(a.Origin, Set(a.UniqueCarrier.split(" ") :_*).union(Set(b.UniqueCarrier.split(" ") :_*)).mkString(" ")))
ds_2.show
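The shape of this groupByKey / reduceGroups pipeline can be mirrored on a plain Scala collection, where groupBy plays the role of groupByKey and reduce plays the role of reduceGroups. The SampleCarrier class, the rows, and mergeCarriers are hypothetical names used only for this sketch:

```scala
// Hypothetical stand-in for AirportCarriers, with made-up rows
case class SampleCarrier(origin: String, carriers: String)

val sampleCarriers = Seq(
  SampleCarrier("ORD", "AA"),
  SampleCarrier("ORD", "UA"),
  SampleCarrier("ORD", "AA"),
  SampleCarrier("ATL", "DL")
)

// (T, T) => T: merge the space-separated carrier codes of two rows into one row.
// Sorting is added here only to make the output order deterministic.
def mergeCarriers(a: SampleCarrier, b: SampleCarrier): SampleCarrier = {
  val merged = (a.carriers.split(" ") ++ b.carriers.split(" ")).distinct.sorted
  SampleCarrier(a.origin, merged.mkString(" "))
}

// T => K is _.origin; each group is then reduced to a single T
val reduced: Map[String, SampleCarrier] =
  sampleCarriers.groupBy(_.origin).map { case (origin, rows) =>
    origin -> rows.reduce(mergeCarriers)
  }

reduced.toSeq.sortBy(_._1).foreach(println)
```

The result pairs each key with one reduced row, which matches the (K, T) tuples that ds_2 holds. The merge function gives the same answer regardless of the order in which rows are combined, which matters because a distributed reduce does not promise any particular order.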

In [None]:
val ans = ds_2.map(r => r._2)
println(ans)

In [None]:
ans.show