In [0]:
from pyspark.sql.functions import collect_set, round, avg, col, weekofyear, min
from pyspark.sql.window import Window


(5 marks) The file mpg.csv contains data about various automobiles.
Load this file into a dataframe. Group by the manufacturer and use a
collection method to add a column which shows all unique models that
the manufacturer has.

In [0]:
# Load the automobile data: the first CSV row supplies the column names and
# Spark infers each column's type from the file contents.
mpg = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("dbfs:/FileStore/mpg.csv")
)

# One row per manufacturer; collect_set gathers that manufacturer's model
# names into an array with duplicates removed (element order is not guaranteed).
unique_models = (
    mpg.groupBy("manufacturer")
    .agg(collect_set("model").alias("unique_models"))
)
unique_models.show(truncate=False)

+------------+--------------------------------------------------------------------------------------+
|manufacturer|unique_models                                                                         |
+------------+--------------------------------------------------------------------------------------+
|land rover  |[range rover]                                                                         |
|pontiac     |[grand prix]                                                                          |
|toyota      |[4runner 4wd, camry solara, corolla, land cruiser wagon 4wd, toyota tacoma 4wd, camry]|
|lincoln     |[navigator 2wd]                                                                       |
|audi        |[a4, a6 quattro, a4 quattro]                                                          |
|jeep        |[grand cherokee 4wd]                                                                  |
|dodge       |[durango 4wd, ram 1500 pickup 4wd, dakota pickup 4wd, caravan 2wd]  

(5 marks) For the dataframe loaded from the mpg.csv file, generate a
dataframe which is grouped by the year and cylinder (cyl) columns. Use the
pivot method to add columns for the average city mpg (cty) for each
manufacturer. Round the average city mpg to 1 decimal place to make the
output easier to read.

In [0]:
# Rows are (year, cyl) combinations; pivoting on manufacturer turns every
# manufacturer into its own column holding the average city mpg for that row,
# rounded to one decimal place. Combinations with no cars come out as null.
# `round` and `avg` are the pyspark.sql.functions versions imported at the top
# of the notebook, not the Python builtins.
mpg_pivot = (
    mpg.groupBy("year", "cyl")
    .pivot("manufacturer")
    .agg(round(avg("cty"), 1))
)
mpg_pivot.show(truncate=False)

+----+---+----+---------+-----+----+-----+-------+----+----------+-------+-------+------+-------+------+------+----------+
|year|cyl|audi|chevrolet|dodge|ford|honda|hyundai|jeep|land rover|lincoln|mercury|nissan|pontiac|subaru|toyota|volkswagen|
+----+---+----+---------+-----+----+-----+-------+----+----------+-------+-------+------+-------+------+------+----------+
|2008|8  |16.0|13.6     |11.9 |13.6|null |null   |11.8|12.0      |12.0   |13.0   |12.0  |16.0   |null  |13.5  |null      |
|1999|4  |18.3|19.0     |18.0 |null|24.8 |18.5   |null|null      |null   |null   |20.0  |null   |19.0  |20.0  |23.3      |
|1999|6  |16.2|18.0     |14.9 |15.3|null |18.0   |15.0|null      |null   |14.0   |16.5  |17.0   |null  |16.5  |16.8      |
|2008|6  |16.8|17.5     |15.1 |15.3|null |17.3   |16.0|null      |null   |13.0   |17.8  |18.0   |null  |16.8  |17.0      |
|2008|5  |null|null     |null |null|null |null   |null|null      |null   |null   |null  |null   |null  |null  |20.5      |
|2008|4  |20.0|2

(5 marks) The file flight-summary.csv contains information about flights
between various locations. The file airports.csv contains additional
information about each airport, some of which is not in the
flight-summary.csv file.
Create a dataframe which contains the origin_code, origin_airport, latitude
(for origin airport), longitude (for origin airport), dest_code, dest_airport,
latitude (for destination airport), longitude (for destination airport) for every
flight which originates in the state of Texas (TX).
Which type of join technique (Shuffle or Broadcast) did Spark likely use for
this exercise? Why?

In [0]:
# Read both CSVs with a header row and inferred column types.
flights = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("dbfs:/FileStore/flight_summary.csv")
)

airports = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("dbfs:/FileStore/airports.csv")
)

# Keep only the flights that originate in Texas.
flights_tx = flights.where(col("origin_state") == "TX")

