In [27]:
val flights = (spark.read.format("com.databricks.spark.csv")
               .option("header", "true")
               .option("inferSchema", "true")
               .option("delimiter", ",")
               .load("/spark-files/2008.csv"))

flights: org.apache.spark.sql.DataFrame = [Year: int, Month: int ... 27 more fields]


In [28]:
flights.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [29]:
flights.select("UniqueCarrier", "FlightNum", "DepDelay", "ArrDelay")

res14: org.apache.spark.sql.DataFrame = [UniqueCarrier: string, FlightNum: int ... 2 more fields]


In [30]:
val delayedFlights = flights.select("UniqueCarrier", "DepDelay").filter($"DepDelay" > 30)

delayedFlights: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [UniqueCarrier: string, DepDelay: string]


In [31]:
delayedFlights.show

+-------------+--------+
|UniqueCarrier|DepDelay|
+-------------+--------+
|           WN|      34|
|           WN|      67|
|           WN|      94|
|           WN|      51|
|           WN|      32|
|           WN|      87|
|           WN|      82|
|           WN|      39|
|           WN|      82|
|           WN|      56|
|           WN|     315|
|           WN|      45|
|           WN|      53|
|           WN|      38|
|           WN|      44|
|           WN|      58|
|           WN|      53|
|           WN|      83|
|           WN|      57|
|           WN|      97|
+-------------+--------+
only showing top 20 rows



In [35]:
val numTotalFlights = flights.count()
val numDelayedFlights = delayedFlights.count()

numTotalFlights: Long = 7009728
numDelayedFlights: Long = 814451


In [33]:
val percentageOfDelayedFlights = (numDelayedFlights.toFloat/numTotalFlights*100)

percentageOfDelayedFlights: Float = 11.618868


In [36]:
println("The percentage of delayed flights is: " + percentageOfDelayedFlights + "%")

The percentage of delayed flights is: 11.618868%


### Let's write a function to mark delayed flights

In [38]:
import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.udf


In [39]:
val isDelayedUDF = udf((time: String) => if (time == "NA") 0
                       else if (time.toInt > 30) 1 else 0)

isDelayedUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))


In [40]:
val flightsWithDelays = flights.select($"Year", $"Month", $"DayOfMonth",
                                       $"UniqueCarrier", $"FlightNum", $"DepDelay",
                                      isDelayedUDF($"DepDelay").alias("IsDelayed")
                                      )

flightsWithDelays: org.apache.spark.sql.DataFrame = [Year: int, Month: int ... 5 more fields]


In [41]:
flightsWithDelays.show(10)

+----+-----+----------+-------------+---------+--------+---------+
|Year|Month|DayOfMonth|UniqueCarrier|FlightNum|DepDelay|IsDelayed|
+----+-----+----------+-------------+---------+--------+---------+
|2008|    1|         3|           WN|      335|       8|        0|
|2008|    1|         3|           WN|     3231|      19|        0|
|2008|    1|         3|           WN|      448|       8|        0|
|2008|    1|         3|           WN|     1746|      -4|        0|
|2008|    1|         3|           WN|     3920|      34|        1|
|2008|    1|         3|           WN|      378|      25|        0|
|2008|    1|         3|           WN|      509|      67|        1|
|2008|    1|         3|           WN|      535|      -1|        0|
|2008|    1|         3|           WN|       11|       2|        0|
|2008|    1|         3|           WN|      810|       0|        0|
+----+-----+----------+-------------+---------+--------+---------+
only showing top 10 rows



### Let's find the average of flights that taxi-in

In [48]:
flights.select("Origin", "Dest", "TaxiIn")
               .groupBy("Origin", "Dest")
               .agg(avg("TaxiIn")
               .alias("AvgTaxiIn"))
               .orderBy(desc("AvgTaxiIn"))
               .show(10)

+------+----+------------------+
|Origin|Dest|         AvgTaxiIn|
+------+----+------------------+
|   BNA| FSD|              48.0|
|   RIC| RDU|              29.0|
|   VPS| LGA|              29.0|
|   XNA| SGF|              21.0|
|   CHA| DTW|20.732394366197184|
|   LAX| JAX|              20.5|
|   HSV| LGA|20.328358208955223|
|   BUR| RNO|              20.0|
|   MFE| ATL|              20.0|
|   MYR| PHL|              19.0|
+------+----+------------------+
only showing top 10 rows



