In [1]:
from pathlib import Path
from tqdm.notebook import tqdm

import numpy as np
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import evaluation as evals
from pyspark.ml import tuning as tune
from pyspark.sql import SparkSession

# Cơ chế hoạt động

# khởi tạo connection

In [2]:
sc = SparkContext.getOrCreate()
print(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/24 14:00:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/24 14:00:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
<SparkContext master=local[*] appName=pyspark-shell>


In [3]:
sc.setLogLevel("ERROR")

## khởi tạo session

In [4]:
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x7f10f81e72b0>


In [5]:
my_spark.catalog.listTables()

[]

In [6]:
flights = my_spark.read.csv('/home/quyennt72/pyspark-tutorial/data/flights.csv', header=True)

flights.limit(5).toPandas().head()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,...,408,-22,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,...,741,-9,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,...,811,5,0,0,,,,,,
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,...,756,-9,0,0,,,,,,
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,259,-21,0,0,,,,,,


In [7]:
flights.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

# Add spark dataframe from local into catalog

In [8]:
print(my_spark.catalog.listTables())

[]


In [9]:
flights.createOrReplaceTempView('flights_temp')
my_spark.catalog.listTables()

[Table(name='flights_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

# Transform function of spark dataframe

PySpark withColumn() function of DataFrame can also be used to change the value of an existing column. Or in the example below, we can add a column name 'HOUR_ARR' equal to column 'AIR_TIME' devide to 60.

In [10]:
flights = flights.withColumn('HOUR_ARR', flights.AIR_TIME/60)
flights.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

Easy to understand, right? :)

In [11]:
avg_speed = flights.select('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'TAIL_NUMBER', (flights.DISTANCE/flights.HOUR_ARR).alias("avg_speed"))
avg_speed.printSchema()
avg_speed.show(5)

root
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- avg_speed: double (nullable = true)

+--------------+-------------------+-----------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|        avg_speed|
+--------------+-------------------+-----------+-----------------+
|           ANC|                SEA|     N407AS|514.0828402366864|
|           LAX|                PBI|     N3KUAA|531.5589353612166|
|           SFO|                CLT|     N171US|517.8947368421052|
|           LAX|                MIA|     N3HYAA|544.6511627906978|
|           SEA|                ANC|     N527AS|436.5829145728643|
+--------------+-------------------+-----------+-----------------+
only showing top 5 rows



In [12]:
avg_speed_exp = flights.selectExpr('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'TAIL_NUMBER', "(DISTANCE/HOUR_ARR) as avg_speed")
avg_speed_exp.printSchema()
avg_speed_exp.show(5)

root
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- avg_speed: double (nullable = true)

+--------------+-------------------+-----------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|        avg_speed|
+--------------+-------------------+-----------+-----------------+
|           ANC|                SEA|     N407AS|514.0828402366864|
|           LAX|                PBI|     N3KUAA|531.5589353612166|
|           SFO|                CLT|     N171US|517.8947368421052|
|           LAX|                MIA|     N3HYAA|544.6511627906978|
|           SEA|                ANC|     N527AS|436.5829145728643|
+--------------+-------------------+-----------+-----------------+
only showing top 5 rows



In [13]:
filter_SEA_ANC = flights.filter("ORIGIN_AIRPORT == 'SEA'").filter("DESTINATION_AIRPORT == 'ANC'")
filter_SEA_ANC.limit(5).toPandas().head()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,HOUR_ARR
0,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,-21,0,0,,,,,,,3.316667
1,2015,1,1,4,AS,81,N577AS,SEA,ANC,600,...,-13,0,0,,,,,,,3.25
2,2015,1,1,4,AS,83,N532AS,SEA,ANC,800,...,-22,0,0,,,,,,,3.233333
3,2015,1,1,4,AS,111,N570AS,SEA,ANC,905,...,-10,0,0,,,,,,,3.216667
4,2015,1,1,4,AS,85,N764AS,SEA,ANC,1020,...,-3,0,0,,,,,,,3.3


In [14]:
avg_time_org_airport = flights.groupBy("ORIGIN_AIRPORT").avg("HOUR_ARR")
avg_time_org_airport.show(5)

[Stage 6:====>                                                    (1 + 11) / 12]

+--------------+------------------+
|ORIGIN_AIRPORT|     avg(HOUR_ARR)|
+--------------+------------------+
|           BGM|1.0965250965250966|
|           PSE| 3.035252935862692|
|           INL|0.5327937649880096|
|           MSY|1.7156564469514952|
|           PPG|  5.15314465408805|
+--------------+------------------+
only showing top 5 rows



                                                                                

# Transform using spark.sql

In [16]:
flights_10 = my_spark.sql("select * from flights_temp where AIR_TIME > 10")
flights_10.limit(5).toPandas().head()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,...,408,-22,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,...,741,-9,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,...,811,5,0,0,,,,,,
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,...,756,-9,0,0,,,,,,
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,259,-21,0,0,,,,,,


In [17]:
print(my_spark.catalog.listTables())

[Table(name='flights_temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [18]:
agg_arr_time = my_spark.sql("select ORIGIN_AIRPORT, DESTINATION_AIRPORT, TAIL_NUMBER, mean(AIR_TIME) as avg_speed from flights_temp group by ORIGIN_AIRPORT, DESTINATION_AIRPORT, TAIL_NUMBER")
agg_arr_time.show(5)



+--------------+-------------------+-----------+------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|TAIL_NUMBER|         avg_speed|
+--------------+-------------------+-----------+------------------+
|           IAG|                FLL|     N630NK|             158.0|
|           RIC|                ATL|     N947DN|  75.6470588235294|
|           EWR|                ATL|     N970AT|108.02777777777777|
|           MSN|                ORD|     N703SK|              28.0|
|           AVL|                ATL|     N994AT|              30.5|
+--------------+-------------------+-----------+------------------+
only showing top 5 rows



                                                                                

# End-to-End model on pyspark

## pipe line for transforming data

In [19]:
flights.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [20]:
print("Shape of previous data: ({}, {})".format(flights.count(), len(flights.columns)))

Shape of previous data: (5819079, 32)


In [21]:
flights_SEA = my_spark.sql("select ARRIVAL_DELAY, ARRIVAL_TIME, MONTH, YEAR, DAY_OF_WEEK, DESTINATION_AIRPORT, AIRLINE \
                            from flights_temp \
                            where ORIGIN_AIRPORT = 'SEA' and AIRLINE in ('DL', 'AA')")
print("Shape of previous data: ({}, {})".format(flights_SEA.count(), len(flights_SEA.columns)))

[Stage 18:>                                                       (0 + 12) / 12]

Shape of previous data: (19956, 7)


                                                                                

In [22]:
flights_SEA = flights_SEA.withColumn("IS_DELAY", flights_SEA.ARRIVAL_DELAY > 0)
flights_SEA = flights_SEA.withColumn("label", flights_SEA.IS_DELAY.cast("integer"))

In [23]:
model_data = flights_SEA.filter("ARRIVAL_DELAY is not null \
                                 and ARRIVAL_TIME is not null \
                                 and MONTH is not null \
                                 and YEAR is not null \
                                 and DAY_OF_WEEK is not null \
                                 and DESTINATION_AIRPORT is not null \
                                 and AIRLINE is not null")
print('Shape of model_data data: ({}, {})'.format(model_data.count(), len(model_data.columns)))

[Stage 21:>                                                       (0 + 12) / 12]

Shape of model_data data: (19823, 9)


                                                                                

In [24]:
flights_SEA.show(5)

+-------------+------------+-----+----+-----------+-------------------+-------+--------+-----+
|ARRIVAL_DELAY|ARRIVAL_TIME|MONTH|YEAR|DAY_OF_WEEK|DESTINATION_AIRPORT|AIRLINE|IS_DELAY|label|
+-------------+------------+-----+----+-----------+-------------------+-------+--------+-----+
|            8|        0557|    1|2015|          4|                MSP|     DL|    true|    1|
|            1|        0939|    1|2015|          4|                MIA|     AA|    true|    1|
|           16|        1206|    1|2015|          4|                DFW|     AA|    true|    1|
|           13|        1418|    1|2015|          4|                ATL|     DL|    true|    1|
|           24|        1314|    1|2015|          4|                DFW|     AA|    true|    1|
+-------------+------------+-----+----+-----------+-------------------+-------+--------+-----+
only showing top 5 rows



In [25]:
model_data = model_data.withColumn("ARRIVAL_TIME", model_data.ARRIVAL_TIME.cast("integer"))
model_data = model_data.withColumn("MONTH", model_data.ARRIVAL_TIME.cast("integer"))
model_data = model_data.withColumn("YEAR", model_data.ARRIVAL_TIME.cast("integer"))
model_data = model_data.withColumn("DAY_OF_WEEK", model_data.ARRIVAL_TIME.cast("integer"))
model_data.printSchema()

root
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- IS_DELAY: boolean (nullable = true)
 |-- label: integer (nullable = true)



In [26]:
dest_indexer = StringIndexer(inputCol="DESTINATION_AIRPORT",
                             outputCol="DESTINATION_INDEX")
dest_onehot = OneHotEncoder(inputCol="DESTINATION_INDEX",
                            outputCol="DESTINATION_FACT")

airline_indexer = StringIndexer(inputCol="AIRLINE",
                                outputCol="AIRLINE_INDEX")
airline_onehot = OneHotEncoder(inputCol="AIRLINE_INDEX",
                               outputCol="AIRLINE_FACT")

In [27]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["ARRIVAL_TIME", "MONTH", "YEAR", "DAY_OF_WEEK", \
                                           "DESTINATION_FACT", "AIRLINE_FACT"],
                                outputCol="features")

In [29]:
flights_sea_pipe = Pipeline(stages=[dest_indexer, dest_onehot, \
                                   airline_indexer, airline_onehot, \
                                   vec_assembler])

In [30]:
# Create pipe_data from pipeline
pipe_data = flights_sea_pipe.fit(model_data).transform(model_data)

                                                                                

In [35]:
pipe_data.limit(3).toPandas().head()

Unnamed: 0,ARRIVAL_DELAY,ARRIVAL_TIME,MONTH,YEAR,DAY_OF_WEEK,DESTINATION_AIRPORT,AIRLINE,IS_DELAY,label,DESTINATION_INDEX,DESTINATION_FACT,AIRLINE_INDEX,AIRLINE_FACT,features
0,8,557,557,557,557,MSP,DL,True,1,3.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,(1.0),"(557.0, 557.0, 557.0, 557.0, 0.0, 0.0, 0.0, 1...."
1,1,939,939,939,939,MIA,AA,True,1,13.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,(0.0),"(939.0, 939.0, 939.0, 939.0, 0.0, 0.0, 0.0, 0...."
2,16,1206,1206,1206,1206,DFW,AA,True,1,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0,(0.0),"(1206.0, 1206.0, 1206.0, 1206.0, 1.0, 0.0, 0.0..."


# Train and evaluate model

In [36]:
train, test = pipe_data.randomSplit([0.8, 0.2])

## train model

In [37]:
lr = LogisticRegression()
evaluator = evals.BinaryClassificationEvaluator(metricName='areaUnderROC')

In [38]:
grid = tune.ParamGridBuilder()
grid = grid.addGrid(lr.regParam, np.arange(0, 0.1, 0.01))
grid = grid.build()

In [39]:
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)
models = cv.fit(train)

                                                                                

In [40]:
best_lr = models.bestModel
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_19d14ba790f6, numClasses=2, numFeatures=29


In [41]:
test_results = best_lr.transform(test)
print(evaluator.evaluate(test_results))

                                                                                

0.5991524617982568