# The airports table is joined twice under different aliases: once to look up
# the origin airport and once for the destination. Left joins keep a flight
# even when its airport code has no match in airports.csv (the airport
# columns are then null).
joined_df = (
    flights_tx
    .join(
        airports.alias("orig"),
        on=flights_tx["origin_code"] == col("orig.IATA_CODE"),
        how="left",
    )
    .join(
        airports.alias("dest"),
        on=flights_tx["dest_code"] == col("dest.IATA_CODE"),
        how="left",
    )
    .select(
        flights_tx["origin_code"],
        col("orig.AIRPORT").alias("origin_airport"),
        col("orig.LATITUDE").alias("origin_lat"),
        col("orig.LONGITUDE").alias("origin_long"),
        flights_tx["dest_code"],
        col("dest.AIRPORT").alias("dest_airport"),
        col("dest.LATITUDE").alias("dest_lat"),
        col("dest.LONGITUDE").alias("dest_long"),
    )
)

joined_df.show()

# Answer: Spark most likely used a Broadcast join. The airports dataset is
# small, so it can be broadcast to the executors instead of shuffling the
# flights data across the cluster, which avoids a costly shuffle.
# (Presumably airports.csv falls under spark.sql.autoBroadcastJoinThreshold;
# joined_df.explain() would confirm the strategy actually chosen.)

+-----------+--------------------+----------+-----------+---------+--------------------+--------+----------+
|origin_code|      origin_airport|origin_lat|origin_long|dest_code|        dest_airport|dest_lat| dest_long|
+-----------+--------------------+----------+-----------+---------+--------------------+--------+----------+
|        LBB|Lubbock Preston S...|  33.66364| -101.82278|      DEN|Denver Internatio...|39.85841|  -104.667|
|        AUS|Austin-Bergstrom ...|  30.19453|  -97.66987|      ELP|El Paso Internati...|31.80667|-106.37781|
|        HOU|William P. Hobby ...|  29.64542|  -95.27889|      PDX|Portland Internat...|45.58872| -122.5975|
|        DFW|Dallas/Fort Worth...|  32.89595|   -97.0372|      PNS|Pensacola Interna...|30.47331| -87.18744|
|        DFW|Dallas/Fort Worth...|  32.89595|   -97.0372|      SDF|Louisville Intern...|38.17439|   -85.736|
|        BPT|Jack Brooks Regio...|  29.95083|  -94.02069|      DFW|Dallas/Fort Worth...|32.89595|  -97.0372|
|        DFW|Dallas

(5 marks) The file aapl-2017.csv contains information about daily Apple stock
prices. Calculate the weekly average price using a window function inside
the groupBy transformation. Order the result by the start time.

In [0]:
# Load the daily AAPL prices; the header row supplies the column names and
# Spark infers the column types.
stocks = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dbfs:/FileStore/aapl_2017.csv")

# Tag every trading day with its ISO-8601 week number (weeks start on Monday).
# NOTE(review): weekofyear carries no year, so rows from different years that
# share a week number would be merged into one group. This is only correct
# while the file covers a single calendar year (presumably 2017) - confirm.
aapl_weekly = stocks.withColumn("week", weekofyear("Date"))

# One row per week: the earliest trading date in the week (used as the week's
# start time) and the mean closing price, ordered chronologically.
# `min` and `avg` are the pyspark.sql.functions versions imported at the top of
# the notebook, not the Python builtins.
# NOTE(review): the exercise asks for the time-bucketing `window` function
# inside groupBy; this solution buckets by weekofyear instead. Confirm that
# this is acceptable before submitting.
aapl_weekly_start = aapl_weekly.groupBy("week") \
    .agg(min("Date").alias("start_date"), avg("Close").alias("weekly_avg_price")) \
    .orderBy("start_date")

aapl_weekly_start.show(10, truncate=False)

+----+----------+------------------+
|week|start_date|weekly_avg_price  |
+----+----------+------------------+
|1   |2017-01-03|116.67250100000001|
|2   |2017-01-09|119.228           |
|3   |2017-01-17|119.94249925      |
|4   |2017-01-23|121.16399980000001|
|5   |2017-01-30|125.86799920000001|
|6   |2017-02-06|131.6799956       |
|7   |2017-02-13|134.97799980000002|
|8   |2017-02-21|136.75000025      |
|9   |2017-02-27|138.4899994       |
|10  |2017-03-06|139.1359984       |
+----+----------+------------------+
only showing top 10 rows

