In [1]:
from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('Processing').getOrCreate()
"""
spark =  SparkSession.builder.appName('Processing')\
.master("yarn")\
.config("spark.executor.memory","12g")\
.config("spark.executor.cores","2")\
.config("spark.executor.instances","2")\
.getOrCreate()
print(spark)
"""

'\nspark =  SparkSession.builder.appName(\'Processing\').master("yarn").config("spark.executor.memory","12g").config("spark.executor.cores","2").config("spark.executor.instances","2").getOrCreate()\nprint(spark)\n'

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType
from pyspark.sql.functions import isnull, when, count, col, desc

In [4]:
df = spark.read.csv("Flights_2021_1.csv", header=True)
for i in range(2,13):
    df=df.union(spark.read.csv(f"Flights_2021_{i}.csv",header=True))
df.count()

6311871

In [4]:
#numaram valorile nule din DepDelayMinutes
df.select(count(when(isnull("DepDelayMinutes"),"DepDelayMinutes"))).show()
df.select(count(when(isnull("ArrDelayMinutes"),"ArrDelayMinutes"))).show()

+-------------------------------------------------------------------+
|count(CASE WHEN (DepDelayMinutes IS NULL) THEN DepDelayMinutes END)|
+-------------------------------------------------------------------+
|                                                             108413|
+-------------------------------------------------------------------+

+-------------------------------------------------------------------+
|count(CASE WHEN (ArrDelayMinutes IS NULL) THEN ArrDelayMinutes END)|
+-------------------------------------------------------------------+
|                                                             126001|
+-------------------------------------------------------------------+



In [5]:
df=df.select(*(col(c).cast("float").alias(c) for c in df.columns))

In [32]:
#Determinam coloanele care au cele mai mari corelatii cu coloana ArrDelay
for column in df.columns:
    correlation=df.stat.corr(column,"ArrDelayMinutes")
    print(f"{column}: {correlation}")

Year: nan
Quarter: 0.023337430831709513
Month: 0.024645759396537747
DayofMonth: 0.0047066972077218
DayOfWeek: 0.00023115613608019217
FlightDate: nan
Marketing_Airline_Network: nan
Operated_or_Branded_Code_Share_Partners: nan
DOT_ID_Marketing_Airline: -0.02481471185520768
IATA_Code_Marketing_Airline: nan
Flight_Number_Marketing_Airline: 0.01670329707122373
Originally_Scheduled_Code_Share_Airline: nan
DOT_ID_Originally_Scheduled_Code_Share_Airline: 0.023565365250150596
IATA_Code_Originally_Scheduled_Code_Share_Airline: nan
Flight_Num_Originally_Scheduled_Code_Share_Airline: 0.02408811176475596
Operating_Airline : nan
DOT_ID_Operating_Airline: -0.01806040149379832
IATA_Code_Operating_Airline: nan
Tail_Number: nan
Flight_Number_Operating_Airline: 0.01669536140763225
OriginAirportID: -0.0022179493696779717
OriginAirportSeqID: -0.0022180443811063984
OriginCityMarketID: -0.009431711240518182
Origin: nan
OriginCityName: nan
OriginState: nan
OriginStateFips: 0.0008312922101911207
OriginStateNam

In [6]:
# Vom utiliza coloanele DepDelayMinutes,CarrierDelay,WeatherDelay,NASDelay,LateAircraftDelay

from pyspark.ml.feature import Imputer
imputer = Imputer()
imputer.setInputCols(["DepDelayMinutes","CarrierDelay","WeatherDelay","NASDelay","LateAircraftDelay","ArrDelayMinutes"])
imputer.setOutputCols(["out_DepDelayMinutes","out_CarrierDelay","out_WeatherDelay","out_NASDelay","out_LateAircraftDelay","out_ArrDelayMinutes"])
model=imputer.fit(df)
imputed_df=model.transform(df)

