In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from modules.my_spark_regression import *
from modules.my_pyspark import *
from modules.my_drawer import MyDrawer

In [3]:
spark = MyPySpark(session=True, sql=True)
drawer = MyDrawer()

# 3. Xây dựng model

## 3.1. Chuẩn bị & chuẩn hóa dữ liệu, xác định input, output

* Đọc dữ liệu

In [4]:
file_path = r'./data/flights.csv'

In [5]:
data = spark.readFile(file_path)

In [6]:
data.printSchema()

root
 |-- mon: integer (nullable = true)
 |-- dom: integer (nullable = true)
 |-- dow: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- org: string (nullable = true)
 |-- mile: integer (nullable = true)
 |-- depart: double (nullable = true)
 |-- duration: integer (nullable = true)
 |-- delay: string (nullable = true)



In [7]:
data.head()

Row(mon=11, dom=20, dow=6, carrier='US', flight=19, org='JFK', mile=2153, depart=9.48, duration=351, delay='NA')

* Xác định input và output

In [8]:
data.columns

['mon',
 'dom',
 'dow',
 'carrier',
 'flight',
 'org',
 'mile',
 'depart',
 'duration',
 'delay']

* Chuẩn hoá dữ liệu từ mile sang km

In [9]:
from pyspark.sql.functions import round

In [10]:
data = data.withColumn('km', round(data.mile * 1.60934, 0))

In [11]:
data = data.withColumn('label', (data['delay'] >= 15).cast('integer'))

In [12]:
data.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|    km|label|
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351|   NA|3465.0| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|1989.0|    0|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65|   NA| 415.0| null|
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+
only showing top 5 rows



In [13]:
from pyspark.ml.feature import StringIndexer

In [14]:
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')
indexer_model = indexer.fit(data)
data_indexed = indexer_model.transform(data)
data_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(data_indexed).transform(data_indexed)

In [15]:
data_indexed.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|    km|label|carrier_idx|org_idx|
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351|   NA|3465.0| null|        6.0|    2.0|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30| 509.0|    1|        0.0|    0.0|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8| 542.0|    0|        0.0|    1.0|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|1989.0|    0|        1.0|    0.0|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65|   NA| 415.0| null|        1.0|    0.0|
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+
only showing top 5 rows



* Chuyển đổi dữ liệu

In [16]:
input_features = [
    'mon', 'dom', 'dow', 'carrier_idx', 'org_idx', 'km', 'depart', 'duration'
]

In [17]:
assembler = VectorAssembler(inputCols=input_features, outputCol='features')

In [18]:
data_pre = assembler.transform(data_indexed)

In [19]:
data_pre.show(3, False)

