In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [3]:
sc = SparkContext(master='local',appName='Chapter 6')
spark = SparkSession(sc)

In [4]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import corr
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [5]:
data = spark.read.csv('./flights.csv',inferSchema=True,header=True)

In [6]:
data.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: string (nullable = true)



In [7]:
data.toPandas().head()

Unnamed: 0,mon,dom,dow,carrier,flight,org,mile,depart,duration,delay
0,11,20,6,US,19,JFK,2153,9.48,351,
1,0,22,2,UA,1107,ORD,316,16.33,82,30.0
2,2,20,4,UA,226,SFO,337,6.17,82,-8.0
3,9,13,1,AA,419,ORD,1236,10.33,195,-5.0
4,4,2,5,AA,325,ORD,258,8.92,65,


In [8]:
from pyspark.sql.functions import round

In [9]:
#convert from mile to km
data = data.withColumn('km',round(data.mile*1.60934,0))

In [10]:
data = data.withColumn('label',(data.delay >= 15).cast('int'))

In [11]:
data.toPandas().head(3)

Unnamed: 0,mon,dom,dow,carrier,flight,org,mile,depart,duration,delay,km,label
0,11,20,6,US,19,JFK,2153,9.48,351,,3465.0,
1,0,22,2,UA,1107,ORD,316,16.33,82,30.0,509.0,1.0
2,2,20,4,UA,226,SFO,337,6.17,82,-8.0,542.0,0.0


In [12]:
# Chuan hoa du lieu

In [13]:
indexer = StringIndexer(inputCol='carrier',outputCol='carrier_idx')

In [14]:
indexer_model = indexer.fit(data)

In [15]:
data_indexed = indexer_model.transform(data)

In [16]:
data_indexed = StringIndexer(inputCol='org',outputCol='org_idx').fit(data_indexed).transform(data_indexed)

In [17]:
data_indexed.toPandas().head()

Unnamed: 0,mon,dom,dow,carrier,flight,org,mile,depart,duration,delay,km,label,carrier_idx,org_idx
0,11,20,6,US,19,JFK,2153,9.48,351,,3465.0,,6.0,2.0
1,0,22,2,UA,1107,ORD,316,16.33,82,30.0,509.0,1.0,0.0,0.0
2,2,20,4,UA,226,SFO,337,6.17,82,-8.0,542.0,0.0,0.0,1.0
3,9,13,1,AA,419,ORD,1236,10.33,195,-5.0,1989.0,0.0,1.0,0.0
4,4,2,5,AA,325,ORD,258,8.92,65,,415.0,,1.0,0.0


In [18]:
data_indexed.columns

['mon',
 'dom',
 'dow',
 'carrier',
 'flight',
 'org',
 'mile',
 'depart',
 'duration',
 'delay',
 'km',
 'label',
 'carrier_idx',
 'org_idx']

In [19]:
from pyspark.sql.functions import isnull,col

In [20]:
data_unknow = data_indexed.filter(isnull(col('label')))
data_unknow.count()

2978

In [44]:
data_unknow.toPandas().head(3)

Unnamed: 0,mon,dom,dow,carrier,flight,org,mile,depart,duration,delay,km,label,carrier_idx,org_idx
0,11,20,6,US,19,JFK,2153,9.48,351,,3465.0,,6.0,2.0
1,4,2,5,AA,325,ORD,258,8.92,65,,415.0,,1.0,0.0
2,0,8,2,UA,549,ORD,334,11.08,85,,538.0,,0.0,0.0


In [21]:
data_indexed = data_indexed.filter(~isnull(col('label')))

In [47]:
data_indexed.toPandas().head(3)

Unnamed: 0,mon,dom,dow,carrier,flight,org,mile,depart,duration,delay,km,label,carrier_idx,org_idx
0,0,22,2,UA,1107,ORD,316,16.33,82,30,509.0,1,0.0,0.0
1,2,20,4,UA,226,SFO,337,6.17,82,-8,542.0,0,0.0,1.0
2,9,13,1,AA,419,ORD,1236,10.33,195,-5,1989.0,0,1.0,0.0


In [48]:
data_indexed.count()

47022

In [22]:
assembler = VectorAssembler( inputCols=['mon',
 'dom',
 'dow',
 'depart',
 'duration',
 'km',
 'carrier_idx',
 'org_idx'], outputCol='features')

In [23]:
data_pre = assembler.transform(data_indexed)

In [96]:
data_unknow = assembler.transform(data_unknow)

