In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=9d463f4570bee605835e550132b53382170ca0cc3e02ae2e2902439184c27c0a
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


    PySpark let us do cluster/parallel computing. Particularly important for big data analytics and science.
    Data is split up and shared to nodes/workers within a cluster by the cluster manager/master. 
    Calculations are done by each node/worker with the data it is given.
    The first step in using Spark is connecting to a cluster. Creating a connection is as simple as creating an instance of the SparkContext class. 
    The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to. 
    An object holding all these attributes can be created with the SparkConf() constructor.

In [None]:
import pandas as pd

In [None]:
from pyspark import SparkContext

# Make a connection to Spark clusters
sc = SparkContext()

# print Spark cluster object and check the version
sc, sc.version

(<SparkContext master=local[*] appName=pyspark-shell>, '3.3.1')

    Spark's core data structure is the Resilient Distributed Dataset (RDD). It is used to split the dataset. 
    However, the Spark DataFrame are better to use because they are optimized for complicated operations than RDDs.
    To create a dataframe, we instantiate a SparkSession object.

    What if you're not sure there already is one? 
    Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. 
    This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary!

In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession, HiveContext

# Create my_spark
my_spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7ff558252b10>


    SparkSession has an attribute called catalog which lists all the data inside the cluster.
    To extract information from the catalog, we use the .listTables() method (returns the name of all tables in the cluster as a list).

In [None]:
# Print the tables in the catalog
print(my_spark.catalog.listTables())

[]


In [None]:
# load files directly from local drive
my_table = my_spark.read.csv('/content/sample_data/california_housing_train.csv', header=True, inferSchema=True)
my_table

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [None]:
# load data from url
url = 'https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv'

# import library for url download
from pyspark import SparkFiles

# instantiate the file in the session
my_spark.sparkContext.addFile(url)

new_table = my_spark.read.csv('file://'+SparkFiles.get('airports.csv'), header=True, inferSchema=True)
new_table

DataFrame[faa: string, name: string, lat: double, lon: double, alt: int, tz: int, dst: string]

In [None]:
# show dataframe schema
new_table.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)



In [None]:
# show my_table content
my_table.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [None]:
# create temporary table from dataframes
my_table.createOrReplaceTempView('my_temptable')
new_table.createOrReplaceTempView('my_tempnewtable')

In [None]:
# Print the tables in the catalog
my_spark.catalog.listTables()