+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+-----------------------------------------+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|km    |label|carrier_idx|org_idx|features                                 |
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+-----------------------------------------+
|11 |20 |6  |US     |19    |JFK|2153|9.48  |351     |NA   |3465.0|null |6.0        |2.0    |[11.0,20.0,6.0,6.0,2.0,3465.0,9.48,351.0]|
|0  |22 |2  |UA     |1107  |ORD|316 |16.33 |82      |30   |509.0 |1    |0.0        |0.0    |[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |
|2  |20 |4  |UA     |226   |SFO|337 |6.17  |82      |-8   |542.0 |0    |0.0        |1.0    |[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |
+---+---+---+-------+------+---+----+------+--------+-----+------+-----+-----------+-------+-----------------------------------------+
only showing top 3 rows



In [20]:
data_pre.select('features').show(3, False)

+-----------------------------------------+
|features                                 |
+-----------------------------------------+
|[11.0,20.0,6.0,6.0,2.0,3465.0,9.48,351.0]|
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |
+-----------------------------------------+
only showing top 3 rows



* Chuyển đổi dữ liệu

In [21]:
final_data = data_pre.select('features', 'label')

In [22]:
final_data.count()

50000

In [23]:
final_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[11.0,20.0,6.0,6....| null|
|[0.0,22.0,2.0,0.0...|    1|
|[2.0,20.0,4.0,0.0...|    0|
|[9.0,13.0,1.0,1.0...|    0|
|[4.0,2.0,5.0,1.0,...| null|
|[5.0,2.0,1.0,0.0,...|    0|
|[7.0,2.0,6.0,1.0,...|    1|
|[1.0,16.0,6.0,0.0...|    0|
|[1.0,22.0,5.0,0.0...|    0|
|[11.0,8.0,1.0,2.0...|    1|
|[4.0,26.0,1.0,1.0...|    0|
|[4.0,25.0,0.0,1.0...|    1|
|[8.0,30.0,2.0,0.0...|    1|
|[3.0,16.0,3.0,0.0...|    0|
|[0.0,3.0,4.0,1.0,...|    1|
|[5.0,9.0,1.0,0.0,...|    1|
|[3.0,10.0,4.0,4.0...|    1|
|[11.0,15.0,1.0,1....|    1|
|[8.0,18.0,4.0,0.0...|    1|
|[2.0,14.0,5.0,4.0...|    0|
+--------------------+-----+
only showing top 20 rows



## 3.2. Chuẩn bị train/test data

In [24]:
train_data, test_data = final_data.randomSplit((0.8, 0.2))

In [29]:
train_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[6.0...|    1|
|(8,[1,5,6,7],[13....|    1|
|(8,[1,5,6,7],[13....|    0|
|(8,[1,5,6,7],[13....|    0|
|(8,[1,5,6,7],[13....|    0|
|(8,[1,5,6,7],[13....|    0|
+--------------------+-----+
only showing top 20 rows



In [25]:
# train_data.describe().show()

In [26]:
# test_data.describe().show()

> * Dữ liệu train và test gần như tương đương, ko có sự chênh lệch cao về mặt thống kê

## 3.3. Xây dựng model

* Tạo model Linear Regression

In [27]:
logistic = LogisticRegression(featuresCol='features', labelCol='label', predictionCol='prediction')

* Fit model với data và gán model cho một biến nào đó

In [28]:
logisticModel = logistic.fit(train_data)

Py4JJavaError: An error occurred while calling o152.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 14) (192.168.43.86 executor driver): scala.MatchError: [null,1.0,(8,[1,5,6,7],[13.0,1157.0,21.25,124.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2297)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:232)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:510)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:285)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: scala.MatchError: [null,1.0,(8,[1,5,6,7],[13.0,1157.0,21.25,124.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$1(Predictor.scala:81)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more


* In ra coefficients và intercept

In [20]:
lrModel.coefficients, lrModel.intercept

(DenseVector([0.1216]), 44.440062415664244)

## 3.4. Đánh giá model vs test data

In [21]:
test_results = lrModel.evaluate(test_data)

* Đánh giá phần dư

In [22]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -9.586394736421823|
| -6.586394736421823|
| -6.586394736421823|
| -6.586394736421823|
| -6.586394736421823|
| -5.586394736421823|
| -5.586394736421823|
| -5.586394736421823|
| -4.586394736421823|
| -4.586394736421823|
| -4.586394736421823|
| -4.586394736421823|
| -4.586394736421823|
|-3.5863947364218234|
|-3.5863947364218234|
|-3.5863947364218234|
|-2.5863947364218234|
|-1.5863947364218234|
|-1.5863947364218234|
|-1.5863947364218234|
+-------------------+
only showing top 20 rows



* Đánh giá RMSE

In [23]:
test_results.rootMeanSquaredError

17.003770681934345

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

In [34]:
RegressionEvaluator(labelCol='duration').evaluate(test_model)

17.003770681934345

* Đánh giá mean squared error

In [24]:
test_results.meanSquaredError

289.12821740380997

* Đánh giá $R^2$

In [25]:
test_results.r2

0.9616552296008894

## 3.5. Đánh giá model vs test data

In [26]:
test_model = lrModel.transform(test_data)
test_model.select('prediction', 'duration').show()

+-----------------+--------+
|       prediction|duration|
+-----------------+--------+
|52.58639473642182|      43|
|52.58639473642182|      46|
|52.58639473642182|      46|
|52.58639473642182|      46|
|52.58639473642182|      46|
|52.58639473642182|      47|
|52.58639473642182|      47|
|52.58639473642182|      47|
|52.58639473642182|      48|
|52.58639473642182|      48|
|52.58639473642182|      48|
|52.58639473642182|      48|
|52.58639473642182|      48|
|52.58639473642182|      49|
|52.58639473642182|      49|
|52.58639473642182|      49|
|52.58639473642182|      50|
|52.58639473642182|      51|
|52.58639473642182|      51|
|52.58639473642182|      51|
+-----------------+--------+
only showing top 20 rows



## 3.6. Lưu trữ & tải model

* Lưu model

In [27]:
file_path1 = r'./data/lrModel_flight'

In [28]:
lrModel.save(file_path1)

* Tải model

In [29]:
from pyspark.ml.regression import LinearRegressionModel

In [30]:
lrModel2 = LinearRegressionModel.load(file_path1)

## 3.7. Dự đoán dữ liệu mới

In [31]:
unlabeled_data = test_data.select('features')
preditions = lrModel2.transform(unlabeled_data)

In [32]:
preditions.show()

+--------+-----------------+
|features|       prediction|
+--------+-----------------+
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
|  [67.0]|52.58639473642182|
+--------+-----------------+
only showing top 20 rows



In [1]:
150/165

0.9090909090909091

In [2]:
100/110

0.9090909090909091

In [3]:
100/105

0.9523809523809523