# Manipulating Data

In this notebook, we'll learn about the `pyspark.sql` module, which lets you run optimized, SQL-style queries on the data in your Spark session.

## Imports

In [1]:
import findspark
# Make the local Spark installation importable. This must run before the
# pyspark import below. NOTE(review): presumably locates Spark via SPARK_HOME
# or a default install path; confirm for your environment.
findspark.init()

from pyspark.sql import SparkSession

## Spark Session

In [2]:
# Reuse the active SparkSession if there is one; otherwise start a new session
# with default settings.
spark = SparkSession.builder.getOrCreate()

## Display setting

In [3]:
# Inject a CSS rule that sets `white-space: pre` on <pre> elements. Long output
# lines, such as the wide tables printed by DataFrame.show(), then do not wrap.
# HTML and display come from the public IPython.display module rather than
# IPython.core.display, and display is imported explicitly instead of relying on
# the kernel-injected builtin.
from IPython.display import HTML, display
display(HTML("<style>pre {white-space: pre !important}</style>"))

## Load data
Load the flights data from a CSV file into a Spark DataFrame.

In [4]:
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Local data directory, addressed as a file:// URI built from the notebook's
# working directory.
data_path = 'file:///' + os.getcwd() + '/data'

file_path = data_path + '/flights_small.csv'

# Explicit schema, so column types are fixed rather than inferred.
# Both delay columns (dep_delay and arr_delay) are integers, so they can be
# aggregated numerically (GroupedData.avg/sum/min/max need numeric columns).
# Previously arr_delay was a string while dep_delay was an integer.
# The remaining date, time-of-day and identifier columns stay strings.
# NOTE(review): values that do not parse as integers (e.g. an 'NA' marker) are
# presumably read as null under Spark's default PERMISSIVE CSV mode; confirm.
schema = StructType([
    StructField('year', StringType(), True),
    StructField('month', StringType(), True),
    StructField('day', StringType(), True),
    StructField('dep_time', StringType(), True),
    StructField('dep_delay', IntegerType(), True),
    StructField('arr_time', StringType(), True),
    StructField('arr_delay', IntegerType(), True),
    StructField('carrier', StringType(), True),
    StructField('tailnum', StringType(), True),
    StructField('flight', StringType(), True),
    StructField('origin', StringType(), True),
    StructField('dest', StringType(), True),
    StructField('air_time', IntegerType(), True),
    StructField('distance', IntegerType(), True),
    StructField('hour', StringType(), True),
    StructField('minute', StringType(), True)
])

flights_df = (spark
        .read
        .option("header", True)  # first line holds column names, not data
        .schema(schema)
        .csv(file_path))

flights_df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

## Flights Table

Create a temporary flights table.

In [5]:
# Register flights_df as a temporary view named "flights" so it can be looked up
# by name later. Re-running the cell replaces the existing view.
flights_df.createOrReplaceTempView("flights")

# Confirm the view is now listed in the session catalog.
registered_tables = spark.catalog.listTables()
print(registered_tables)

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


## Creating Columns

In [6]:
# Fetch the "flights" temporary view back as a DataFrame.
flights = spark.table("flights")

# Derive duration_hrs from air_time, dividing by 60 (minutes -> hours).
flights = flights.withColumn("duration_hrs", flights["air_time"] / 60)

# Display the rows (20 by default). duration_hrs appears as the last column.
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

## Filtering data

In [7]:
# Keep only flights with distance > 1000, written two equivalent ways.
# First, as a SQL expression string...
long_flights1 = flights.where("distance > 1000")
# ...then as a boolean Column expression.
long_flights2 = flights.where(flights["distance"] > 1000)

# Display both results; the rows should be identical.
long_flights1.show()
long_flights2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

## Selecting 

We use the `.select()` method to perform column-wise operations.

In [8]:
# Columns can be selected by name (strings)...
selected1 = flights.select("tailnum", "origin", "dest")

# ...or by Column objects.
temp = flights.select(flights["origin"], flights["dest"], flights["carrier"])

# Row conditions: departures from SEA and arrivals at PDX.
filterA = flights["origin"] == "SEA"
filterB = flights["dest"] == "PDX"

# Keep rows of the narrowed selection that satisfy both conditions. temp still
# contains the origin and dest columns the conditions refer to.
selected2 = temp.filter(filterA & filterB)

In [9]:
# Print the schema of a single-column projection to see distance's type and nullability.
flights.select("distance").printSchema()

root
 |-- distance: integer (nullable = true)



## Selecting II

We use the `.select()` method to compute new columns and `.alias()` to name them. `.selectExpr()` does the same thing with SQL expression strings.

In [10]:
# Average speed = distance / hours in the air, where hours = air_time / 60.
# alias() gives the computed column the name "avg_speed".
avg_speed = (flights["distance"] / (flights["air_time"] / 60)).alias("avg_speed")

# Route and plane identifiers plus the computed speed.
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# The same table built from SQL expression strings.
speed2 = flights.selectExpr(
    "origin",
    "dest",
    "tailnum",
    "distance/(air_time/60) as avg_speed",
)

