Getting to know PySpark

In [2]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 26.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=2ebbc3fa430fd29f7e266233657892a43a17ea2730051932e4170f23149bc78e
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [3]:
import pyspark as sp

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [6]:
!echo $SPARK_HOME

/content/spark-3.0.1-bin-hadoop2.7


In [7]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
#sc = spark.SparkContext.getOrCreate()

In [8]:
#Verify SparkContext
print(spark)

#Print Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7f3c4627a4a8>
3.0.1


Creating a SparkSession

In [9]:
from pyspark.sql import SparkSession

#Create my_spark
spark = SparkSession.builder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x7f3c4627a4a8>


Viewing tables

In [16]:
from google.colab import files
files.upload()

!ls

flights = spark.read.csv('flights_small.csv', inferSchema=True, header=True)
flights.printSchema()

spark.catalog.listTables()

Saving flights_small.csv to flights_small.csv
 13fb36dd18bb2a98b6e233f63c5608ad.jpg   sample_data
 BD.zip				        spark-3.0.1-bin-hadoop2.7
'flights_small[1].csv'		        spark-3.0.1-bin-hadoop2.7.tgz
 flights_small.csv
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



[]

Are you query-ious?

In [17]:
flights.createOrReplaceTempView("flights")

query = "FROM flights SELECT * LIMIT 10"

flights10 = spark.sql(query)

flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

Pandafy a Spark DataFrame

In [19]:
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

flight_counts = spark.sql(query)

pd_counts = flight_counts.toPandas()

print(pd_counts.head())

  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144


Put some Spark in your data

In [20]:
import numpy as np
import pandas as pd

In [24]:
pd_temp = pd.DataFrame(np.random.random(10))

spark_temp = spark.createDataFrame(pd_temp)

print(spark.catalog.listTables())

spark_temp.name = spark_temp.createOrReplaceTempView('temp')

print(spark.catalog.listTables())

files.upload()

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


Saving airports.csv to airports.csv


{'airports.csv': b'"faa","name","lat","lon","alt","tz","dst"\r\n"04G","Lansdowne Airport",41.1304722,-80.6195833,1044,-5,"A"\r\n"06A","Moton Field Municipal Airport",32.4605722,-85.6800278,264,-5,"A"\r\n"06C","Schaumburg Regional",41.9893408,-88.1012428,801,-6,"A"\r\n"06N","Randall Airport",41.431912,-74.3915611,523,-5,"A"\r\n"09J","Jekyll Island Airport",31.0744722,-81.4277778,11,-4,"A"\r\n"0A9","Elizabethton Municipal Airport",36.3712222,-82.1734167,1593,-4,"A"\r\n"0G6","Williams County Airport",41.4673056,-84.5067778,730,-5,"A"\r\n"0G7","Finger Lakes Regional Airport",42.8835647,-76.7812318,492,-5,"A"\r\n"0P2","Shoestring Aviation Airfield",39.7948244,-76.6471914,1000,-5,"U"\r\n"0S9","Jefferson County Intl",48.0538086,-122.8106436,108,-8,"A"\r\n"0W3","Harford County Airport",39.5668378,-76.2024028,409,-5,"A"\r\n"10C","Galt Field Airport",42.4028889,-88.3751111,875,-6,"U"\r\n"17G","Port Bucyrus-Crawford County Airport",40.7815556,-82.9748056,1003,-5,"A"\r\n"19A","Jackson County Airpo

Dropping the middle man

In [27]:
file_path = 'airports.csv'

airports = spark.read.csv(file_path, header=True)

airports.show()

type(airports)

spark.catalog.listDatabases()

spark.catalog.listTables()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

Manipulatin data

Creating columns

In [28]:
flights = spark.read.csv('flights_small.csv', header=True)

flights.show()

flights.name = flights.createOrReplaceTempView('flights')

spark.catalog.listTables()

flights_1 = spark.table('flights')

print(flights_1.show())

flights = flights.withColumn('durations_hrs', flights.air_time / 60)

flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

Filtering Data

In [29]:
long_flights1 = flights.filter('distance > 1000')

long_flights2 = flights.filter(flights.distance > 1000)

print(long_flights1.show())
print(long_flights2.show())

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|     durations_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|               6.0|
|2014|    4| 19|    1236|       -4|    1508|       -7|     AS| N309AS|   490|   SEA| SAN|     135|    1050|  12|    36|              2.25|
|2014|   11| 19|    1812|       -3|    2352|       -4|     AS| N564AS|    26|   SEA| ORD|     198|    1721|  18|    12|               3.3|
|2014|    8|  3|    1120|        0|    1415|        2|     AS| N305AS|   656|   SEA| PHX|     154|    1107|  11|    20| 2.566666666666667|
|2014|   11| 12|    2346|  

Selecting

In [31]:
selected1 = flights.select("tailnum", "origin", "dest")

temp = flights.select(flights.origin, flights.dest, flights.carrier)

filterA = flights.origin == 'SEA'

filterB = flights.dest == 'PDX'

selected2 = temp.filter(filterA).filter(filterB)

selected2.show()

+------+----+-------+
|origin|dest|carrier|
+------+----+-------+
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     AS|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     AS|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
|   SEA| PDX|     OO|
+------+----+-------+
only showing top 20 rows



Selecting II

In [37]:
avg_speed = (flights.distance/(flights.air_time / 60)).alias("avg_speed")

speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) AS avg_speed")