In [7]:
imputed_df.select(count(when(isnull("out_DepDelayMinutes"),"out_DepDelayMinutes"))).show()
imputed_df.select(count(when(isnull("out_CarrierDelay"),"out_CarrierDelay"))).show()
imputed_df.select(count(when(isnull("out_WeatherDelay"),"out_WeatherDelay"))).show()
imputed_df.select(count(when(isnull("out_NASDelay"),"out_NASDelay"))).show()
imputed_df.select(count(when(isnull("out_LateAircraftDelay"),"out_LateAircraftDelay"))).show()
imputed_df.select(count(when(isnull("out_ArrDelayMinutes"),"out_ArrDelayMinutes"))).show()

+---------------------------------------------------------------------------+
|count(CASE WHEN (out_DepDelayMinutes IS NULL) THEN out_DepDelayMinutes END)|
+---------------------------------------------------------------------------+
|                                                                          0|
+---------------------------------------------------------------------------+

+---------------------------------------------------------------------+
|count(CASE WHEN (out_CarrierDelay IS NULL) THEN out_CarrierDelay END)|
+---------------------------------------------------------------------+
|                                                                    0|
+---------------------------------------------------------------------+

+---------------------------------------------------------------------+
|count(CASE WHEN (out_WeatherDelay IS NULL) THEN out_WeatherDelay END)|
+---------------------------------------------------------------------+
|                               

In [7]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["out_DepDelayMinutes","out_CarrierDelay","out_WeatherDelay","out_NASDelay","out_LateAircraftDelay"], outputCol='features')
output = assembler.transform(imputed_df)
final_df=output.select('features','out_ArrDelayMinutes')
final_df.show(5)
train_data, test_data = final_df.randomSplit([0.7, 0.3])