In [24]:
data_pre[['features']].show(2,False)

+---------------------------------------+
|features                               |
+---------------------------------------+
|[0.0,22.0,2.0,16.33,82.0,509.0,0.0,0.0]|
|[2.0,20.0,4.0,6.17,82.0,542.0,0.0,1.0] |
+---------------------------------------+
only showing top 2 rows



In [49]:
final_data = data_pre.select('features','label')

In [52]:
final_data.show(5,truncate=False)

+-----------------------------------------+-----+
|features                                 |label|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,16.33,82.0,509.0,0.0,0.0]  |1    |
|[2.0,20.0,4.0,6.17,82.0,542.0,0.0,1.0]   |0    |
|[9.0,13.0,1.0,10.33,195.0,1989.0,1.0,0.0]|0    |
|[5.0,2.0,1.0,7.98,102.0,885.0,0.0,1.0]   |0    |
|[7.0,2.0,6.0,10.83,135.0,1180.0,1.0,0.0] |1    |
+-----------------------------------------+-----+
only showing top 5 rows



In [51]:
train_data, test_data = final_data.randomSplit([0.8,0.2])

In [53]:
from pyspark.ml.classification import LogisticRegression

In [54]:
lr = LogisticRegression(featuresCol='features',
                      labelCol='label',
                      predictionCol='prediction')

In [55]:
lrModel = lr.fit(train_data)

In [56]:
print('coefficients: {} intercept: {}'.format(lrModel.coefficients,lrModel.intercept))

coefficients: [-0.059535553300265845,-0.0007554668337151718,-0.0045353938231821505,0.07674386210550642,0.008977784732853131,-0.0005800588319815932,-0.04900711501279269,-0.126871833386621] intercept: -0.9241484816286799


In [57]:
test_results = lrModel.evaluate(test_data)

In [35]:
#check

In [39]:
#predict

In [61]:
test_model = lrModel.transform(test_data)

In [64]:
test_model.groupBy('label','prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1656|
|    0|       0.0| 2534|
|    1|       1.0| 3115|
|    0|       1.0| 2040|
+-----+----------+-----+



In [66]:
test_model.filter(col('label')==col('prediction')).count()/test_model.count()

0.604494382022472

In [72]:
TN = test_model.filter((col('prediction')==0) & (col('label')==col('prediction'))).count()
TP = test_model.filter((col('prediction')==1) & (col('label')==col('prediction'))).count()
FN = test_model.filter((col('prediction')==0) & (col('label')!=col('prediction'))).count()
FP = test_model.filter((col('prediction')==1) & (col('label')!=col('prediction'))).count()

In [73]:
precision = TP/(TP+FP)
precision

0.6042677012609118

In [74]:
recall = TP/(TP+FN)
recall

0.6529029553552714

In [63]:
test_model.select('prediction','label').show()

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    1|
|       1.0|    0|
|       1.0|    0|
|       1.0|    1|
|       1.0|    0|
|       1.0|    1|
|       1.0|    0|
|       1.0|    1|
|       1.0|    1|
|       1.0|    0|
|       1.0|    1|
|       1.0|    0|
|       1.0|    0|
|       1.0|    1|
|       1.0|    1|
|       0.0|    1|
+----------+-----+
only showing top 20 rows



In [75]:
# Danh gia ket qua

In [76]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [77]:
multi_evaluator = MulticlassClassificationEvaluator()

In [42]:
#Luu tru & model

In [90]:
lrModel.save('Logistic_flights')

In [91]:
from pyspark.ml.classification import LogisticRegressionModel

In [94]:
lrModel2 = LogisticRegressionModel.load('Logistic_flights/')

In [None]:
#Predict new values

In [97]:
unlabel_data = data_unknow.select('features')

In [98]:
predictions = lrModel2.transform(unlabel_data)

In [99]:
predictions.show(5)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[11.0,20.0,6.0,9....|[0.30031722321956...|[0.57452006283245...|       0.0|
|[4.0,2.0,5.0,8.92...|[0.20809887028166...|[0.55183778224905...|       0.0|
|[0.0,8.0,2.0,11.0...|[-0.3620990388706...|[0.41045154295701...|       1.0|
|[5.0,8.0,0.0,14.4...|[-0.3733638958943...|[0.40772843648905...|       1.0|
|[1.0,13.0,3.0,20....|[-0.3897458765290...|[0.40377847725188...|       1.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [102]:
predictions.groupBy(col('prediction')).count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 1010|
|       1.0| 1968|
+----------+-----+