speed1.show()
speed2.show()

+------+----+-------+------------------+
|origin|dest|tailnum|         avg_speed|
+------+----+-------+------------------+
|   SEA| LAX| N846VA| 433.6363636363636|
|   SEA| HNL| N559AS| 446.1666666666667|
|   SEA| SFO| N847VA|367.02702702702703|
|   PDX| SJC| N360SW| 411.3253012048193|
|   SEA| BUR| N612AS| 442.6771653543307|
|   PDX| DEN| N646SW|491.40495867768595|
|   PDX| OAK| N422WN|             362.0|
|   SEA| SFO| N361VA| 415.7142857142857|
|   SEA| SAN| N309AS| 466.6666666666667|
|   SEA| ORD| N564AS| 521.5151515151515|
|   SEA| LAX| N323AS| 440.3076923076923|
|   SEA| PHX| N305AS|431.29870129870125|
|   SEA| LAS| N433AS| 409.6062992125984|
|   SEA| ANC| N765AS|474.75409836065575|
|   SEA| SFO| N713AS| 315.8139534883721|
|   PDX| SFO| N27205| 366.6666666666667|
|   SEA| SMF| N626AS|477.63157894736844|
|   SEA| MDW| N8634A|481.38888888888886|
|   SEA| BOS| N597AS| 516.4137931034483|
|   PDX| BUR| N215AG| 441.6216216216216|
+------+----+-------+------------------+
only showing top

Aggregating

In [38]:
flights.describe()

flights = flights.withColumn("distance", flights.distance.cast("float"))

flights = flights.withColumn("air_time", flights.air_time.cast("float"))

flights.describe('air_time', 'distance').show()

flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

flights.filter(flights.origin == "SEA").groupby().max("air_time").show()

+-------+------------------+-----------------+
|summary|          air_time|         distance|
+-------+------------------+-----------------+
|  count|              9925|            10000|
|   mean|152.88423173803525|        1208.1516|
| stddev|  72.8656286392139|656.8599023464376|
|    min|              20.0|             93.0|
|    max|             409.0|           2724.0|
+-------+------------------+-----------------+

+-------------+
|min(distance)|
+-------------+
|        106.0|
+-------------+

+-------------+
|max(air_time)|
+-------------+
|        409.0|
+-------------+



Aggregating II

In [39]:
flights.filter(flights.carrier == "DL")\
       .filter(flights.origin == "SEA")\
       .groupBy().avg('air_time')\
       .show()

flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



Grouping and Aggregating I

In [40]:
flights.show()

by_plane = flights.groupBy("tailnum")

by_plane.count().show()

by_origin = flights.groupBy("origin")

by_origin.avg("air_time").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|     durations_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|   132.0|   954.0|   6|    58|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|   360.0|  2677.0|  10|    40|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|   111.0|   679.0|  14|    43|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|    83.0|   569.0|  17|     5|1.3833333333333333|
|2014|    3|  9|     754|  

Grouping and Aggregating II

In [None]:
flights = flights.withColumn("dep_delay", flights.dep_deplay.cast("float"))

import pyspark.sql.functions as F

by_month_dest = flights.groupBy("month", "dest")

by_month_dest.avg("dep_deplay").show()

by_month_dest.agg(F.stddev("dep_deplay")).show()

Joining II

In [41]:
airports.show()

airports = airports.withColumnRenamed("faa", "dest")

flights_with_airports = flights.join(airports, on="dest", how="leftouter")

print(flights_with_airports.show())

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

Getting started with machine learning pipelines

Join the DataFrames

In [43]:
files.upload()
planes = spark.read.csv('planes.csv', header=True)
planes.show()

planes = planes.withColumnRenamed("year", "plane_year")

model_data = flights.join(planes, on="tailnum", how="leftouter")

model_data.show()

Saving planes.csv to planes.csv
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUST

String to integer

In [45]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

Create a new column

In [47]:
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

Making a boolean

In [51]:
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

Carrier

In [52]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [53]:
carr_indexer = StringIndexer(inputCol = "carrier", outputCol = "carrier_index")

carr_encoder = OneHotEncoder(inputCol = "carrier_index", outputCol="carrier_fact")

Destination

In [54]:
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

Assemble a vector

In [55]:
from pyspark.ml.feature import VectorAssembler

vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

Create the pipeline

In [57]:
from pyspark.ml import Pipeline

flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

Transform the Data

In [58]:
piped_data = flights_pipe.fit(model_data).transform(model_data)

piped_data.show()

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|     durations_hrs|plane_year|                type|  manufacturer|      model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+--------------+-----------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
| N846VA|2014

Split the data

In [62]:
training, test = piped_data.randomSplit([.6, .4])

Create the modeler

In [61]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()

Create the evaluator

In [64]:
import pyspark.ml.evaluation as evals

evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

Make a grid

In [65]:
import pyspark.ml.tuning as tune

grid = tune.ParamGridBuilder()

grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0,1])

grid = grid.build()

Make the validator

In [67]:
cv = tune.CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

Fit the model(s)

In [68]:
training.show()

best_lr = lr.fit(training)

print(best_lr)

models = cv.fit(training)

best_lr = models.bestModel

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|     durations_hrs|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+------------------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+
| N102UW|2014|  

Evaluate the model

In [71]:
test_results = best_lr.transform(test)

print(evaluator.evaluate(test_results))

0.691749130036517
