-
Notifications
You must be signed in to change notification settings - Fork 0
/
task3.py
155 lines (118 loc) · 4.29 KB
/
task3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Locate the local Spark installation and put pyspark on sys.path;
# must run before any pyspark import is resolved.
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
# Heap size for both driver and executors; the on-time reporting CSV is
# large enough that the default allocation is insufficient.
MAX_MEMORY = "14g"
# Single SparkSession for the whole script (reused if one already exists).
spark = SparkSession \
.builder \
.appName("testFlight") \
.config("spark.executor.memory", MAX_MEMORY) \
.config("spark.driver.memory", MAX_MEMORY) \
.getOrCreate()
#enter path of file here
# Directory containing the flight-data CSV (machine-specific; edit as needed).
path = "C:/Users/Christos Kallaras/Desktop/Master/1st Year/2nd Trimester/Big Data Systems and Architectures/Spark/Assignment"
# Load the on-time reporting CSV; header row supplies column names and
# the schema is inferred (extra pass over the file, acceptable here).
flights = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv( path + "/671009038_T_ONTIME_REPORTING.csv")
flights.show(5)
# Show the column names and inferred types.
flights.printSchema()
# Register the dataframe as a temp view so it can be queried with SQL.
flights.createOrReplaceTempView("flights")
# Flight count per origin airport, busiest first.
airports = spark.sql("""
SELECT ORIGIN,count(*) AS FLIGHTS
FROM flights
GROUP BY ORIGIN
ORDER BY FLIGHTS DESC
""")
airports.show(5)
# Flight count per carrier, busiest first.
airways = spark.sql("""
SELECT CARRIER,count(*) AS FLIGHTS
FROM flights
GROUP BY CARRIER
ORDER BY FLIGHTS DESC
""")
airways.show(5)
# Register the per-airport counts as a temp view.
airports.createOrReplaceTempView("airports")
# 10th percentile of flights-per-airport; airports below this cutoff are
# treated as outliers later. .first()["limit"] pulls the single scalar
# back to the driver so it can be spliced into the next query.
limit_airports = spark.sql("""
SELECT percentile(FLIGHTS, 0.1) AS limit FROM airports
""").first()["limit"]
limit_airports
# Register the per-carrier counts as a temp view.
airways.createOrReplaceTempView("airways")
# 10th percentile of flights-per-carrier, same idea as above.
limit_airways = spark.sql("""
SELECT percentile(FLIGHTS, 0.1) AS limit FROM airways
""").first()["limit"]
limit_airways
# Construct the outlier-removal query: keep only flights whose carrier
# and origin airport each have more flights than the 10th-percentile
# cutoffs computed above. Built with an f-string instead of string
# concatenation; both cutoffs are driver-side numbers, so no injection
# risk and the resulting SQL text is identical.
query = f"""
SELECT flights.* FROM flights,
(
select CARRIER,COUNT (*) as number_flights
from flights
group by CARRIER
) AS A ,
(
select ORIGIN,COUNT (*) as number_flights
from flights
group by ORIGIN
) AS B
WHERE flights.CARRIER = A.CARRIER
AND flights.ORIGIN = B.ORIGIN
AND A.number_flights > {limit_airways}
AND B.number_flights > {limit_airports}"""
# Flights with low-traffic carriers/airports filtered out.
data_without_outliers = spark.sql(query)
data_without_outliers.show(5)
from pyspark.sql.functions import date_format, col
# BUG FIX: `data_without_outliers` was computed above but never used --
# the original built the feature pipeline from the raw `flights` frame,
# so the outlier removal had no effect. Base the prepared data on the
# filtered frame instead.
# na.fill(0) replaces nulls (e.g. DEP_TIME of cancelled flights) with 0,
# which maps to hour_of_day 0; DEP_TIME is hhmm, so integer-dividing by
# 100 extracts the hour.
prepped_dataframe = data_without_outliers\
.na.fill(0)\
.withColumn("hour_of_day", (col("DEP_TIME")/100).cast('int'))
prepped_dataframe.select("hour_of_day").distinct().orderBy("hour_of_day").show()
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
indexer1 = StringIndexer()\
.setInputCol("ORIGIN")\
.setOutputCol("origin_index")
indexer2 = StringIndexer()\
.setInputCol("CARRIER")\
.setOutputCol("carrier_index")
indexer3 = StringIndexer()\
.setInputCol("hour_of_day")\
.setOutputCol("hour_of_day_index")
encoder = OneHotEncoder(dropLast=False)\
.setInputCols(["origin_index","carrier_index","hour_of_day_index"])\
.setOutputCols(["origin_encoded","carrier_encoded","hour_of_day_encoded"])
vector_assembler = VectorAssembler()\
.setInputCols(["origin_encoded","carrier_encoded","hour_of_day_encoded"])\
.setOutputCol("features")
transformation_pipeline = Pipeline()\
.setStages([indexer1,indexer2,indexer3, encoder, vector_assembler])
fitted_pipeline = transformation_pipeline.fit(prepped_dataframe)
transformed_dataframe = fitted_pipeline.transform(prepped_dataframe)
transformed_dataframe.cache()
transformed_dataframe.show(5)
# 70/30 train/test split. FIX: the original passed no seed, so every run
# produced a different split and non-reproducible metrics.
splits = transformed_dataframe.randomSplit([0.7, 0.3], seed=42)
train_df = splits[0]
test_df = splits[1]
from pyspark.ml.regression import LinearRegression
# Elastic-net regularized linear regression predicting departure delay
# (DEP_DELAY) from the one-hot encoded origin/carrier/hour features.
lr = LinearRegression(featuresCol = 'features', labelCol='DEP_DELAY', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
# R2 on the training data (from the fit summary).
print("R2 = " + str(lr_model.summary.r2))
# Make predictions on the held-out test set.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","DEP_DELAY","features").show(10)
from pyspark.ml.evaluation import RegressionEvaluator
# R2 on the test data.
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="DEP_DELAY",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
# Root Mean Squared Error (RMSE) on the test data.
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) = " + str(test_result.rootMeanSquaredError))
spark.stop()