[Table(name='my_tempnewtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='my_temptable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# Don't change this query
query = "SELECT * FROM my_tempnewtable LIMIT 5"

# Get the first 10 rows of flights
airports = my_spark.sql(query)

airports.show()

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+



In [None]:
# save dataframe to warehouse
new_table.write.mode('overwrite').csv('/content/spark-warehouse/Airports.csv')

In [None]:
# show original dataframe content
new_table.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
# read from warehouse and show saved file content
my_spark.read.csv('/content/spark-warehouse/Airports.csv').show()

+---+--------------------+----------------+-----------------+----+---+---+
|_c0|                 _c1|             _c2|              _c3| _c4|_c5|_c6|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
# load data from url
url = 'https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv'

# instantiate the file in the session
my_spark.sparkContext.addFile(url)

new_table1 = my_spark.read.csv('file://'+SparkFiles.get('flights_small.csv'), header=True, inferSchema=True)
new_table1

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

In [None]:
# instantiate new_table1 as table in spark
my_spark.catalog.dropTempView('test_table')
new_table1.createTempView('test_table')

In [None]:
my_spark.catalog.listTables()

[Table(name='my_tempnewtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='my_temptable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='test_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
new_table1.write.mode('overwrite').saveAsTable('ttest_table')

In [None]:
my_spark.sql('select * from test_table').show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
my_spark.sql('drop table if exists ttest_table')

DataFrame[]

In [None]:
my_spark.sql('drop table if exists test_table') # if catalog has 2 tables with same name but 
# different table types, pyspark will always drop temporary tables first.

DataFrame[]

In [None]:
my_spark.catalog.listTables()

[Table(name='my_tempnewtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='my_temptable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# save temporary table to a permanent table.
my_spark.sql('select * from my_temptable').write.mode('overwrite').saveAsTable('test_table')

In [None]:
# check spark catalog
my_spark.catalog.listTables()

[Table(name='test_table', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='my_tempnewtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='my_temptable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# convert to pandas dataframe
df = my_spark.sql('select * from my_temptable').toPandas()
df

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
1,-114.47,34.40,19.0,7650.0,1901.0,1129.0,463.0,1.8200,80100.0
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.9250,65500.0
...,...,...,...,...,...,...,...,...,...
16995,-124.26,40.58,52.0,2217.0,394.0,907.0,369.0,2.3571,111400.0
16996,-124.27,40.69,36.0,2349.0,528.0,1194.0,465.0,2.5179,79000.0
16997,-124.30,41.84,17.0,2677.0,531.0,1244.0,456.0,3.0313,103600.0
16998,-124.30,41.80,19.0,2672.0,552.0,1298.0,478.0,1.9797,85800.0


In [None]:
type(df)

pandas.core.frame.DataFrame

In [None]:
# create spark dataframe from pandas dataframe
spark_df = my_spark.createDataFrame(df)
spark_df

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [None]:
spark_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

**Column Operations**

    Updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable.
    To overwrite an existing column, just pass the name of the column as the first argument.

In [None]:
# show catalog tables
my_spark.catalog.listTables()

[Table(name='test_table', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='my_tempnewtable', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='my_temptable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# create new column added to my_temptable
my_spark.sql('select * from my_temptable').show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [None]:
# create dataframe from my_temptable
my_df = my_spark.table('my_temptable')
my_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [None]:
# add column to my_df
my_df = my_df.withColumn('new_col', my_df.latitude + 50)
my_df

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double, new_col: double]

In [None]:
my_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|          new_col|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|            84.19|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|             84.4|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|            83.69|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|            83.64|
|  -114.57|   33.57|

    The .filter() method == where in SQL.
    The .filter() method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

In [None]:
my_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- new_col: double (nullable = true)



In [None]:
# show data with rooms greater than 2000
my_df.filter(my_df.total_rooms > 5000).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|          new_col|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|            84.19|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|             84.4|
|  -116.06|   34.15|              15.0|    10377.0|        2331.0|    4507.0|    1807.0|       2.2466|           66800.0|            84.15|
|  -116.09|   34.15|              13.0|     9444.0|        1997.0|    4166.0|    1482.0|       2.6111|           65600.0|            84.15|
|  -116.24|   33.72|

In [None]:
my_df.filter('total_rooms > 5000').show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|          new_col|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|            84.19|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|             84.4|
|  -116.06|   34.15|              15.0|    10377.0|        2331.0|    4507.0|    1807.0|       2.2466|           66800.0|            84.15|
|  -116.09|   34.15|              13.0|     9444.0|        1997.0|    4166.0|    1482.0|       2.6111|           65600.0|            84.15|
|  -116.24|   33.72|

    The Spark variant of SQL's SELECT is the .select() method. This method takes multiple arguments.
    The difference between .select() and .withColumn() methods is that .select() returns only the columns you specify, 
    while .withColumn() returns all the columns of the DataFrame in addition to the one you defined.

In [None]:
# Select the first set of columns
selected1 = my_df.select("longitude", "latitude", "population")
selected1.show()

+---------+--------+----------+
|longitude|latitude|population|
+---------+--------+----------+
|  -114.31|   34.19|    1015.0|
|  -114.47|    34.4|    1129.0|
|  -114.56|   33.69|     333.0|
|  -114.57|   33.64|     515.0|
|  -114.57|   33.57|     624.0|
|  -114.58|   33.63|     671.0|
|  -114.58|   33.61|    1841.0|
|  -114.59|   34.83|     375.0|
|  -114.59|   33.61|    3134.0|
|   -114.6|   34.83|     787.0|
|   -114.6|   33.62|    2434.0|
|   -114.6|    33.6|    1182.0|
|  -114.61|   34.84|     580.0|
|  -114.61|   34.83|    1346.0|
|  -114.63|   32.76|     949.0|
|  -114.65|   34.89|    1005.0|
|  -114.65|    33.6|     666.0|
|  -114.65|   32.79|      64.0|
|  -114.66|   32.74|     775.0|
|  -114.67|   33.92|      29.0|
+---------+--------+----------+
only showing top 20 rows



In [None]:
# Select the second set of columns
temp = my_df.select(my_df.longitude, my_df.latitude, my_df.population)
temp.show()

+---------+--------+----------+
|longitude|latitude|population|
+---------+--------+----------+
|  -114.31|   34.19|    1015.0|
|  -114.47|    34.4|    1129.0|
|  -114.56|   33.69|     333.0|
|  -114.57|   33.64|     515.0|
|  -114.57|   33.57|     624.0|
|  -114.58|   33.63|     671.0|
|  -114.58|   33.61|    1841.0|
|  -114.59|   34.83|     375.0|
|  -114.59|   33.61|    3134.0|
|   -114.6|   34.83|     787.0|
|   -114.6|   33.62|    2434.0|
|   -114.6|    33.6|    1182.0|
|  -114.61|   34.84|     580.0|
|  -114.61|   34.83|    1346.0|
|  -114.63|   32.76|     949.0|
|  -114.65|   34.89|    1005.0|
|  -114.65|    33.6|     666.0|
|  -114.65|   32.79|      64.0|
|  -114.66|   32.74|     775.0|
|  -114.67|   33.92|      29.0|
+---------+--------+----------+
only showing top 20 rows



In [None]:
# Define first filter
filterA = my_df.total_rooms > 5000

# Define second filter
filterB = my_df.total_bedrooms > 1000

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)
selected2.show(10)

+---------+--------+----------+
|longitude|latitude|population|
+---------+--------+----------+
|  -114.31|   34.19|    1015.0|
|  -114.47|    34.4|    1129.0|
|  -116.06|   34.15|    4507.0|
|  -116.09|   34.15|    4166.0|
|  -116.24|   33.72|    2725.0|
|  -116.24|   33.71|    5525.0|
|  -116.26|   33.65|     574.0|
|  -116.29|   33.74|    4571.0|
|  -116.33|   33.75|    2880.0|
|  -116.33|   33.72|    2450.0|
+---------+--------+----------+
only showing top 10 rows



In [None]:
# Select the second set of columns
temp1 = my_df.select(my_df.longitude, my_df.latitude, (my_df.population/1000).alias('popul'))
temp1.show()

+---------+--------+-----+
|longitude|latitude|popul|
+---------+--------+-----+
|  -114.31|   34.19|1.015|
|  -114.47|    34.4|1.129|
|  -114.56|   33.69|0.333|
|  -114.57|   33.64|0.515|
|  -114.57|   33.57|0.624|
|  -114.58|   33.63|0.671|
|  -114.58|   33.61|1.841|
|  -114.59|   34.83|0.375|
|  -114.59|   33.61|3.134|
|   -114.6|   34.83|0.787|
|   -114.6|   33.62|2.434|
|   -114.6|    33.6|1.182|
|  -114.61|   34.84| 0.58|
|  -114.61|   34.83|1.346|
|  -114.63|   32.76|0.949|
|  -114.65|   34.89|1.005|
|  -114.65|    33.6|0.666|
|  -114.65|   32.79|0.064|
|  -114.66|   32.74|0.775|
|  -114.67|   33.92|0.029|
+---------+--------+-----+
only showing top 20 rows



In [None]:
# Select the second set of columns
temp1 = my_df.selectExpr('longitude', 'latitude', 'population/1000 as popul')
temp1.show()

+---------+--------+-----+
|longitude|latitude|popul|
+---------+--------+-----+
|  -114.31|   34.19|1.015|
|  -114.47|    34.4|1.129|
|  -114.56|   33.69|0.333|
|  -114.57|   33.64|0.515|
|  -114.57|   33.57|0.624|
|  -114.58|   33.63|0.671|
|  -114.58|   33.61|1.841|
|  -114.59|   34.83|0.375|
|  -114.59|   33.61|3.134|
|   -114.6|   34.83|0.787|
|   -114.6|   33.62|2.434|
|   -114.6|    33.6|1.182|
|  -114.61|   34.84| 0.58|
|  -114.61|   34.83|1.346|
|  -114.63|   32.76|0.949|
|  -114.65|   34.89|1.005|
|  -114.65|    33.6|0.666|
|  -114.65|   32.79|0.064|
|  -114.66|   32.74|0.775|
|  -114.67|   33.92|0.029|
+---------+--------+-----+
only showing top 20 rows



In [None]:
# use groupby to obtain max and min total_rooms
my_df.filter(my_df.latitude > 33).groupby().min('total_rooms').show()

+----------------+
|min(total_rooms)|
+----------------+
|             2.0|
+----------------+



In [None]:
# use groupby to obtain max and min total_rooms
my_df.filter(my_df.latitude > 33).groupby().max('total_rooms').show()

+----------------+
|max(total_rooms)|
+----------------+
|         37937.0|
+----------------+



In [None]:
# get the average median house value for latitude greater than 33
my_df.filter(my_df.latitude > 33).groupby().avg('median_house_value').show()

+-----------------------+
|avg(median_house_value)|
+-----------------------+
|     208916.37318498964|
+-----------------------+



In [None]:
new_table.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
new_table.groupby('dst').count().show()

+---+-----+
|dst|count|
+---+-----+
|  U|   45|
|  A| 1329|
|  N|   23|
+---+-----+



In [None]:
new_table.groupby('dst').avg('alt').show()

+---+-----------------+
|dst|         avg(alt)|
+---+-----------------+
|  U|           1113.8|
|  A|986.6455981941309|
|  N|1908.391304347826|
+---+-----------------+



In [None]:
new_table1.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
new_table1.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [None]:
flights = (new_table1.withColumn('year', new_table1.year)
                     .withColumn('month', new_table1.month)
                     .withColumn('day', new_table1.day)
                     .withColumn('carrier', new_table1.carrier)
                     .withColumn('tailnum', new_table1.tailnum)
                     .withColumn('flight', new_table1.flight)
                     .withColumn('origin', new_table1.origin)
                     .withColumn('dest', new_table1.dest)
                     .withColumn('distance', new_table1.distance)
                     .withColumn('air_time', new_table1.air_time.cast('float'))
                     .withColumn('hour', new_table1.hour.cast('integer'))
                     .withColumn('minute', new_table1.minute.cast('integer'))
                     .withColumn('dep_time', new_table1.dep_time.cast('integer'))
                     .withColumn('dep_delay', new_table1.dep_delay.cast('integer'))
                     .withColumn('arr_time', new_table1.arr_time.cast('integer'))
                     .withColumn('arr_delay', new_table1.arr_delay.cast('integer'))
          )

flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|   132.0|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|   111.0|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|    83.0|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|   127.0|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



In [None]:
# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()

+-------+-----+
|tailnum|count|
+-------+-----+
| N442AS|   38|
| N102UW|    2|
| N36472|    4|
| N38451|    4|
| N73283|    4|
| N513UA|    2|
| N954WN|    5|
| N388DA|    3|
| N567AA|    1|
| N516UA|    2|
| N927DN|    1|
| N8322X|    1|
| N466SW|    1|
|  N6700|    1|
| N607AS|   45|
| N622SW|    4|
| N584AS|   31|
| N914WN|    4|
| N654AW|    2|
| N336NW|    1|
+-------+-----+
only showing top 20 rows

+------+------------------+
|origin|     avg(air_time)|
+------+------------------+
|   SEA| 160.4361496051259|
|   PDX|137.11543248288737|
+------+------------------+



In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy('month','dest')

# Average departure delay by month and destination
by_month_dest.avg('dep_delay').show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev('dep_delay')).show()

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    4| PHX| 1.6833333333333333|
|    1| RDM|             -1.625|
|    5| ONT| 3.5555555555555554|
|    7| OMA|               -6.5|
|    8| MDW|               7.45|
|    6| DEN|  5.418181818181818|
|    5| IAD|               -4.0|
|   12| COS|               -1.0|
|   11| ANC|  7.529411764705882|
|    5| AUS|              -0.75|
|    5| COS| 11.666666666666666|
|    2| PSP|                0.6|
|    4| ORD|0.14285714285714285|
|   10| DFW| 18.176470588235293|
|   10| DCA|               -1.5|
|    8| JNU|             18.125|
|   11| KOA|               -1.0|
|   10| OMA|-0.6666666666666666|
|    6| ONT|              9.625|
|    3| MSP|                3.2|
+-----+----+-------------------+
only showing top 20 rows

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
| 

In [None]:
new_table.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|   132.0|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|   360.0|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|   111.0|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|    83.0|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|   127.0|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
airports = new_table
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [None]:
# Rename the faa column
airports = airports.withColumnRenamed('faa', 'dest')

# Examine the data
print(airports.show())

# Join the DataFrames
flights_with_airports = flights.join(airports, on='dest', how='leftouter')

# Examine the new DataFrame
print(flights_with_airports.show())

+----+--------------------+----------------+-----------------+----+---+---+
|dest|                name|             lat|              lon| alt| tz|dst|
+----+--------------------+----------------+-----------------+----+---+---+
| 04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
| 06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
| 09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
| 0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
| 0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
| 0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
| 0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
| 0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
| 0W3|Harfor

In [None]:
# read in new data
url = 'https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv'

# instantiate url
my_spark.sparkContext.addFile(url)

planes = my_spark.read.csv('file://'+SparkFiles.get('planes.csv'), header=True, inferSchema=True)
planes.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA

In [None]:
planes = planes.withColumnRenamed('year', 'plane_year')
planes.show()

+-------+----------+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----------+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|      1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|      1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|      1999|

In [None]:
# Join the DataFrames
model_data = flights.join(planes, on='tailnum', how="leftouter")
model_data.show()

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|  manufacturer|      model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|   132.0|     954|   6|    58|      2011|Fixed wing multi ...|        AIRBUS|   A320-214|      2|  182|   NA|Turbo-fan|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|   360.0|    2677|  10|    40|      2006|Fixed wing multi ...|        BOEIN

In [None]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- plane_year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



In [None]:
# convert the plane_year type to integer.
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast('integer'))
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



In [None]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: float (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- plane_age: integer (nullable = true)



In [None]:
# Create is_late
model_data = model_data.withColumn("label", (model_data.arr_delay > 0).cast('integer'))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and \
                                dep_delay is not NULL and \
                                air_time is not NULL and \
                                plane_year is not NULL")

model_data.show()

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|  manufacturer|      model|engines|seats|speed|   engine|plane_age|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|   132.0|     954|   6|    58|      2011|Fixed wing multi ...|        AIRBUS|   A320-214|      2|  182|   NA|Turbo-fan|        3|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|   360.0|  

In [None]:
# import encoders
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

In [None]:
# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

In [None]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", \
                                           "carrier_fact", "dest_fact", "plane_age"], outputCol='features')

In [None]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, \
                                carr_indexer, carr_encoder, vec_assembler])

In [None]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

In [None]:
piped_data.show()

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|  manufacturer|      model|engines|seats|speed|   engine|plane_age|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-----+----------+---------------+-------------+--------------+--------------------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|   13

In [None]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

In [None]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [None]:
# imort numpy
import numpy as np

# Import the tuning submodule
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

In [80]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator
                        )

In [81]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

In [82]:
best_lr

LogisticRegressionModel: uid=LogisticRegression_efaec8c1d29f, numClasses=2, numFeatures=81

In [83]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.686353414377754