In [11]:
# Print the top five rows of each frame; they should be identical.
speed1.show(n=5)
speed2.show(n=5)

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
+------+----+-------+------------------+
only showing top 5 rows



## Aggregating

All the common aggregation methods, like `.min()`, `.max()`, and `.count()`, are `GroupedData` methods. These are created by calling the `.groupBy()` DataFrame method.

In [12]:
# Shortest distance among flights departing PDX. groupBy() with no keys treats
# all remaining rows as a single group.
(
    flights
    .where(flights["origin"] == "PDX")
    .groupBy()
    .min("distance")
    .show()
)


+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



In [13]:
# Longest air_time among flights departing SEA, aggregated over one group
# containing all the filtered rows.
(
    flights
    .where(flights["origin"] == "SEA")
    .groupBy()
    .max("air_time")
    .show()
)

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



## Aggregating II 

In [14]:
# Average air time of Delta ("DL") flights departing from SEA.
# (The previous comment omitted the SEA restriction.) Both conditions are
# applied in one predicate. groupBy() with no keys aggregates over all
# remaining rows.
(flights
     .filter((flights.carrier == "DL") & (flights.origin == "SEA"))
     .groupBy()
     .avg("air_time")
     .show()
)

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+



In [15]:
# Total time in the air across all flights, in hours. duration_hrs is derived
# again here, replacing the identically computed column added earlier, so this
# cell does not depend on that column already being present.
(
    flights
    .withColumn("duration_hrs", flights["air_time"] / 60)
    .groupBy()
    .sum("duration_hrs")
    .show()
)

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



## Grouping and Aggregating I

Part of what makes aggregating so powerful is the addition of groups. PySpark has a whole class devoted to grouped data frames: `pyspark.sql.GroupedData`, which we have seen in the last two sections.

In [16]:
# One GroupedData per key: aircraft tail number and departure airport.
by_plane, by_origin = flights.groupBy("tailnum"), flights.groupBy("origin")

# How many flights each plane made.
by_plane.count().show()

# Mean air_time per departure airport (PDX and SEA in this data).
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



## Grouping and Aggregating II

In addition to the `GroupedData` methods you've already seen, there is also the `.agg()` method. This method lets you pass an aggregate column expression that uses any of the aggregate functions from the `pyspark.sql.functions` submodule.

In [17]:
import pyspark.sql.functions as F

# Group on the (month, dest) pair.
by_month_dest = flights.groupBy("month", "dest")

# Built-in GroupedData shortcut: mean departure delay for each group.
by_month_dest.avg("dep_delay").show()

# agg() accepts any aggregate Column expression; here the sample standard
# deviation of the departure delay.
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+--------------------+
|month|dest|      avg(dep_delay)|
+-----+----+--------------------+
|   11| TUS| -2.3333333333333335|
|   11| ANC|   7.529411764705882|
|    1| BUR|               -1.45|
|    1| PDX| -5.6923076923076925|
|    6| SBA|                -2.5|
|    5| LAX|-0.15789473684210525|
|   10| DTW|                 2.6|
|    6| SIT|                -1.0|
|   10| DFW|  18.176470588235293|
|    3| FAI|                -2.2|
|   10| SEA|                -0.8|
|    2| TUS| -0.6666666666666666|
|   12| OGG|  25.181818181818183|
|    9| DFW|   4.066666666666666|
|    5| EWR|               14.25|
|    3| RDM|                -6.2|
|    8| DCA|                 2.6|
|    7| ATL|   4.675675675675675|
|    4| JFK| 0.07142857142857142|
|   10| SNA| -1.1333333333333333|
+-----+----+--------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|  

## Joining

In PySpark, joins are performed using the DataFrame method `.join()`. This method takes three arguments. The first is the second DataFrame that you want to join with the first one. The second argument, `on`, is the name of the key column(s) as a string; the key column(s) must have the same name in both tables. The third argument, `how`, specifies the kind of join to perform.

## Load the airports data

In [18]:
# Airports lookup table, read with a header row. Unlike flights_df, no
# explicit schema is supplied.
file_path = data_path + '/airports.csv'
airports_df = spark.read.option("header", True).csv(file_path)

# Peek at the first few rows.
airports_df.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



In [19]:
# The join below matches on a column called "dest", so the airport code column
# (faa) is renamed to that.
airports_df = (
    airports_df
    .withColumnRenamed("faa", "dest")
)

In [20]:
# Left outer join on the shared "dest" key: every flight row is kept, with
# airport columns empty (null) where no airport matches. The join uses
# flights_df, so the derived duration_hrs column is not included.
flights_with_airports_df = flights_df.join(
    airports_df,
    on="dest",
    how="leftouter",
)

In [21]:
# First five joined rows: the dest key comes first, then the flight columns,
# then the airport columns.
flights_with_airports_df.show(n=5)

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+---+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|                name|      lat|        lon|alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+--------------------+---------+-----------+---+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|    Los Angeles Intl|33.942536|-118.408075|126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|       Honolulu Intl|21.318681|-157.922428| 13|-10|  N|
| SFO|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA|     111|     679|  14|    43|  San 