# Installing PySpark on Google Colab

Special thanks to my colleagues Jeff & James for content in this notebook <3

In [1]:
!apt update

In [4]:
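# install Java 8, which Spark 2.4 runs on
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download and unpack Spark 2.4.7 (older releases may move to https://archive.apache.org/dist/spark/)
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
# findspark locates the Spark install so that pyspark can be imported
!pip install -q findspark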

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [6]:
import findspark
findspark.init()

# Resilient Distributed Datasets

Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark. An RDD is essentially Spark's representation of a dataset spread across multiple machines, with an API that lets you act on it.

Use an RDD when:
[(quoted from databricks)](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)

- you want low-level transformation and actions and control on your dataset;
- your data is unstructured, such as media streams or streams of text;
- you want to manipulate your data with functional programming constructs rather than domain specific expressions;
- you don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column.

RDDs support two kinds of operations:
- Transformations (create a new RDD)
- Actions (return results)

Note: transformations are lazy. Spark won't actually perform them until an action is called.
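
As a rough analogy only (plain Python, not the Spark API), Python's built-in `map` is also lazy: the function below is not called until something consumes the result, much as an RDD transformation waits for an action. The `traced_double` helper and the `calls` list are made up for this illustration.

```python
calls = []  # records every time the mapped function actually runs

def traced_double(x):
    """Double x and log the call so we can see when the work happens."""
    calls.append(x)
    return x * 2

# "Transformation": builds a lazy map object, nothing is computed yet
lazy = map(traced_double, [1, 2, 3])
calls_before = list(calls)

# "Action": consuming the iterator forces the computation
result = list(lazy)

print(calls_before)  # []
print(result)        # [2, 4, 6]
```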

In [7]:
import pyspark

In [8]:
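# create a new RDD from a local Python list
nums = list(range(1, 1001))

# start a local Spark context that uses all available cores
sc = pyspark.SparkContext('local[*]')

# distribute the list across 10 partitions
rdd = sc.parallelize(nums, numSlices=10)
rdd.getNumPartitions()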

10

#### Examples of Actions


In [9]:
# first: return the first element
rdd.first()

1

In [12]:
# take: return the first n elements
rdd.take(10)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [15]:
# collect: return every element to the driver (only safe for small data)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
 21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
 61, 62, 63, 64, 65, 66, 67, 68, 69, 70,
 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
 81, 82, 83, 84, 85, 86, 87, 88, 89, 90,
 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 101, 102, 103, 104, 105, 106, 107, 108, 109, 110,
 111, 112, 113, 114, 115, 116, 117, 118, 119, 120,
 121, 122, 123, 124, 125, 126, 127, 128, 129, 130,
 131, 132, 133, 134, 135, 136, 137, 138, 139, 140,
 141, 142, 143, 144, 145, 146, 147, 148, 149, 150,
 151, 152, 153, 154, 155, 156, 157, 158, 159, 160,
 161, 162, 163, 164, 165, 166, 167, 168, 169, 170,
 171, 172, 173, 174, 175, 176, 177, 178, 179, 180,
 181, 182, 183, 184, 185

In [18]:
# grab the fourth partition (index 3) using glom
rdd.glom().collect()[3]

[301, 302, 303, 304, 305, 306, 307, 308, 309, 310,
 311, 312, 313, 314, 315, 316, 317, 318, 319, 320,
 321, 322, 323, 324, 325, 326, 327, 328, 329, 330,
 331, 332, 333, 334, 335, 336, 337, 338, 339, 340,
 341, 342, 343, 344, 345, 346, 347, 348, 349, 350,
 351, 352, 353, 354, 355, 356, 357, 358, 359, 360,
 361, 362, 363, 364, 365, 366, 367, 368, 369, 370,
 371, 372, 373, 374, 375, 376, 377, 378, 379, 380,
 381, 382, 383, 384, 385, 386, 387, 388, 389, 390,
 391, 392, 393, 394, 395, 396, 397, 398, 399, 400]
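
The output above is consistent with the 1,000 values being cut into 10 contiguous slices of 100 each. A plain-Python model of that layout might look like the sketch below. It only illustrates the observed result and is not Spark's actual partitioning code; `split_contiguous` is a hypothetical helper.

```python
def split_contiguous(items, num_slices):
    """Cut items into num_slices contiguous chunks of near-equal size."""
    n = len(items)
    return [
        items[(i * n) // num_slices:((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]

nums = list(range(1, 1001))
partitions = split_contiguous(nums, 10)  # plays the role of rdd.glom().collect()

print(len(partitions))                       # 10
print(partitions[3][0], partitions[3][-1])   # 301 400
```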

In [19]:
print(type(rdd))

<class 'pyspark.rdd.RDD'>


#### Examples of Transformations
- map
- filter 

In [20]:
# map
# use a lambda to return x+1 if x is even, else just return x
even_rdd = rdd.map(lambda x: x + 1 if x % 2 == 0 else x)

In [21]:
even_rdd.take(5)

[1, 3, 3, 5, 5]

In [22]:
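# now let's try to return only the even values
rdd.map(lambda x: x if x % 2 == 0)
# a conditional expression needs an else branch, and map must return
# something for every element, so map can't be used to drop values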

SyntaxError: ignored

In [23]:
# try with filter now
only_evens = rdd.filter(lambda x: x % 2 == 0)
only_evens.take(10)

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
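
The same contrast shows up with Python's built-in `map` and `filter`, used here as a plain-Python stand-in for the RDD methods: `map` always yields one output per input, while `filter` can shrink the collection.

```python
nums = list(range(1, 1001))

# map: exactly one output per input, so the length never changes
mapped = list(map(lambda x: x + 1 if x % 2 == 0 else x, nums))

# filter: keeps only the inputs for which the predicate is True
evens = list(filter(lambda x: x % 2 == 0, nums))

print(mapped[:5], len(mapped))   # [1, 3, 3, 5, 5] 1000
print(evens[:10], len(evens))    # [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] 500
```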

In [24]:
# stop your pyspark context instance
# can't have multiple connections at once!
sc.stop()

# Spark DataFrame

DataFrames in PySpark are distributed collections of structured or semi-structured data. The data in a DataFrame is stored in rows under named columns, similar to a relational database table or an Excel sheet.

Use a DataFrame when:
[(also quoted from Databricks)](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)

- you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame;
- your processing demands high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, columnar access and use of lambda functions on semi-structured data, use DataFrame;
- you want a higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset;
- you want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset;
- if you are an R user, use DataFrames;
- if you are a Python user, use DataFrames and resort back to RDDs if you need more control.

Note: Spark's DataFrame-based machine learning API (`pyspark.ml`) runs its algorithms on DataFrames.

In [103]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [157]:
spark = SparkSession.builder.master('local').getOrCreate()

The flight delays data needs to be uploaded to the Colab session before it can be read here!

Link: https://www.kaggle.com/usdot/flight-delays

In [158]:
# read the CSV into a PySpark DataFrame, using the first row as the header
spark_df = spark.read.csv('/flights.csv', header=True, inferSchema=True)

# check the type of the result
type(spark_df)

pyspark.sql.dataframe.DataFrame

In [107]:
# number of rows
print(spark_df.count())

5819079


In [108]:
# number of columns
print(len(spark_df.columns))

31


In [109]:
# check first five rows v1
spark_df.take(5)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AS', FLIGHT_NUMBER=98, TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE=5, DEPARTURE_TIME=2354, DEPARTURE_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCHEDULED_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCHEDULED_ARRIVAL=430, ARRIVAL_TIME=408, ARRIVAL_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=Non

In [110]:
# check first five rows v2
spark_df.head(5)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AS', FLIGHT_NUMBER=98, TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE=5, DEPARTURE_TIME=2354, DEPARTURE_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCHEDULED_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCHEDULED_ARRIVAL=430, ARRIVAL_TIME=408, ARRIVAL_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=Non

In [111]:
# check column datatypes
spark_df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [123]:
from pyspark.sql.functions import isnan, when, count, col

# check for nans in each column
spark_df.select([count(when(isnan(c), c)).alias(c) for c in spark_df.columns]).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [124]:
# NaN is NOT the same as null, so count the nulls separately
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-
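
The NaN/null distinction exists in plain Python too, which may make the two checks above easier to tell apart: NaN is a special float value, while `None` (what Spark reports as null) is the absence of a value. Empty CSV fields are typically read in as null rather than NaN, which would explain why the two counts differ. The `values` list below is made-up sample data.

```python
import math

values = [1.5, float("nan"), None, 3.0, None]

# NaN is a float that represents "not a number"
nan_count = sum(1 for v in values if isinstance(v, float) and math.isnan(v))

# None is a missing value (the Python counterpart of a Spark null)
null_count = sum(1 for v in values if v is None)

print(nan_count, null_count)  # 1 2
```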

In [112]:
# group by a column and count the rows in each group
spark_df.groupby('DAY_OF_WEEK').count().show()

+-----------+------+
|DAY_OF_WEEK| count|
+-----------+------+
|          1|865543|
|          6|700545|
|          3|855897|
|          5|862209|
|          4|872521|
|          7|817764|
|          2|844600|
+-----------+------+



In [159]:
# keep only the columns we need
spark_df = spark_df.select(col("MONTH"),col("DAY_OF_WEEK"), col("AIR_TIME"), col('ARRIVAL_DELAY'))

In [160]:
spark_df.columns

['MONTH', 'DAY_OF_WEEK', 'AIR_TIME', 'ARRIVAL_DELAY']

In [163]:
spark_df.take(5)

[Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=263, ARRIVAL_DELAY=-9),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=266, ARRIVAL_DELAY=5),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=258, ARRIVAL_DELAY=-9),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=199, ARRIVAL_DELAY=-21)]

In [162]:
# need to drop nulls in those columns
spark_df = spark_df.na.drop(subset=["AIR_TIME", "ARRIVAL_DELAY"])

Now it's time to prep our target! We're going to predict whether or not there was a delay in arrival.

In [161]:
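def prep_target(delay_value):
  """Return 0 if the flight arrived early (negative delay), else 1."""
  # note: a delay of exactly 0 minutes is labeled 1 (delayed)
  if delay_value < 0:
    return 0
  else:
    return 1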

In [164]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# here, creating a User Defined Function (UDF) that returns an integer 0/1 indicator column
udfTargetToCategory = udf(prep_target, IntegerType())

preprocessed_df = spark_df.withColumn("delay_ind", udfTargetToCategory("ARRIVAL_DELAY"))


In [165]:
preprocessed_df.take(5)

[Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22, delay_ind=0),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=263, ARRIVAL_DELAY=-9, delay_ind=0),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=266, ARRIVAL_DELAY=5, delay_ind=1),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=258, ARRIVAL_DELAY=-9, delay_ind=0),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=199, ARRIVAL_DELAY=-21, delay_ind=0)]

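MONTH and DAY_OF_WEEK are categorical, so we need to one-hot encode them! (Note: in Spark 3.0 and later, `OneHotEncoderEstimator` was renamed to `OneHotEncoder`.)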

https://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator

In [166]:
from pyspark.ml import feature

ohe = feature.OneHotEncoderEstimator(inputCols=['MONTH', 'DAY_OF_WEEK'], 
                                     outputCols=['month_vec', 'day_vec'])
ohe_hot_encoded = ohe.fit(preprocessed_df).transform(preprocessed_df)
ohe_hot_encoded.head()

Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}))
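
The vector sizes of 12 and 7 appear to follow from how the encoder reads its input: it treats each integer as a category index starting at 0, and by default (`dropLast=True`) leaves out the last category. The plain-Python model below sketches that behaviour under those assumptions. It is not the library's implementation, and `one_hot_drop_last` is a hypothetical helper.

```python
def one_hot_drop_last(index, num_categories):
    """Return (size, {position: 1.0}) for a category index, dropping the last category."""
    size = num_categories - 1
    if index == num_categories - 1:
        return size, {}          # the dropped category is encoded as all zeros
    return size, {index: 1.0}

# MONTH takes values 1..12, so indices 0..12 give 13 categories
print(one_hot_drop_last(1, 13))   # (12, {1: 1.0})
# DAY_OF_WEEK takes values 1..7, so indices 0..7 give 8 categories
print(one_hot_drop_last(4, 8))    # (7, {4: 1.0})
# index 12 (December) is the dropped last category
print(one_hot_drop_last(12, 13))  # (12, {})
```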

In [167]:
ohe_hot_encoded.take(5)

[Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=263, ARRIVAL_DELAY=-9, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=266, ARRIVAL_DELAY=5, delay_ind=1, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=258, ARRIVAL_DELAY=-9, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=199, ARRIVAL_DELAY=-21, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}))]

In [168]:
# only using a few of the features as inputs
features = ['AIR_TIME', 'month_vec', 'day_vec']
target = 'delay_ind'

# need to vectorize the inputs
vector = feature.VectorAssembler(inputCols = features, outputCol = 'features')
vectorized_df = vector.transform(ohe_hot_encoded)

In [143]:
print(type(vector))

<class 'pyspark.ml.feature.VectorAssembler'>


In [169]:
vectorized_df.columns

['MONTH',
 'DAY_OF_WEEK',
 'AIR_TIME',
 'ARRIVAL_DELAY',
 'delay_ind',
 'month_vec',
 'day_vec',
 'features']

In [145]:
vectorized_df.take(5)

[Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22, delay_ind=False, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}), features=SparseVector(20, {0: 169.0, 2: 1.0, 17: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=263, ARRIVAL_DELAY=-9, delay_ind=False, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}), features=SparseVector(20, {0: 263.0, 2: 1.0, 17: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=266, ARRIVAL_DELAY=5, delay_ind=True, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}), features=SparseVector(20, {0: 266.0, 2: 1.0, 17: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=258, ARRIVAL_DELAY=-9, delay_ind=False, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}), features=SparseVector(20, {0: 258.0, 2: 1.0, 17: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=199, ARRIVAL_DELAY=-21, delay_ind=False, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}), features=SparseVe
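
The indices in the `features` vector can be reproduced by hand: the assembler lays the inputs end to end, so `AIR_TIME` takes slot 0, the 12 month slots start at offset 1, and the 7 day slots start at offset 13, for 20 slots in total. A small plain-Python sketch of that concatenation follows (an illustration, not the `VectorAssembler` code; `assemble` is a hypothetical helper).

```python
def assemble(*parts):
    """Concatenate (size, {index: value}) sparse vectors, shifting each by its offset."""
    total_size, merged = 0, {}
    for size, entries in parts:
        for index, value in entries.items():
            merged[total_size + index] = value
        total_size += size
    return total_size, merged

air_time = (1, {0: 169.0})   # a scalar column occupies one slot
month_vec = (12, {1: 1.0})
day_vec = (7, {4: 1.0})

print(assemble(air_time, month_vec, day_vec))
# (20, {0: 169.0, 2: 1.0, 17: 1.0})
```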

It appears I'm not alone in having trouble with `randomSplit`: https://medium.com/udemy-engineering/pyspark-under-the-hood-randomsplit-and-sample-inconsistencies-examined-7c6ec62644bc

(I'll note, though, that my problems were not rooted in this issue!)

In [170]:
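# train test split
# implementing the fix discussed in the article above:
# persist the DataFrame before splitting it

persist_df = vectorized_df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

# 70% train, 30% test, with a fixed seed for repeatability
train, test = persist_df.randomSplit([0.7, 0.3], seed = 11)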
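
As I understand it, `randomSplit` decides each row's split from a seeded pseudo-random draw, so the result is only reproducible when the rows arrive in the same order every time. Persisting the DataFrame first helps keep that input fixed. Here is a simplified plain-Python model of the idea (not Spark's implementation; `random_split` is a hypothetical helper):

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split using one seeded uniform draw per row."""
    total = sum(weights)
    bounds, running = [], 0.0
    for w in weights:                  # cumulative, normalized boundaries
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # the last split catches any draw left over by float rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10_000))
train, test = random_split(rows, [0.7, 0.3], seed=11)
train_again, _ = random_split(rows, [0.7, 0.3], seed=11)

print(len(train) + len(test) == len(rows))  # True
print(train == train_again)                 # True: same seed, same input order
```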

In [171]:
type(train)

pyspark.sql.dataframe.DataFrame

In [172]:
train.take(5)

[Row(MONTH=1, DAY_OF_WEEK=1, AIR_TIME=9, ARRIVAL_DELAY=-25, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {1: 1.0}), features=SparseVector(20, {0: 9.0, 2: 1.0, 14: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=1, AIR_TIME=9, ARRIVAL_DELAY=-16, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {1: 1.0}), features=SparseVector(20, {0: 9.0, 2: 1.0, 14: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=1, AIR_TIME=13, ARRIVAL_DELAY=-9, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {1: 1.0}), features=SparseVector(20, {0: 13.0, 2: 1.0, 14: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=1, AIR_TIME=13, ARRIVAL_DELAY=31, delay_ind=1, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {1: 1.0}), features=SparseVector(20, {0: 13.0, 2: 1.0, 14: 1.0})),
 Row(MONTH=1, DAY_OF_WEEK=1, AIR_TIME=14, ARRIVAL_DELAY=-18, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {1: 1.0}), features=SparseVector(20, {0: 14.0, 2: 1.0, 14:

In [175]:
# Now let's try a classifier!
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'delay_ind', 
                            maxDepth = 3)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)

In [177]:
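# Evaluate!
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# note: passing the hard 0/1 'prediction' column as rawPredictionCol means the
# score is computed from class labels rather than from 'rawPrediction' scores
evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'prediction',
                                          labelCol = 'delay_ind')
evaluator.evaluate(predictions) # Note: area under the ROC curve (the default metric)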

0.5

#### Evaluate! How'd we do? Why?

- An ROC/AUC of 0.5??? Guess: is our model only predicting a single class?


In [179]:
# Explore our predictions
predictions.groupby('prediction').count().show()
# note - this is the size of the test set

+----------+-------+
|prediction|  count|
+----------+-------+
|       0.0|1713597|
+----------+-------+



In [182]:
# Explore our original data
test.groupby('delay_ind').count().show()

+---------+-------+
|delay_ind|  count|
+---------+-------+
|        1| 663602|
|        0|1049995|
+---------+-------+



In [193]:
preprocessed_df.groupby('delay_ind').count().show()

+---------+-------+
|delay_ind|  count|
+---------+-------+
|        1|2213109|
|        0|3500899|
+---------+-------+



Yup - it is only predicting the majority class.
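
This also explains the 0.5 score. One way to read AUC is as the share of (positive, negative) pairs that the model ranks correctly, with ties counting half. If every row gets the same prediction, every pair is a tie. A minimal plain-Python sketch, with made-up labels and scores and a hypothetical `auc` helper:

```python
def auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [1, 0, 1, 0, 0]

print(auc(labels, [0.0] * 5))                   # 0.5  (always predicts class 0)
print(auc(labels, [0.9, 0.2, 0.8, 0.1, 0.3]))   # 1.0  (perfect ranking)
```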

How can we fix this? Let's try undersampling our majority so our classes are more balanced.

In [190]:
# Note: filtering here uses .filter() with a Column expression rather than pandas-style square brackets
major_df = preprocessed_df.filter(col('delay_ind') == 0)
minor_df = preprocessed_df.filter(col('delay_ind') == 1)

In [200]:
# Down-sample the majority class without replacement, keeping roughly 65% of its rows
sampled_majority_df = major_df.sample(False, .65)

In [201]:
sampled_majority_df.count()

2274927
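
For reference, the fraction that would make the two classes about equal can be worked out from the class counts shown earlier, and it comes out a little below the 0.65 used here. Because each row is kept independently with that probability, the sampled count is close to the target but not exact. The sketch below uses plain Python with a small stand-in for the real rows. The seed and the stand-in size are arbitrary choices for illustration.

```python
import random

majority_count = 3_500_899   # delay_ind == 0, from the counts above
minority_count = 2_213_109   # delay_ind == 1

# fraction of majority rows to keep so both classes end up about equal
fraction = minority_count / majority_count
print(round(fraction, 3))    # 0.632

# Bernoulli sampling without replacement: keep each row with probability `fraction`
rng = random.Random(0)            # arbitrary seed, only for repeatability
majority_rows = range(100_000)    # small stand-in for the real rows
kept = [row for row in majority_rows if rng.random() < fraction]
print(len(kept))                  # near 63,200, but not exactly
```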

In [202]:
# union stacks the rows of both DataFrames (unionAll is a deprecated alias in Spark 2.x)
combined_df = sampled_majority_df.union(minor_df)

In [203]:
combined_df.groupby('delay_ind').count().show()

+---------+-------+
|delay_ind|  count|
+---------+-------+
|        1|2213109|
|        0|2274927|
+---------+-------+



In [204]:
combined_df.columns

['MONTH', 'DAY_OF_WEEK', 'AIR_TIME', 'ARRIVAL_DELAY', 'delay_ind']

In [205]:
ohe = feature.OneHotEncoderEstimator(inputCols=['MONTH', 'DAY_OF_WEEK'], 
                                     outputCols=['month_vec', 'day_vec'])
ohe_hot_encoded = ohe.fit(combined_df).transform(combined_df)
ohe_hot_encoded.head()

Row(MONTH=1, DAY_OF_WEEK=4, AIR_TIME=169, ARRIVAL_DELAY=-22, delay_ind=0, month_vec=SparseVector(12, {1: 1.0}), day_vec=SparseVector(7, {4: 1.0}))

In [206]:
features = ['AIR_TIME', 'month_vec', 'day_vec']
target = 'delay_ind'

vector = feature.VectorAssembler(inputCols = features, outputCol = 'features')
vectorized_df = vector.transform(ohe_hot_encoded)

In [207]:
persist_df = vectorized_df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

train, test = persist_df.randomSplit([0.7, 0.3], seed = 11)

In [208]:
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'delay_ind', 
                            maxDepth = 9)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)

In [209]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'prediction',
                                          labelCol = 'delay_ind')
evaluator.evaluate(predictions)

0.542377707269281
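
A note on reading this number: because the evaluator is given the hard 0/1 `prediction` column, the ROC curve has, as far as I can tell, only one interior point, and the area under it reduces to the average of the true positive rate and the true negative rate. The sketch below works that out with trapezoids. The confusion counts are hypothetical, not taken from the flight data, and `hard_label_auc` is a made-up helper.

```python
def hard_label_auc(tp, fp, tn, fn):
    """Area under the ROC curve through (0, 0), (FPR, TPR), (1, 1) via trapezoids."""
    tpr = tp / (tp + fn)
    fpr = fp / (fp + tn)
    left = fpr * tpr / 2                 # triangle from (0, 0) to (FPR, TPR)
    right = (1 - fpr) * (tpr + 1) / 2    # trapezoid from (FPR, TPR) to (1, 1)
    return left + right

# hypothetical counts: TPR = 0.75, TNR = 0.5, so the average is 0.625
print(hard_label_auc(tp=75, fp=50, tn=50, fn=25))    # 0.625
# a model that never predicts the positive class
print(hard_label_auc(tp=0, fp=0, tn=100, fn=100))    # 0.5
```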

#### Evaluate!

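- ROC/AUC moved from 0.5 to about 0.54 after undersampling and deepening the tree: an improvement, but still close to chance.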


# Other Notes

## Hadoop vs Spark: which is better?

> Spark has been found to run 100 times faster in-memory, and 10 times faster on disk. It’s also been used to sort 100 TB of data 3 times faster than Hadoop MapReduce on one-tenth of the machines. Spark has particularly been found to be faster on machine learning applications, such as Naive Bayes and k-means. Spark performance, as measured by processing speed, has been found to be optimal over Hadoop, for several reasons:
> - Spark is not bound by input-output concerns every time it runs a selected part of a MapReduce task. It’s proven to be much faster for applications.
> - Spark’s DAGs enable optimizations between steps. Hadoop doesn’t have any cyclical connection between MapReduce steps, meaning no performance tuning can occur at that level. However, if Spark is running on YARN with other shared services, performance might degrade and cause RAM overhead memory leaks. For this reason, if a user has a use-case of batch processing, Hadoop has been found to be the more efficient system.

## Using Hadoop and Spark together

> There are several instances where you would want to use the two tools together. Despite some asking if Spark will replace Hadoop entirely because of the former’s processing power, they are meant to complement each other rather than compete.