### Let's run HIVE commands

In [49]:
sc.stop

In [50]:
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode


In [51]:
val conf = new SparkConf().setMaster("local").setAppName("HiveContext")

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1de0ea41


In [52]:
val sc = new SparkContext(conf);

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@793d4617


In [56]:
val hiveContext:SQLContext = new HiveContext(sc)                                                

hiveContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@119f7694


In [58]:
hiveContext.setConf("hive.metastore.uiris","thrift://localhost:9083")

In [59]:
hiveContext.tables("default").show

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| employee|      false|
+--------+---------+-----------+



### Let's load a parquet file and write back in another format - then verify

In [60]:
val babyNamesDF = hiveContext.read.parquet("/spark-files/baby_names.parquet")

babyNamesDF: org.apache.spark.sql.DataFrame = [year: int, name: string ... 2 more fields]


In [69]:
babyNamesDF.write.format("orc").mode(SaveMode.Append).saveAsTable("babynames")

In [78]:
hiveContext.tables("default").show

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|babynames|      false|
| default| employee|      false|
+--------+---------+-----------+



<pre>(base) <font color="#4E9A06"><b>hadoopuser@hadoopuser-VirtualBox</b></font>:<font color="#3465A4"><b>~</b></font>$ hdfs dfs -ls /user/hive/warehouse/
Found 1 items
drwxr-xr-x   - hadoopuser supergroup          0 2024-06-17 12:53 /user/hive/warehouse/babynames

In [82]:
val babyNamesHIVE = hiveContext.table("babynames")

babyNamesHIVE: org.apache.spark.sql.DataFrame = [year: int, name: string ... 2 more fields]


In [84]:
babyNamesHIVE.registerTempTable("namesforbaby")

In [85]:
hiveContext.sql("SELECT * FROM namesforbaby").show

+----+-------+--------+---+
|year|   name|    prob|sex|
+----+-------+--------+---+
|1880|   John|0.081541|boy|
|1880|William|0.080511|boy|
|1880|  James|0.050057|boy|
|1880|Charles|0.045167|boy|
|1880| George|0.043292|boy|
|1880|  Frank| 0.02738|boy|
|1880| Joseph|0.022229|boy|
|1880| Thomas|0.021401|boy|
|1880|  Henry|0.020641|boy|
|1880| Robert|0.020404|boy|
|1880| Edward|0.019965|boy|
|1880|  Harry|0.018175|boy|
|1880| Walter|0.014822|boy|
|1880| Arthur|0.013504|boy|
|1880|   Fred|0.013251|boy|
|1880| Albert|0.012609|boy|
|1880| Samuel|0.008648|boy|
|1880|  David|0.007339|boy|
|1880|  Louis|0.006993|boy|
|1880|    Joe|0.006174|boy|
+----+-------+--------+---+
only showing top 20 rows



### Let's use another parquet file

In [110]:
val sampleDF = hiveContext.read.parquet("/spark-files/sample.parquet")

sampleDF: org.apache.spark.sql.DataFrame = [firstName: string, gender: string ... 2 more fields]


In [113]:
sampleDF.show

+---------+------+-----+----+
|firstName|gender|total|year|
+---------+------+-----+----+
|     Mary|     F| 7065|1880|
|     Anna|     F| 2604|1880|
|     Emma|     F| 2003|1880|
|Elizabeth|     F| 1939|1880|
|   Minnie|     F| 1746|1880|
| Margaret|     F| 1578|1880|
|      Ida|     F| 1472|1880|
|    Alice|     F| 1414|1880|
|   Bertha|     F| 1320|1880|
|    Sarah|     F| 1288|1880|
|    Annie|     F| 1258|1880|
|    Clara|     F| 1226|1880|
|     Ella|     F| 1156|1880|
| Florence|     F| 1063|1880|
|     Cora|     F| 1045|1880|
|   Martha|     F| 1040|1880|
|    Laura|     F| 1012|1880|
|   Nellie|     F|  995|1880|
|    Grace|     F|  982|1880|
|   Carrie|     F|  949|1880|
+---------+------+-----+----+
only showing top 20 rows



