In [4]:
from pyspark.sql import SparkSession


In [5]:
# Reuse the active SparkSession if one exists; otherwise create one named "secondApp".
spark = (
    SparkSession.builder
    .appName("secondApp")
    .getOrCreate()
)

In [None]:
spark.catalog.listTables() # tables/views registered in this session's catalog (empty until a view is created)

[]

Download data: flights.csv from [HERE](https://www.kaggle.com/code/miquar/explore-flights-csv-airports-csv-airlines-csv/data)

In [60]:
# Load the flights CSV into a Spark DataFrame: the first row is used as the
# header and column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/flights.csv")
)
df.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [61]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

To save local spark DataFrame to cluster (catalog contains table) => save it as temporary table: 
- df.createTempView(table_name): create 1 temporary table, only exists in Session 
- df.createOrReplaceTempView(table_name)

- spark.createDataFrame(pandas_df): create a Spark DataFrame from a pandas DataFrame (this is a method of the SparkSession, not of a DataFrame)

In [62]:
# Register df as the session-scoped temporary view 'flights_temp' so it can be
# queried through spark.sql(); an existing view with the same name is replaced.
df.createOrReplaceTempView('flights_temp')
# The catalog listing should now contain the temporary view.
spark.catalog.listTables()


[Table(name='flights_temp', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

### 2.4. Các lệnh biến đổi dữ liệu của spark DataFrame
- .withColumn("newColumnName", formula): Thêm cột mới có giá trị được tính theo formula 
- .withColumnRenamed("oldColumnName", "newColumnName"): thay đổi tên cột 
- .select("column1", "column2", …, "columnN", formula.alias("new_col_name")): chọn các cột và tạo thêm 1 cột mới new_col_name với giá trị được tính theo formula dựa trên các cột của DataFrame 
- df.selectExpr(): tương tự như .select nhưng mà cách viết khác 
- .filter(condition)

In [63]:
# Add HOUR_ARR = AIR_TIME / 60 (time in the air expressed in hours, assuming
# AIR_TIME is recorded in minutes -- TODO confirm against the dataset docs).
df = df.withColumn("HOUR_ARR", df["AIR_TIME"] / 60)
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [None]:
# Keep three identifying columns and derive avg_speed = DISTANCE / HOUR_ARR
# using Column expressions.
df1 = df.select(
    "ORIGIN_AIRPORT",
    "DESTINATION_AIRPORT",
    "TAIL_NUMBER",
    (df["DISTANCE"] / df["HOUR_ARR"]).alias("avg_speed"),
)
df1.show(5)

+--------------+-------------------+-----------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|        avg_speed|
+--------------+-------------------+-----------+-----------------+
|           ANC|                SEA|     N407AS|514.0828402366864|
|           LAX|                PBI|     N3KUAA|531.5589353612166|
|           SFO|                CLT|     N171US|517.8947368421052|
|           LAX|                MIA|     N3HYAA|544.6511627906978|
|           SEA|                ANC|     N527AS|436.5829145728643|
+--------------+-------------------+-----------+-----------------+
only showing top 5 rows


In [64]:
# Same projection as the select() example, but every column is written as a
# SQL expression string.
df2 = df.selectExpr(
    "ORIGIN_AIRPORT",
    "DESTINATION_AIRPORT",
    "TAIL_NUMBER",
    "DISTANCE/HOUR_ARR as avg_speed",
)
df2.show(5)

+--------------+-------------------+-----------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|        avg_speed|
+--------------+-------------------+-----------+-----------------+
|           ANC|                SEA|     N407AS|514.0828402366864|
|           LAX|                PBI|     N3KUAA|531.5589353612166|
|           SFO|                CLT|     N171US|517.8947368421052|
|           LAX|                MIA|     N3HYAA|544.6511627906978|
|           SEA|                ANC|     N527AS|436.5829145728643|
+--------------+-------------------+-----------+-----------------+
only showing top 5 rows


In [65]:
# Flights arriving at SEA flown by aircraft N407AS; the two predicates are
# applied one after the other (where() is an alias of filter()).
df3 = (
    df
    .where("DESTINATION_AIRPORT == 'SEA'")
    .where("TAIL_NUMBER == 'N407AS'")
)
df3.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+------------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|          HOUR_ARR|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+

In [69]:
# Aggregate per origin airport: sum of the ARRIVAL_TIME column.
df5 = (
    df
    .groupBy("ORIGIN_AIRPORT")
    .sum("ARRIVAL_TIME")
)
df5.show(5)

+--------------+-----------------+
|ORIGIN_AIRPORT|sum(ARRIVAL_TIME)|
+--------------+-----------------+
|           BGM|           337361|
|           PSE|           524900|
|           INL|           545165|
|           MSY|         57817756|
|           PPG|            64058|
+--------------+-----------------+
only showing top 5 rows


In [None]:
# DataFrame.join() needs another DataFrame as its first argument and a key
# column that exists on both sides. Here the right-hand side is a small
# per-origin flight count, joined back onto every flight with a left outer join.
flights_per_origin = (
    df.groupBy("ORIGIN_AIRPORT")
    .count()
    .withColumnRenamed("count", "FLIGHTS_FROM_ORIGIN")
)
df.join(flights_per_origin, on="ORIGIN_AIRPORT", how="leftouter")

### 2.6. các lệnh biến đổi dữ liệu của spark.sql()
- CRUD 
- left join, right join, inner join, outer join.


In [70]:
# Query the temporary view with plain SQL; spark.sql() returns a DataFrame.
f6 = spark.sql("select * from flights_temp")
f6.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [71]:
# SQL version of the groupBy("ORIGIN_AIRPORT").sum("ARRIVAL_TIME") example above.
# NOTE(review): the alias DELAY_TIME is applied to sum(ARRIVAL_TIME); if a delay
# total was intended, the column should probably be ARRIVAL_DELAY -- confirm.
df7 = spark.sql("select ORIGIN_AIRPORT, sum(ARRIVAL_TIME) as DELAY_TIME from flights_temp group by ORIGIN_AIRPORT")
df7.show(5)

+--------------+----------+
|ORIGIN_AIRPORT|DELAY_TIME|
+--------------+----------+
|           BGM|    337361|
|           PSE|    524900|
|           INL|    545165|
|           MSY|  57817756|
|           PPG|     64058|
+--------------+----------+
only showing top 5 rows


### 3. Xây dựng pipeline End-to-End model trên pyspark
- Classification: 
    - LogisticRegression, 
    - DecisionTreeClassifier, 
    - RandomForestClassifier, 
    - GBTClassifier (gradient boosting tree), 
    - MultilayerPerceptronClassifier, 
    - LinearSVC (Linear Support Vector Machine), 
    - NaiveBayes.

- Regression: 
    - GeneralizedLinearRegression, 
    - DecisionTreeRegressor, 
    - RandomForestRegressor, 
    - GBTRegressor (gradient boosting Tree), 
    - AFTSurvivalRegression (Hồi qui đối với các lớp bài toán estimate survival).

### 3.1. Xây dựng pipeline biến đổi dữ liệu.

Filter data to get sample of data 

In [9]:
# Report (rows, columns) of the full dataset before sampling.
print(
    'Shape of previous data: ({}, {})'.format(df.count(), len(df.columns))
)

# Sample: flights departing SEA operated by DL or AA, restricted to the columns
# used for modelling.
flights_SEA = spark.sql(
    "select ARRIVAL_DELAY, ARRIVAL_TIME, MONTH, YEAR, DAY_OF_WEEK, DESTINATION_AIRPORT, AIRLINE from flights_temp where ORIGIN_AIRPORT = 'SEA' and AIRLINE in ('DL', 'AA') "
)

# Report (rows, columns) of the sample.
print(
    'Shape of flights_SEA data: ({}, {})'.format(flights_SEA.count(), len(flights_SEA.columns))
)

Shape of previous data: (5819079, 31)
Shape of flights_SEA data: (19956, 7)


In [10]:
# Build the boolean target IS_DELAY: a flight counts as delayed when
# ARRIVAL_DELAY > 0. The column is taken from flights_SEA itself; referencing it
# through `df` (a different DataFrame) only works while Spark can trace both
# frames back to the same source attribute, and breaks otherwise.
flights_SEA = flights_SEA.withColumn("IS_DELAY", flights_SEA.ARRIVAL_DELAY > 0)

# Cast the boolean to an integer 0/1 column. It is named "label" because that is
# the default label column (labelCol) of Spark ML estimators.
flights_SEA = flights_SEA.withColumn("label", flights_SEA.IS_DELAY.cast("integer"))

# Remove rows with a missing value in any of the columns used by the model.
model_data = flights_SEA.dropna(
    how="any",
    subset=[
        "ARRIVAL_DELAY",
        "ARRIVAL_TIME",
        "MONTH",
        "YEAR",
        "DAY_OF_WEEK",
        "DESTINATION_AIRPORT",
        "AIRLINE",
    ],
)

print('Shape of model_data data: ({}, {})'.format(model_data.count(), len(model_data.columns)))

Shape of model_data data: (19823, 9)


In [11]:
# Check that the IS_DELAY (boolean) and label (integer) columns were added.
flights_SEA.printSchema()

root
 |-- ARRIVAL_DELAY: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- IS_DELAY: boolean (nullable = true)
 |-- label: integer (nullable = true)



Change string variables with StringIndexer, OneHotEncoder 

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Categorical encoding, two stages per string column:
#   StringIndexer maps each distinct string to a numeric index,
#   OneHotEncoder turns that index into a sparse one-hot vector.

# Destination airport
dest_indexer = StringIndexer(
    inputCol="DESTINATION_AIRPORT",
    outputCol="DESTINATION_INDEX",
)
dest_onehot = OneHotEncoder(
    inputCol="DESTINATION_INDEX",
    outputCol="DESTINATION_FACT",
)

# Airline
airline_indexer = StringIndexer(
    inputCol="AIRLINE",
    outputCol="AIRLINE_INDEX",
)
airline_onehot = OneHotEncoder(
    inputCol="AIRLINE_INDEX",
    outputCol="AIRLINE_FACT",
)



Kết hợp toàn bộ các columns chứa các features thành một column duy nhất. Spark model only accept this type of variable 

In [26]:
from pyspark.ml.feature import VectorAssembler

# Concatenate the numeric columns and the one-hot vectors into the single
# "features" vector column.
vec_assembler = VectorAssembler(
    inputCols=[
        "ARRIVAL_TIME",
        "MONTH",
        "YEAR",
        "DAY_OF_WEEK",
        "DESTINATION_FACT",
        "AIRLINE_FACT",
    ],
    outputCol="features",
)

In [27]:
from pyspark.ml import Pipeline

# Stages run in order: index -> one-hot for each string column, then assemble
# everything into the feature vector.
flights_sea_pipe = Pipeline(
    stages=[
        dest_indexer,
        dest_onehot,
        airline_indexer,
        airline_onehot,
        vec_assembler,
    ]
)

### 3.2. Huấn luyện và đánh giá model.
3.2.1. Phân chia tập train/test.

In [28]:
# Fit the feature pipeline on model_data and apply it, producing the indexed,
# one-hot encoded and assembled "features" column.
# Note: the indexers are fitted on the full data set, before the split below.
pipe_data = flights_sea_pipe.fit(model_data).transform(model_data)

# 80/20 train/test split. The seed is fixed so the split -- and every metric
# computed from it -- is the same on each run.
train, test = pipe_data.randomSplit([0.8, 0.2], seed=42)

In [38]:
# Preview the first 5 training rows, including the encoded and assembled columns.
train.show(5)

+-------------+------------+-----+----+-----------+-------------------+-------+--------+-----+-----------------+----------------+-------------+-------------+--------------------+
|ARRIVAL_DELAY|ARRIVAL_TIME|MONTH|YEAR|DAY_OF_WEEK|DESTINATION_AIRPORT|AIRLINE|IS_DELAY|label|DESTINATION_INDEX|DESTINATION_FACT|AIRLINE_INDEX| AIRLINE_FACT|            features|
+-------------+------------+-----+----+-----------+-------------------+-------+--------+-----+-----------------+----------------+-------------+-------------+--------------------+
|          -42|         616|    1|2015|          7|                JFK|     DL|   false|    0|              5.0|  (24,[5],[1.0])|          0.0|(1,[0],[1.0])|(29,[0,1,2,3,9,28...|
|          -39|        1336|    2|2015|          6|                DTW|     DL|   false|    0|              6.0|  (24,[6],[1.0])|          0.0|(1,[0],[1.0])|(29,[0,1,2,3,10,2...|
|          -39|        1611|    1|2015|          3|                ATL|     DL|   false|    0|           

3.2.2. Huấn luyện và đánh giá model.


In [31]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator; the column names are the library defaults,
# written out so it is clear which columns the model reads.
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [33]:
# Train the logistic regression estimator on the training split.
model = lr.fit(train)

In [36]:
# Score the held-out test split with the fitted model; the result is what the
# evaluator consumes.
test_results = model.transform(test)

### Evaluator: 
- RegressionEvaluator: for regression 
    - rmse
    - mse
    - mae
    - r2
- BinaryClassificationEvaluator: for binary classification 
    - areaUnderROC
    - areaUnderPR
- MulticlassClassificationEvaluator: for multiple classification 
    - accuracy
    - weightedPrecision
    - weightedRecall
    - f1
- ClusteringEvaluator: for clusters 
    - silhouette
    - silhouette_euclidean
    - silhouette_cosine
- RankingEvaluator: (only ALS / Recommender): from mllib.RankingMetrics 
    - Precision@k
    - MAP
    - NDCG@k

In [34]:
import pyspark.ml.evaluation as evals 

# Binary-classification evaluator reporting the area under the ROC curve; the
# column names are the library defaults, written out for readability.
evaluator = evals.BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)

In [37]:
# Compute the configured metric (areaUnderROC) on the scored test split.
evaluator.evaluate(test_results)

0.5995959356811679

Như vậy trên tập test model đạt areaUnderROC (AUC) khoảng 0.60. Lưu ý đây là diện tích dưới đường cong ROC chứ không phải độ chính xác (accuracy). Phương án tốt được cân nhắc là thêm biến để cải thiện model.

3.2.3. Tuning model thông qua param gridSearch.

In [None]:
import pyspark.ml.tuning as tune 
import numpy as np 

# Hyperparameter grid for cross-validation: regParam takes the values of
# np.arange(0, 0.1, 0.01). Further addGrid() calls can be chained to tune more
# than one hyperparameter.
grid = (
    tune.ParamGridBuilder()
    .addGrid(lr.regParam, np.arange(0, 0.1, 0.01))
    .build()
)

In [None]:
# Cross-validate `lr` over every parameter map in `grid`, scoring each candidate
# with `evaluator`. numFolds is the library default written out; the seed is
# fixed so the fold assignment (and therefore the selected model) is
# reproducible between runs.
cv = tune.CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42,
)
models = cv.fit(train)

In [None]:
# Keep the best-scoring model found by the cross-validation run.
bestlr = models.bestModel