Logistic Regression Example with MLlib and Spark ML
====

Start up Spark
-------------

In [None]:
import glob
import os
import sys

# Honour an existing SPARK_HOME if one is set; otherwise use the original location.
spark_home = os.environ.get('SPARK_HOME', '/opt/spark')
os.environ['SPARK_HOME'] = spark_home

sys.path.insert(0, os.path.join(spark_home, 'python'))

# The py4j zip name embeds its version, which differs between Spark releases, so
# locate it instead of hard-coding one version. Fall back to the original path if
# nothing matches. (Normally exactly one such zip exists.)
py4j_zips = sorted(glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip')))
py4j_zip = py4j_zips[-1] if py4j_zips else os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip')
sys.path.insert(0, py4j_zip)

# Run PySpark's shell bootstrap in this namespace. Later cells use `sc` and
# `sqlContext` without defining them, presumably provided by this script.
# exec(compile(...)) works on Python 2 and 3; execfile exists only in Python 2.
shell_py = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_py) as shell_file:
    exec(compile(shell_file.read(), shell_py, 'exec'))

Spark MLlib
----------


In [None]:
# Load the raw sample data as an RDD of text lines (parsed into points below).
logreg_path = "data/logreg.txt"
rdd = sc.textFile(logreg_path)
rdd

In [None]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    """Parse one whitespace-separated text line into a LabeledPoint.

    The first value on the line is the label; the remaining values form the
    dense feature vector.

    Splitting on arbitrary whitespace (``split()``) rather than a single
    ``' '`` tolerates repeated or trailing spaces. Those would otherwise
    produce empty strings, and ``float('')`` raises ``ValueError``.
    """
    values = [float(token) for token in line.split()]
    return LabeledPoint(values[0], DenseVector(values[1:]))

# Skip blank / whitespace-only lines, which cannot be parsed into a LabeledPoint.
points = rdd.filter(lambda line: line.strip()).map(parsePoint)
# take(10) ships only the first ten points to the driver; collect()[0:10] would
# pull the entire RDD into driver memory just to display ten of them.
points.take(10)

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

# SGD makes repeated passes over the training data. Cache the parsed points so
# each pass does not recompute the RDD lineage (re-reading and re-parsing the
# text file). cache() returns the same RDD, so `points` stays cached afterwards.
model = LogisticRegressionWithSGD.train(points.cache(), iterations=100)

Let's predict:

In [None]:
# Classify a single two-feature point using the model's current threshold.
sample_point = [0.7, 0.6]
model.predict(sample_point)

In [None]:
# Raise the decision threshold, then classify another point.
# Note: setThreshold mutates `model` in place, so every later predict() call
# (including the next cell) also uses this threshold.
new_threshold = 0.8
model.setThreshold(new_threshold)
model.predict([0.8, 0.6])

In [None]:
# A point with an extreme first feature; when cells run in order, this uses the
# threshold set in the previous cell.
extreme_point = [100, 0.6]
model.predict(extreme_point)

Spark ML
-------

Read the training and test data. In this case the test data is labeled as well: we will generate our label from the `arrdelay` field.

In [1]:
# Both splits are stored as Parquet; one DataFrameReader serves both loads.
reader = sqlContext.read
training = reader.parquet("data/training.parquet")
test = reader.parquet("data/test.parquet")


In [2]:
# Inspect the column names and types of the test set.
test.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- deptime: integer (nullable = true)
 |-- crsdeptime: integer (nullable = true)
 |-- arrtime: integer (nullable = true)
 |-- crsarrtime: integer (nullable = true)
 |-- actualelapsetime: integer (nullable = true)
 |-- crselapsetime: integer (nullable = true)
 |-- airtime: integer (nullable = true)
 |-- arrdelay: integer (nullable = true)
 |-- depdelay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- taxiin: integer (nullable = true)
 |-- taxiout: integer (nullable = true)
 |-- cancelled: integer (nullable = true)
 |-- diverted: integer (nullable = true)
 |-- carrierdelay: integer (nullable = true)
 |-- weatherdelay: integer (nullable = true)
 |-- nasdelay: integer (nullable = true)
 |-- securitydelay: integer (nullable = true)
 |-- lateaircraftdelay: integer (nullable = true)



In [3]:
# Peek at the first test row; head() with no argument returns a single Row,
# which is exactly what first() delegates to.
test.head()

Row(year=2006, month=2, dayofmonth=21, dayofweek=2, deptime=902, crsdeptime=905, arrtime=1027, crsarrtime=1030, actualelapsetime=205, crselapsetime=205, airtime=190, arrdelay=-3, depdelay=-3, distance=1162, taxiin=7, taxiout=8, cancelled=0, diverted=0, carrierdelay=0, weatherdelay=0, nasdelay=0, securitydelay=0, lateaircraftdelay=0)

In [5]:
# toPandas() loads every row into driver memory; limit to a small preview first.
training.limit(10).toPandas()

Unnamed: 0,year,month,dayofmonth,dayofweek,deptime,crsdeptime,arrtime,crsarrtime,actualelapsetime,crselapsetime,...,distance,taxiin,taxiout,cancelled,diverted,carrierdelay,weatherdelay,nasdelay,securitydelay,lateaircraftdelay
0,2006,8,15,2,1430,1430,1606,1615,96,105,...,569,3,9,0,0,0,0,0,0,0
1,2006,4,2,7,1720,1635,1918,1830,58,55,...,162,6,15,0,0,0,0,6,0,42
2,2006,1,17,2,1427,1430,1538,1550,71,80,...,342,4,12,0,0,0,0,0,0,0
3,2003,6,8,7,1048,1050,1446,1502,178,192,...,1235,11,22,0,0,0,0,0,0,0
4,2005,11,27,7,1350,1316,1606,1534,76,78,...,264,6,24,0,0,0,32,0,0,0
5,2007,9,19,3,1851,1845,2014,2010,83,85,...,397,5,11,0,0,0,0,0,0,0
6,2007,10,13,6,1557,1600,1829,1847,212,227,...,1449,5,20,0,0,0,0,0,0,0
7,2004,4,9,5,1505,1506,1645,1655,100,109,...,632,9,10,0,0,0,0,0,0,0
8,2004,7,28,3,1445,1455,1558,1617,73,82,...,317,8,8,0,0,0,0,0,0,0
9,2003,10,3,5,2109,2054,2243,2231,94,97,...,501,4,21,0,0,0,0,0,0,0


Generate the label column (`is_late`) for the training data: a flight counts as late when its `arrdelay` is positive.

In [6]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def _late_label(delay):
    """Return 1.0 when ``delay`` is positive, else 0.0.

    ``arrdelay`` is a nullable integer column. On Python 3, ``None > 0``
    raises TypeError inside the UDF and fails the Spark job. Null is therefore
    mapped to 0.0 explicitly, matching what ``None > 0`` evaluated to on
    Python 2.

    NOTE(review): null delays are labelled "not late"; consider filtering
    those rows out instead if that is not the intended semantics.
    """
    return 1.0 if delay is not None and delay > 0 else 0.0

# Binary label UDF, reused later to label the test predictions.
is_late = udf(_late_label, DoubleType())
training = training.withColumn("is_late", is_late(training.arrdelay))


Create and fit the Spark ML pipeline (feature assembler followed by logistic regression)

In [7]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Assemble feature vectors from every column except the label (is_late) and
# the column it is derived from (arrdelay), which would hand the model the answer.
# NOTE(review): columns such as arrtime/crsarrtime, actualelapsetime/crselapsetime,
# depdelay and the *delay cause columns look closely related to arrival delay as
# well; confirm they are known at prediction time, otherwise they may leak the label.
excluded_cols = {"is_late", "arrdelay"}
feature_assembler = VectorAssembler(
    inputCols=[col for col in training.columns if col not in excluded_cols],
    outputCol="features",
)

# Constructor keyword arguments are applied through setParams.
reg = LogisticRegression(
    maxIter=100,
    labelCol="is_late",
    predictionCol="prediction",
)

pipeline = Pipeline(stages=[feature_assembler, reg])
model = pipeline.fit(training)


PipelineModel_47d29576b05f66dbee86

Predict whether each flight in the test set will be late

In [9]:
# Run the fitted pipeline (assembler + logistic regression) over the test set.
predicted = model.transform(dataset=test)



In [11]:
# toPandas() loads every row into driver memory; limit to a small preview first.
predicted.limit(10).toPandas()

Unnamed: 0,year,month,dayofmonth,dayofweek,deptime,crsdeptime,arrtime,crsarrtime,actualelapsetime,crselapsetime,...,diverted,carrierdelay,weatherdelay,nasdelay,securitydelay,lateaircraftdelay,features,rawPrediction,probability,prediction
0,2006,2,21,2,902,905,1027,1030,205,205,...,0,0,0,0,0,0,"[2006.0, 2.0, 21.0, 2.0, 902.0, 905.0, 1027.0,...","[0.75595747136, -0.75595747136]","[0.680475414344, 0.319524585656]",0
1,2005,7,26,2,2147,2130,2338,2340,111,130,...,0,0,0,0,0,0,"[2005.0, 7.0, 26.0, 2.0, 2147.0, 2130.0, 2338....","[0.100384301163, -0.100384301163]","[0.52507502206, 0.47492497794]",0
2,2006,2,16,4,1038,1025,1346,1353,128,148,...,0,0,0,0,0,0,"[2006.0, 2.0, 16.0, 4.0, 1038.0, 1025.0, 1346....","[0.594745842258, -0.594745842258]","[0.644453319858, 0.355546680142]",0
3,2006,11,3,5,1131,1130,1545,1550,194,200,...,0,0,0,0,0,0,"[2006.0, 11.0, 3.0, 5.0, 1131.0, 1130.0, 1545....","[0.336503298729, -0.336503298729]","[0.583340883132, 0.416659116868]",0
4,2005,2,10,4,1438,1435,1846,1805,188,150,...,0,0,0,41,0,0,"[2005.0, 2.0, 10.0, 4.0, 1438.0, 1435.0, 1846....","[-1.79676046654, 1.79676046654]","[0.142245868892, 0.857754131108]",1
5,2005,9,22,4,1324,1323,1622,1626,178,183,...,0,0,0,0,0,0,"[2005.0, 9.0, 22.0, 4.0, 1324.0, 1323.0, 1622....","[0.434301696981, -0.434301696981]","[0.606900404754, 0.393099595246]",0
6,2006,8,1,2,624,624,1052,1048,208,204,...,0,0,0,0,0,0,"[2006.0, 8.0, 1.0, 2.0, 624.0, 624.0, 1052.0, ...","[0.575209498954, -0.575209498954]","[0.639964368802, 0.360035631198]",0
7,2006,9,19,2,1354,1325,1651,1610,117,105,...,0,29,0,12,0,0,"[2006.0, 9.0, 19.0, 2.0, 1354.0, 1325.0, 1651....","[-0.826203375684, 0.826203375684]","[0.304448444363, 0.695551555637]",1
8,2007,5,7,1,1345,1345,1633,1623,168,158,...,0,0,0,0,0,0,"[2007.0, 5.0, 7.0, 1.0, 1345.0, 1345.0, 1633.0...","[0.212984822798, -0.212984822798]","[0.553045832689, 0.446954167311]",0
9,2008,12,3,3,1736,1715,1844,1815,68,60,...,0,0,0,8,0,21,"[2008.0, 12.0, 3.0, 3.0, 1736.0, 1715.0, 1844....","[-0.225210376018, 0.225210376018]","[0.44393417613, 0.55606582387]",1


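Check model performance: label the test set the same way as the training data, then cross-tabulate the true labels against the predictions.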

In [12]:
# Attach the true label so it can be compared against the `prediction` column.
arrival_delay = predicted["arrdelay"]
predicted = predicted.withColumn("is_late", is_late(arrival_delay))


In [13]:
# Confusion table: rows are the true label, columns the predicted label.
confusion = predicted.stat.crosstab("is_late", "prediction")
confusion.show()

+------------------+----+----+
|is_late_prediction| 1.0| 0.0|
+------------------+----+----+
|               1.0|1448|1110|
|               0.0|  62|2805|
+------------------+----+----+

