In [1]:
# Import the PySpark module
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

In [2]:
# Read data from CSV file
flights = spark.read.csv('flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')


In [3]:
flights.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



In [4]:
# Remove the 'flight' column
flights = flights.drop('flight')

In [5]:
# Number of records with missing 'delay' values
flights.filter('delay IS NULL').count()

16711

In [6]:
# Remove records with missing 'delay' values
flights = flights.filter('delay IS NOT NULL')

In [7]:
# Remove records with missing values in any column and get the number of remaining rows
flights = flights.dropna()
print(flights.count())

258289


In [8]:
flights.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [9]:
# Import the required function
from pyspark.sql.functions import round

In [10]:
# Convert 'mile' to 'km' and drop 'mile' column
flights_km = flights.withColumn('km', round(flights.mile * 1.60934, 0)).drop('Mile')

In [11]:
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|
+---+---+---+-------+---+------+--------+-----+------+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|
+---+---+---+-------+---+------+--------+-----+------+
only showing top 5 rows



In [12]:
# Create 'label' column indicating where flights delayed (1) or not (0)
flights_km = flights_km.withColumn('label',
                                  (flights_km.delay >= 15).cast('integer'))

In [13]:
flights_km.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 10| 10|  1|     OO|ORD|  8.18|      51|   27| 253.0|    1|
| 11| 22|  1|     OO|ORD|  7.17|     127|  -19|1188.0|    0|
|  2| 14|  5|     B6|JFK| 21.17|     365|   60|3618.0|    1|
|  5| 25|  3|     WN|SJC| 12.92|      85|   22| 621.0|    1|
|  3| 28|  1|     B6|LGA| 13.33|     182|   70|1732.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



In [14]:
flights_km.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('org', 'string'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int'),
 ('km', 'double'),
 ('label', 'int')]

In [15]:
from pyspark.ml.feature import StringIndexer

In [16]:
# Create an indexer
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

In [17]:
# Indexer indentifies categories in the data
indexer_model = indexer.fit(flights)

In [18]:
# Indexer creates a new column with numeric index values
flights_indexed = indexer_model.transform(flights)

In [19]:
# Repeat the process for the other categorical feature
flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)

In [20]:
flights_indexed.show(5)

+---+---+---+-------+---+----+------+--------+-----+-----------+-------+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|carrier_idx|org_idx|
+---+---+---+-------+---+----+------+--------+-----+-----------+-------+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|        2.0|    0.0|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|        2.0|    0.0|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|        4.0|    2.0|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|        3.0|    5.0|
|  3| 28|  1|     B6|LGA|1076| 13.33|     182|   70|        4.0|    3.0|
+---+---+---+-------+---+----+------+--------+-----+-----------+-------+
only showing top 5 rows



In [21]:
flights_indexed.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int'),
 ('carrier_idx', 'double'),
 ('org_idx', 'double')]

In [25]:
# This code is not working
# # Import the necessary class
# from pyspark.ml.feature import VectorAssembler

# # Create an assembler object
# assembler = VectorAssembler(inputCols=[
#     'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
# ], outputCol='features')

# # Consolidate predictor columns
# flights_assembled = assembler.transform(flights)

# # Check the resulting column
# flights_assembled.select('features', 'delay').show(5, truncate=False)

In [26]:
# Terminate the cluster
spark.stop()