+--------------------+-------------------+
|            features|out_ArrDelayMinutes|
+--------------------+-------------------+
|[3.0,26.829744338...|                0.0|
|[0.0,26.829744338...|                0.0|
|[7.0,26.829744338...|                0.0|
|[0.0,26.829744338...|                0.0|
|[0.0,26.829744338...|               14.0|
+--------------------+-------------------+
only showing top 5 rows



In [8]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
import time
lr = LinearRegression(labelCol='out_ArrDelayMinutes')

start=time.time()
lr_model = lr.fit(train_data)
end=time.time()
duration=end-start
print("Training time (s): ", duration)
test_results = lr_model.evaluate(test_data)
test_results.residuals.orderBy(F.col('residuals').desc()).show()

Training time (s):  10.026498556137085
+------------------+
|         residuals|
+------------------+
| 192.0718587034596|
| 187.6539837548486|
|186.51823932713094|
|182.81063438530566|
|178.35920395167503|
| 173.8006860571531|
|172.55579491282398|
|170.64632601273036|
|166.77443574758547|
|163.53833556798173|
|157.42554399396604|
|157.19098585393252|
|154.17713571066088|
|151.29599237979085|
| 148.3300008007115|
|146.97673536007304|
|144.26433713153406|
|144.26370923714532|
|142.95388126068397|
|141.16054202357023|
+------------------+
only showing top 20 rows



In [9]:
test_results.rootMeanSquaredError

9.498622707319955

In [10]:
test_results.r2

0.9582704670924013

In [13]:
from pyspark.sql.functions import col, abs

predictions = lr_model.transform(test_data)
predictions = predictions.withColumn("absolute_percentage_error", abs((col("out_ArrDelayMinutes") - col("prediction")) / col("out_ArrDelayMinutes")))
mape = predictions.selectExpr("avg(absolute_percentage_error) as mape").collect()[0]["mape"] * 100

print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")
predictions.orderBy(desc("features")).show()

Mean Absolute Percentage Error (MAPE): 78.45%
+--------------------+-------------------+------------------+-------------------------+
|            features|out_ArrDelayMinutes|        prediction|absolute_percentage_error|
+--------------------+-------------------+------------------+-------------------------+
|[2100.0,12.0,0.0,...|             2081.0|2011.6235493986442|      0.03333803488772505|
|[2031.0,10.0,0.0,...|             2033.0|1946.9596142008218|      0.04232188184907931|
|[1948.0,550.0,0.0...|             1961.0|1881.5904279528625|      0.04049442735703088|
|[1942.0,8.0,0.0,6...|             1948.0|1862.7501326058054|     0.043762765602769274|
|[1917.0,1367.0,0....|             1893.0|1861.9534802955807|     0.016400697149719666|
|[1915.0,20.0,0.0,...|             1890.0|1833.5857657543786|     0.029848801188159472|
|[1815.0,716.0,0.0...|             1797.0| 1751.448937347494|     0.025348393240125798|
|[1716.0,951.0,0.0...|             1712.0|1661.6159350054704|     0.029429

In [19]:
# Decision Tree

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import Imputer
import time

dt = DecisionTreeRegressor(featuresCol="features", labelCol="out_ArrDelayMinutes",maxDepth=30)

start=time.time()
model = dt.fit(train_data)
end=time.time()
duration=end-start
print("Training time (s):",duration)
predictions = model.transform(test_data)
predictions.show(50)

Training time (s): 143.1710855960846
+--------------------+-------------------+------------------+
|            features|out_ArrDelayMinutes|        prediction|
+--------------------+-------------------+------------------+
|      (5,[0],[48.0])|               34.0|30.886524822695037|
|      (5,[0],[87.0])|               65.0| 56.75824175824176|
|(5,[0,1],[17.0,16...|               16.0|16.839423076923076|
|(5,[0,1],[17.0,17...|               17.0|16.839423076923076|
|(5,[0,1],[20.0,16...|               16.0|17.666858457997698|
|(5,[0,1],[20.0,20...|               20.0|17.666858457997698|
|(5,[0,1],[22.0,18...|               18.0|18.465199590583417|
|(5,[0,1],[23.0,17...|               17.0|18.777673104097975|
|(5,[0,1],[23.0,23...|               23.0|18.777673104097975|
|(5,[0,1],[25.0,17...|               17.0|19.418328383538395|
|(5,[0,1],[28.0,15...|               15.0|20.456113399902243|
|(5,[0,1],[28.0,17...|               17.0|20.456113399902243|
|(5,[0,1],[28.0,27...|           

In [20]:
predictions.orderBy(desc("features")).show()
predictions = predictions.withColumn("absolute_percentage_error", abs((col("out_ArrDelayMinutes") - col("prediction")) / col("out_ArrDelayMinutes")))
mape = predictions.selectExpr("avg(absolute_percentage_error) as mape").collect()[0]["mape"] * 100

print(f"Mean Absolute Percentage Error (MAPE): {mape:.2f}%")

+--------------------+-------------------+------------------+
|            features|out_ArrDelayMinutes|        prediction|
+--------------------+-------------------+------------------+
|[2100.0,12.0,0.0,...|             2081.0|146.30797824116047|
|[2031.0,10.0,0.0,...|             2033.0|161.92549019607844|
|[1948.0,550.0,0.0...|             1961.0| 333.1707317073171|
|[1942.0,8.0,0.0,6...|             1948.0|163.73460537727667|
|[1917.0,1367.0,0....|             1893.0| 300.6959420289855|
|[1915.0,20.0,0.0,...|             1890.0| 152.8556854410202|
|[1815.0,716.0,0.0...|             1797.0| 300.6959420289855|
|[1716.0,951.0,0.0...|             1712.0| 300.6959420289855|
|[1710.0,1710.0,0....|             1722.0|  214.991452991453|
|[1707.0,224.0,0.0...|             1682.0| 300.6959420289855|
|[1677.0,1155.0,0....|             1647.0| 300.6959420289855|
|[1660.0,28.0,0.0,...|             1623.0|149.53061224489795|
|[1649.0,1589.0,0....|             1623.0|164.88505747126436|
|[1646.0

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='out_ArrDelayMinutes', predictionCol='prediction')
r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})
rmse = evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})
print("r2:",r2)
print("rmse:",rmse)

r2: 0.6025596400755497
rmse: 29.31398487695787
