In [1]:
## Exploring Data w/ Spark DataFrames API & Spark SQL

# Import/load packages to be used
 # To explore data, must load it into programmatic data object such as DataFrame.
    # If structure of data is known ahead of time, can explicitly specify schema for DataFrame.

from pyspark.sql.types import *

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

# import data records w/ flight details

flights = spark.read.csv('wasb:///data/raw-flight-data.csv', schema=flightSchema, header=True)
flights.show()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1543259570503_0006,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|     

In [2]:
# Infer Data Schema
 # If structure of data source is unknown, can have Spark automatically infer schema.
    # Example- Load data about airports w/out knowing schema
    
airports = spark.read.csv('wasb:///data/airports.csv', header=True, inferSchema=True)
airports.show()

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

In [3]:
# Use DataFrame Methods
 # Spark DataFrames provide functions used to extract & manipulate data.
    # Example- Use SELECT function to return a new DataFrame containing columns selected from an existing DataFrame.
    
cities = airports.select("city", "name")
cities.show()

+-----------+--------------------+
|       city|                name|
+-----------+--------------------+
|Adak Island|                Adak|
|  Anchorage|Ted Stevens Ancho...|
|      Aniak|       Aniak Airport|
|     Barrow|Wiley Post/Will R...|
|     Bethel|      Bethel Airport|
|    Cordova|Merle K Mudhole S...|
|  Deadhorse|   Deadhorse Airport|
| Dillingham|  Dillingham Airport|
|  Fairbanks|Fairbanks Interna...|
|   Gustavus|    Gustavus Airport|
|     Juneau|Juneau International|
|  Ketchikan|Ketchikan Interna...|
|King Salmon| King Salmon Airport|
|     Kodiak|      Kodiak Airport|
|   Kotzebue| Ralph Wien Memorial|
|       Nome|        Nome Airport|
| Petersburg|Petersburg James ...|
|      Sitka|Sitka Rocky Gutie...|
| St. Mary's|  St. Mary's Airport|
|   Unalaska|    Unalaska Airport|
+-----------+--------------------+
only showing top 20 rows

In [4]:
# Combine Operations
 # Can combine functions in a single statement to perform multiple operations on DataFrame.
    # Use join function to combine flight & airports DataFrames.
     # Then use groupBy & count functions to return number of flights from each airport.

flightsByOrigin = flights.join(airports, flights.OriginAirportID == airports.airport_id).groupBy("City").count()
flightsByOrigin.show()

+-----------------+------+
|             City| count|
+-----------------+------+
|          Phoenix| 90281|
|            Omaha| 13537|
|   Raleigh/Durham| 28436|
|        Anchorage|  7777|
|           Dallas| 19503|
|          Oakland| 25503|
|      San Antonio| 23090|
|     Philadelphia| 47659|
|       Louisville| 10953|
|Dallas/Fort Worth|105024|
|      Los Angeles|118684|
|       Sacramento| 25193|
|     Indianapolis| 18099|
|        Cleveland| 25261|
|        San Diego| 45783|
|    San Francisco| 84675|
|        Nashville| 34927|
|    Oklahoma City| 13967|
|          Detroit| 62879|
|         Portland| 30640|
+-----------------+------+
only showing top 20 rows

In [5]:
# Count Rows in DataFrame
 # Exploring data is key task when building predictive solutions
    # Determining statistics that will help understand data before building predictive models.

flights.count()

2719418

In [6]:
# Determine Summary Statistics
 # Use describe function to return DataFrame containing count, mean, std dev, min, & max values for each numeric column.

flights.describe().show()

+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|summary|       DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|         DepDelay|         ArrDelay|
+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|  count|          2719418|           2719418|2719418|           2719418|           2719418|          2691974|          2690385|
|   mean|15.79747468024408|3.8983907586108497|   null| 12742.26441172339|12742.455345592329|10.53686662649788| 6.63768791455498|
| stddev|8.799860168985367|1.9859881390373617|   null|1501.9729397025808|1501.9692528927785|36.09952806643081|38.64881489390021|
|    min|                1|                 1|     9E|             10140|             10140|              -63|              -94|
|    max|               31|                 7|     YV|             15376|             15376|     

In [7]:
# Determine Presence of Duplicates
 # Clean data - detect & remove duplicates that could affect model
    # Use dropDuplicates function to create new DataFrame w/ duplicates removed.
     # Subtracting dropped from original values determines how many rows are duplicates of other rows.

    
