The first step in using Spark is connecting to a `cluster`. In practice, the cluster is hosted on a remote machine that is connected to all the other nodes. To create a connection, we create an instance of the `SparkContext` class. Its constructor takes a few optional arguments that specify the attributes of the cluster. An object holding these attributes can be created with the `SparkConf` class.

### Examining the Spark Context

Here we create the `SparkContext` object `sc` explicitly; in some hosted environments it is already loaded into the workspace.

In [1]:
from pyspark import SparkConf, SparkContext

configure = SparkConf().setAppName("example-app").setMaster("local")
sc = SparkContext(conf=configure)

In [2]:
print(sc)
print(sc.version)

<SparkContext master=local appName=example-app>
2.4.0


In [3]:
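# textFile() reads the file as an RDD of lines; parallelize() on a path string would distribute its characters
flights = sc.textFile('flights_small.csv')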

Spark's core data structure is the `Resilient Distributed Dataset` (RDD). This is a low-level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. The Spark DataFrame is a higher-level abstraction built on top of RDDs that behaves a lot like a SQL table. To start working with Spark DataFrames, we first create a `SparkSession` object from the `SparkContext` object.

`SparkContext` is like a connection to the cluster, and `SparkSession` is like an interface to that connection.
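
To make the idea of splitting data across nodes more concrete, here is a plain-Python sketch. It is not Spark code and does not reproduce Spark's actual partitioning logic: a short list of carrier codes is cut into two partitions, each partition is counted independently, and the partial results are merged. The helper name `split_into_partitions` is hypothetical and used only for this illustration.

```python
from collections import Counter


def split_into_partitions(records, num_partitions):
    """Cut a list into num_partitions contiguous, nearly equal slices."""
    n = len(records)
    return [
        records[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


# Carrier codes of five flights (illustrative input)
carriers = ["VX", "AS", "VX", "WN", "AS"]

# Each slice stands in for the data held by one node
partitions = split_into_partitions(carriers, 2)

# Every partition is processed on its own ...
partial_counts = [Counter(part) for part in partitions]

# ... and the partial results are combined at the end
total_counts = sum(partial_counts, Counter())
print(partitions)
print(dict(total_counts))
```

The point of the sketch is only that work on each partition is independent, which is what allows a cluster to process the pieces in parallel.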

### Creating a SparkSession

In [4]:
from pyspark.sql import SparkSession

my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000002C8DBCDE320>


In [5]:
my_spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '57753'),
 ('spark.app.id', 'local-1554348793959'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'DESKTOP-K2G4QJD'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'example-app')]

### Loading Data

In [6]:
flights = my_spark.read.csv("flights_small.csv", header=True)

In [7]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

### Converting Spark DataFrame to Pandas DataFrame

In [8]:
flights_pd = flights.toPandas()

In [9]:
flights_pd.head()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,132,954,6,58
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,360,2677,10,40
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,111,679,14,43
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,83,569,17,5
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,127,937,7,54


### Converting Pandas DataFrame to Spark DataFrame

In [10]:
spark_flights = my_spark.createDataFrame(flights_pd)

Next, we register the Spark DataFrame as a temporary table (view) in the Spark catalog.

In [11]:
spark_flights.createOrReplaceTempView("spark_flights")

### Viewing tables

In [12]:
my_spark.catalog.listTables()

[Table(name='spark_flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

We can run SQL queries against the tables listed above to fetch data. For example:

In [13]:
query = "SELECT * FROM spark_flights LIMIT 10"
my_flights = my_spark.sql(query)

### Creating columns

In [14]:
flights = flights.withColumn("duration_hrs", flights.air_time / 60)

In [15]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

### Filtering Data

Spark DataFrames can be filtered in two ways. We can pass a string containing the condition as it would appear in the `WHERE` clause of a SQL query, or we can pass a column of boolean values, such as `flights["distance"] > 1000`.

In [16]:
flights.filter("distance > 1000").show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|         6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|        2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|         3.3|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+
only showing top 3 rows



In [17]:
flights.filter(flights["distance"] > 1000).show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

### Selecting columns

Columns can also be selected in two ways. First, we can pass the names of the columns we want as strings. Second, we can pass column objects, written as `df.colName` or `df["colName"]`, which also lets us manipulate the data as we select it.

In [18]:
flights.select("tailnum", "origin", "dest").show(3)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
+-------+------+----+
only showing top 3 rows



In [19]:
flights.select(flights['distance'], flights['hour'], (flights['distance'] / flights['hour']).alias('speed')).show(3)

+--------+----+-----+
|distance|hour|speed|
+--------+----+-----+
|     954|   6|159.0|
|    2677|  10|267.7|
|     679|  14| 48.5|
+--------+----+-----+
only showing top 3 rows



We can also use the `selectExpr` method to write SQL-like expressions.

In [21]:
flights.selectExpr("distance", "hour", "distance / hour as speed").show(3)

+--------+----+-----+
|distance|hour|speed|
+--------+----+-----+
|     954|   6|159.0|
|    2677|  10|267.7|
|     679|  14| 48.5|
+--------+----+-----+
only showing top 3 rows
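
Note that `hour` in this dataset is the departure hour (the first row has `dep_time` 658, `hour` 6, `minute` 58), so `distance / hour` demonstrates column arithmetic rather than a physical speed. A more meaningful figure divides the distance by the air time in hours. The arithmetic below works this out for the first row shown earlier; the expression string at the end is a sketch of what could be passed to `selectExpr`, and the alias `avg_speed` is an assumed name.

```python
# First row of flights.show(): distance = 954, air_time = 132 (minutes)
distance = 954
air_time_minutes = 132

# Convert minutes to hours, as done for duration_hrs earlier
duration_hrs = air_time_minutes / 60

# Average speed in distance units per hour of air time
avg_speed = distance / duration_hrs
print(round(avg_speed, 1))  # 433.6

# Assumed equivalent expression for flights.selectExpr(...)
avg_speed_expr = "distance / (air_time / 60) as avg_speed"
```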



### Data Aggregation

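Common aggregation methods such as `min`, `max`, and `count` are `GroupedData` methods, so we call them on the result of the DataFrame's `.groupBy()` method. Calling `.groupBy()` with no arguments aggregates over the whole DataFrame.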
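
The numeric aggregation methods expect numeric columns, while `read.csv(..., header=True)` without schema inference loads every column as a string. That is why the columns are cast to `int` before aggregating. The plain-Python analogue below (standard-library `csv`, not Spark) uses three `origin`/`air_time` values from the flights output above to show how text values behave differently from numbers.

```python
import csv
import io

# Three rows taken from the flights output above (subset of columns)
raw = io.StringIO(
    "origin,air_time\n"
    "SEA,132\n"
    "SEA,360\n"
    "PDX,83\n"
)
rows = list(csv.DictReader(raw))

# A CSV reader without type inference returns text
air_times_text = [row["air_time"] for row in rows]

# Explicit cast, analogous to cast(air_time as int)
air_times_int = [int(value) for value in air_times_text]

print(min(air_times_text))  # 132, because strings compare character by character
print(min(air_times_int))   # 83, the numeric minimum
```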

In [25]:
new_flights = flights.selectExpr("origin", "cast(distance as int) distance", "cast(hour as int) hour", 
                                 "cast(air_time as int) air_time")

In [29]:
# finding the shortest flight from PDX in terms of air time
new_flights.filter(new_flights["origin"] == "PDX").groupBy().min("air_time").show()

+-------------+
|min(air_time)|
+-------------+
|           24|
+-------------+



In [30]:
# finding the longest flight from SEA in terms of duration
new_flights.filter(new_flights["origin"] == "SEA").groupBy().max("air_time").show()

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



In [31]:
# Total hours in the air
new_flights.withColumn("duration_hrs", new_flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



### Grouping and Aggregating

In [32]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows



In [34]:
# Group by origin
by_origin = new_flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In addition to `GroupedData` methods such as **min**, **max**, and **count**, there is also the `.agg()` method, which takes an aggregate column expression built from any of the aggregate functions in the `pyspark.sql.functions` submodule.

In [43]:
from pyspark.sql.types import IntegerType
flights = flights.withColumn("dep_delay", flights["dep_delay"].cast(IntegerType()))

In [44]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+--------------------+
|month|dest|      avg(dep_delay)|
+-----+----+--------------------+
|   11| TUS| -2.3333333333333335|
|   11| ANC|   7.529411764705882|
|    1| BUR|               -1.45|
|    1| PDX| -5.6923076923076925|
|    6| SBA|                -2.5|
|    5| LAX|-0.15789473684210525|
|   10| DTW|                 2.6|
|    6| SIT|                -1.0|
|   10| DFW|  18.176470588235293|
|    3| FAI|                -2.2|
|   10| SEA|                -0.8|
|    2| TUS| -0.6666666666666666|
|   12| OGG|  25.181818181818183|
|    9| DFW|   4.066666666666666|
|    5| EWR|               14.25|
|    3| RDM|                -6.2|
|    8| DCA|                 2.6|
|    7| ATL|   4.675675675675675|
|    4| JFK| 0.07142857142857142|
|   10| SNA| -1.1333333333333333|
+-----+----+--------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|   11| TUS|    3.0550504633038935|
|   11| ANC|  
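
The output column name `stddev_samp(dep_delay)` indicates that `F.stddev` computes the sample standard deviation, which divides by n - 1 rather than n (`F.stddev_pop` is the population variant). The difference can be sketched with Python's `statistics` module; the three delay values are made up for illustration and are not taken from the dataset.

```python
import statistics

# Made-up departure delays (minutes) for a single month/destination group
delays = [-5, -3, 1]

# Sample standard deviation: divides the squared deviations by n - 1
sample_sd = statistics.stdev(delays)

# Population standard deviation: divides by n
population_sd = statistics.pstdev(delays)

print(sample_sd)
print(population_sd)
```

For a group containing a single flight the sample standard deviation is undefined, so small groups deserve extra care when reading these results.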

### Joining

In PySpark, joins are performed with the DataFrame method `.join()`. This method takes three arguments:
<ol>
    <li>**other**: the second DataFrame that we want to join to our parent DataFrame</li>
    <li>**on**: the name of the key column as a string (or a list of column names)</li>
    <li>**how**: the kind of join to perform, for example `"inner"` or `"leftouter"`</li>
</ol>
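
To illustrate what a left outer join returns, here is a minimal plain-Python sketch of the semantics. It is not how Spark implements joins, and the function `left_outer_join` and the third flight row are hypothetical. Every row of the left dataset is kept, and columns from the right dataset are filled with `None` when no match exists.

```python
flights_rows = [
    {"dest": "LAX", "tailnum": "N846VA"},
    {"dest": "HNL", "tailnum": "N559AS"},
    {"dest": "ZZZ", "tailnum": "N00000"},  # hypothetical row with no matching airport
]
airport_rows = [
    {"dest": "LAX", "name": "Los Angeles Intl"},
    {"dest": "HNL", "name": "Honolulu Intl"},
]


def left_outer_join(left, right, on):
    """Keep every left row; attach matching right rows or None placeholders."""
    # Index the right rows by key so each lookup is cheap
    index = {}
    for row in right:
        index.setdefault(row[on], []).append(row)
    right_cols = [col for col in right[0] if col != on] if right else []

    joined = []
    for left_row in left:
        matches = index.get(left_row[on])
        if matches:
            for right_row in matches:
                joined.append({**left_row, **right_row})
        else:
            # No match: keep the left row and fill right columns with None
            joined.append({**left_row, **{col: None for col in right_cols}})
    return joined


joined = left_outer_join(flights_rows, airport_rows, "dest")
for row in joined:
    print(row)
```

With an inner join, the unmatched third row would be dropped instead of being padded with `None`.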

In [45]:
airports = my_spark.read.csv("airports.csv", header=True)
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [46]:
airports = airports.withColumnRenamed("faa", "dest")

In [47]:
flights_with_airports = flights.join(airports, "dest", "leftouter")
flights_with_airports.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|air_time|distance|hour|minute|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+--------+----+------+------------------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     132|     954|   6|    58|               2.2|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|     360|    2677|  10|    40|               6.0|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    

In [48]:
my_spark