In [114]:
sampleDF.cache()

res61: sampleDF.type = [firstName: string, gender: string ... 2 more fields]


In [115]:
sampleDF.printSchema

root
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- total: integer (nullable = true)
 |-- year: integer (nullable = true)



In [116]:
val firstName = sampleDF.select("firstname", "year")

firstName: org.apache.spark.sql.DataFrame = [firstname: string, year: int]


In [117]:
firstName.show

+---------+----+
|firstname|year|
+---------+----+
|     Mary|1880|
|     Anna|1880|
|     Emma|1880|
|Elizabeth|1880|
|   Minnie|1880|
| Margaret|1880|
|      Ida|1880|
|    Alice|1880|
|   Bertha|1880|
|    Sarah|1880|
|    Annie|1880|
|    Clara|1880|
|     Ella|1880|
| Florence|1880|
|     Cora|1880|
|   Martha|1880|
|    Laura|1880|
|   Nellie|1880|
|    Grace|1880|
|   Carrie|1880|
+---------+----+
only showing top 20 rows



In [118]:
firstName.count()

res64: Long = 922322


In [119]:
firstName.select("firstname").distinct.count()

res65: Long = 45019


In [128]:
// most popular in 1980...
sampleDF.filter(sampleDF("year") === 1980)
        .orderBy(sampleDF("total").desc, $"firstName")
        .select("firstName")
        .limit(5).show

+-----------+
|  firstName|
+-----------+
|    Michael|
|   Jennifer|
|Christopher|
|      Jason|
|      David|
+-----------+



In [129]:
// or we can use $ for column references
sampleDF.filter($"year" === 1980)
         .orderBy($"total".desc, $"firstName")
         .select("firstName")
         .limit(5)
         .show

+-----------+
|  firstName|
+-----------+
|    Michael|
|   Jennifer|
|Christopher|
|      Jason|
|      David|
+-----------+



In [132]:
// popular names in 1890!!!
// get lower case

val lowerCase = hiveContext.udf.register("lower", (s: String) => s.toLowerCase)

lowerCase: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


In [134]:
// filter only names in 1890

val names1890 = sampleDF.filter($"year" === 1890)
                               .select($"total".as("total1890"),
                                      $"gender".as("gender1890"),
                                      lower($"firstName")
                                      .as("name1890"))                               

names1890: org.apache.spark.sql.DataFrame = [total1890: int, gender1890: string ... 1 more field]


In [139]:
// let's add popular names in 1880

val names1880 = sampleDF.filter($"year" === 1880)
                               .select($"total".as("total1880"),
                                      $"gender".as("gender1880"),
                                      lower($"firstName")
                                      .as("name1880"))    

names1880: org.apache.spark.sql.DataFrame = [total1880: int, gender1880: string ... 1 more field]


In [144]:
// let's join them

val names1800sJoined = names1890.join(names1880, ($"name1890" === $"name1880")
                                     && ($"gender1890" === $"gender1880"))
                                        .orderBy($"total1890".as("name"),
                                                $"total1880", $"total1890")

names1800sJoined: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [total1890: int, gender1890: string ... 4 more fields]


In [145]:
// let's see what we get
names1800sJoined.show

+---------+----------+--------+---------+----------+--------+
|total1890|gender1890|name1890|total1880|gender1880|name1880|
+---------+----------+--------+---------+----------+--------+
|        5|         M|    rene|        5|         M|    rene|
|        5|         F|  mignon|        5|         F|  mignon|
|        5|         M|  payton|        5|         M|  payton|
|        5|         M|gustavus|        5|         M|gustavus|
|        5|         M|  woodie|        5|         M|  woodie|
|        5|         M|     ely|        5|         M|     ely|
|        5|         M|schuyler|        5|         M|schuyler|
|        5|         M|  samual|        5|         M|  samual|
|        5|         M| unknown|        5|         M| unknown|
|        5|         F|   lella|        5|         F|   lella|
|        5|         M| julious|        5|         M| julious|
|        5|         M| murdock|        5|         M| murdock|
|        5|         M|    nora|        5|         M|    nora|
|       