flights.count() - flights.dropDuplicates().count()

22435

In [8]:
# Identify Missing Values
 # Either remove rows containing missing data or replace missing values w/ suitable replacement.
    # Use dropna function to create DataFrame w/ any rows containing data removed
     # Can specify subset of columns & whether row should be removed in 'any' or 'all' values are missing.
        # Use new DataFrame to determine how many rows contain missing values.

flights.count() - flights.dropDuplicates().dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()

46233

In [9]:
# Clean Data
 # Data contains both duplicates & missing values
    # Use fillna function to replace missing values w/ specified replacement value.
      # Replace missing ArrDelay & DepDelay values w/ 0, assuming there was no/zero delay.
    # Remove duplicate rows

data = flights.dropDuplicates().fillna(value = 0, subset=["ArrDelay", "DepDelay"])
data.count()

2696983

In [10]:
# Re-check Summary Statistics
 # After removing duplicates & replacing missing values, distribution of data may have changed.
    # This in turn could affect any predictive models created.

data.describe().show()

+-------+------------------+-----------------+-------+------------------+------------------+------------------+------------------+
|summary|        DayofMonth|        DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|          DepDelay|          ArrDelay|
+-------+------------------+-----------------+-------+------------------+------------------+------------------+------------------+
|  count|           2696983|          2696983|2696983|           2696983|           2696983|           2696983|           2696983|
|   mean|15.798996508320593|3.900369412784582|   null|12742.459424846207| 12742.85937657004|10.531134234068217|6.6679285705545785|
| stddev|  8.80126719913545|1.986458242170198|   null|1502.0359941370625|1501.9939589817984| 36.06172819056572|38.583861473580725|
|    min|                 1|                1|     9E|             10140|             10140|               -63|               -94|
|    max|                31|                7|     YV|             15376|          

In [11]:
# Explore Relationships in Data
 # Use corr function to calculate a correlation value b/w -1 & 1, indicating the strength of correlation b/w two fields.
    # Strong positive correlation (near 1) indicates that high values for one column are often found w/ high values of the other.
    # Strong negative correlation (near -1) indicates that low values for one column are often found w/ high values for the other.
    # Correlation near 0 indicates little apparent relationship b/w fields.

data.corr("DepDelay", "ArrDelay")

0.9392630367706964

In [12]:
# Use Spark SQL
 # Can use DataFrame API directly to query data
    # Can persist DataFrames as table & use Spark SQL to query them using SQL language.
    # SQL is often more intuitive to use when querying tabular data structures.

data.createOrReplaceTempView("flightData")
spark.sql("SELECT DayOfWeek, AVG(ArrDelay) AS AvgDelay FROM flightData GROUP BY DayOfWeek ORDER BY DayOfWeek").show()

+---------+------------------+
|DayOfWeek|          AvgDelay|
+---------+------------------+
|        1| 7.077989660973244|
|        2|  4.39237404158651|
|        3| 7.234625279280266|
|        4|10.775574715480056|
|        5|  8.71110560964396|
|        6|2.1437428120738304|
|        7|  5.25403935972552|
+---------+------------------+

In [14]:
# Use Inline SQL Magic
 # 'Magics' w/in Juptyer Notebooks enable inclusion of inline code & functionality.
    # %%sql enables SQL queries & visualize results directly in notebook.

%load_ext sql

%%sql
SELECT DepDelay, ArrDelay FROM flightData

# Can change 'Table' visualization of results to 'Scatter' visualization to better see relationship b/w DepDelay & ArrDelay values.
 # Positive correlation b/w DepDelay & ArrDelay seems to be linear relationship, creating diagonal line structure of plottede points. 

invalid syntax (<stdin>, line 1)
  File "<stdin>", line 1
    SELECT DepDelay, ArrDelay FROM flightData
                  ^
SyntaxError: invalid syntax



In [None]:
# Querying Multiple Tables
 # Can create & query multiple temporary tables.
    # Create a temporary table from 'airports' DataFrame.
     # Use inline query to query it together w/ flights data.
    
airports.createOrReplaceTempView("airportData")

%%sql
SELECT a.name, AVG(f.ArrDelay) AS avgdelay
FROM flightData AS f JOIN airportData AS a
ON f.DestAirportID = a.airport_id
GROUP BY a.name

In [None]:
# Change above visualization to 'Bar' chart o see average lateness (or earliness) of flights at all destinations.