# GBTRegressor

In [None]:
#匯入套件
import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as fn
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Spark configuration: enable Arrow for Spark <-> pandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

# Load the dataset. Parquet files carry their own schema, so the CSV-only
# `header` / `inferSchema` options are not passed.
df = spark.read.parquet('added_ArrDelayMinutes_encoded_dataset.parquet')

# Columns that are assembled into the single `features` vector column.
featuresCols = [
    'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime',
    'Distance', 'AWND', 'PRCP', 'TMAX', 'TMIN', 'WSF2', 'WSF5', 'SNOW',
    'WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09', 'WT10',
    'WT11', 'WT18', 'MFR Year', 'Origin', 'Dest', 'Tail Number', 'Manufacturer Name',
    'Model',
    'Airline_AA', 'Airline_AS', 'Airline_B6', 'Airline_DL', 'Airline_F9', 'Airline_G4',
    'Airline_HA', 'Airline_NK', 'Airline_UA', 'Airline_VX', 'Airline_WN',
    'TypeOfAircraft_Balloon', 'TypeOfAircraft_Fixed Wing Multi-Engine',
    'TypeOfAircraft_Fixed Wing Single-Engine', 'TypeOfAircraft_Rotorcraft',
    'TypeOfEngine_4 Cycle', 'TypeOfEngine_None', 'TypeOfEngine_Reciprocating',
    'TypeOfEngine_Turbo-fan', 'TypeOfEngine_Turbo-jet', 'TypeOfEngine_Turbo-prop',
    'TypeOfEngine_Turbo-shaft',
]

# Split the raw frame into train / test inside this cell. Previously `train_df`
# and `test_df` were never defined here, so the cell failed on a fresh kernel
# (or silently picked up frames left behind by another cell).
train_df, test_df = df.randomSplit([0.80, 0.20], seed=42)

# Merge the feature columns into one vector column named `features`.
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="features")

# Gradient-boosted trees regressor; the label column is ArrDelayMinutes.
gbt = GBTRegressor(labelCol="ArrDelayMinutes")

# Hyper-parameter grid: tree depth and number of boosting iterations.
# Each list currently holds a single value, so the grid has exactly one candidate.
paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxDepth, [5])\
    .addGrid(gbt.maxIter, [5])\
    .build()

# Evaluate candidates with R^2 on the regressor's label / prediction columns.
evaluator = RegressionEvaluator(metricName="r2",
                                labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())

# Cross-validate the regressor over the parameter grid.
cv = CrossValidator(estimator=gbt, evaluator=evaluator,
                    estimatorParamMaps=paramGrid)

# Chain feature assembly and the cross-validated regressor.
pipeline = Pipeline(stages=[vectorAssembler, cv])

# Fit the whole pipeline on the training split.
pipelineModel = pipeline.fit(train_df)

# Report the hyper-parameters of the best model. They are printed explicitly
# because bare expressions in the middle of a cell are never displayed.
best_gbt = pipelineModel.stages[1].bestModel
print("best maxDepth = %d" % (best_gbt.getMaxDepth()))
print("best maxIter = %d" % (best_gbt.getMaxIter()))

# Apply the fitted pipeline to the held-out test split.
predictions = pipelineModel.transform(test_df)

# Compute and print R^2 on the test split.
r2 = evaluator.evaluate(predictions)
print("test r2 = %f" % (r2))

# LassoRegression

In [None]:
#匯入套件
import numpy as np
import pandas as pd
import pyspark
import pyspark.sql.functions as fn
import pyspark.pandas as ps
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics

# Spark / pandas-on-Spark configuration.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
ps.set_option("compute.default_index_type", "distributed")

# Load the dataset. Parquet files carry their own schema, so the CSV-only
# `header` / `inferSchema` options are not passed.
df = spark.read.parquet('added_WeatherDelay_encoded_dataset.parquet')

# Assemble the selected feature columns into a single vector column `features`.
assembler = VectorAssembler(
    inputCols=[
        'CRSDepTime', 'CRSArrTime', 'PRCP', 'TMAX', 'WSF2', 'SNOW',
        'WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09',
        'WT10', 'WT11', 'WT18', 'Origin', 'Dest',
        'Airline_AA', 'Airline_AS', 'Airline_B6', 'Airline_DL', 'Airline_F9',
        'Airline_G4', 'Airline_HA', 'Airline_NK', 'Airline_UA', 'Airline_VX',
        'Airline_WN',
    ],
    outputCol='features')
df = assembler.transform(df)

# Regression target: use the numeric WeatherDelay value itself as `label`.
# A StringIndexer must not be used here: it numbers the distinct values by
# frequency, so the model would be fitted to category ids, not to the delay.
data = df.select('features', df['WeatherDelay'].cast('double').alias('label'))

# 80 / 20 train / test split with a fixed seed.
train_df, test_df = data.randomSplit([0.80, 0.20], seed=42)

# Lasso regression: elasticNetParam=1 is pure L1 (lasso), 0 would be pure L2 (ridge).
lin_Reg = LinearRegression(labelCol='label', regParam=0.1, elasticNetParam=1, solver='normal')

# Train the model.
lr_model = lin_Reg.fit(train_df)

# Print the intercept and the coefficient vector. The coefficients are a vector,
# so they are formatted with %s; %f only accepts a single number and would raise.
print("model intercept = %f" % (lr_model.intercept))
print("model coefficients = %s" % (lr_model.coefficients,))

# Metrics on the training split.
training_predictions = lr_model.evaluate(train_df)
print("train mse = %f" % (training_predictions.meanSquaredError))
print("train rmse = %f" % (training_predictions.rootMeanSquaredError))
print("train r2 = %f" % (training_predictions.r2))

# Metrics on the held-out test split.
test_results = lr_model.evaluate(test_df)
print("test mse = %f" % (test_results.meanSquaredError))
print("test rmse = %f" % (test_results.rootMeanSquaredError))
print("test r2 = %f" % (test_results.r2))

# RandomForestRegressor

In [None]:
#匯入套件
import numpy as np
import pandas as pd
import pyspark
import sys
import pyspark.sql.functions as fn
import pyspark.pandas as ps
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import RegressionEvaluator

# Spark / pandas-on-Spark configuration.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
ps.set_option("compute.default_index_type", "distributed")

# Load the dataset. Parquet files carry their own schema, so the CSV-only
# `header` / `inferSchema` options are not passed.
df = spark.read.parquet('added_ArrDelayMinutes_encoded_dataset.parquet')

# Assemble the feature columns into a single vector column `features`.
featureCols = [
    'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime',
    'Distance', 'AWND', 'PRCP', 'TMAX', 'TMIN', 'WSF2', 'WSF5', 'SNOW',
    'WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09', 'WT10',
    'WT11', 'WT18', 'MFR Year', 'Origin', 'Dest', 'Tail Number', 'Manufacturer Name',
    'Model',
    'Airline_AA', 'Airline_AS', 'Airline_B6', 'Airline_DL', 'Airline_F9', 'Airline_G4',
    'Airline_HA', 'Airline_NK', 'Airline_UA', 'Airline_VX', 'Airline_WN',
    'TypeOfAircraft_Balloon', 'TypeOfAircraft_Fixed Wing Multi-Engine',
    'TypeOfAircraft_Fixed Wing Single-Engine', 'TypeOfAircraft_Rotorcraft',
    'TypeOfEngine_4 Cycle', 'TypeOfEngine_None', 'TypeOfEngine_Reciprocating',
    'TypeOfEngine_Turbo-fan', 'TypeOfEngine_Turbo-jet', 'TypeOfEngine_Turbo-prop',
    'TypeOfEngine_Turbo-shaft',
]
assembler = VectorAssembler(inputCols=featureCols,
                            outputCol='features')
df = assembler.transform(df)

# Regression target: use the numeric ArrDelayMinutes value itself as `label`.
# A StringIndexer must not be used here: it numbers the distinct values by
# frequency, so the forest would be fitted to category ids, not to minutes.
data = df.select('features', df['ArrDelayMinutes'].cast('double').alias('label'))

# 80 / 20 train / test split with a fixed seed.
train_df, test_df = data.randomSplit([0.80, 0.20], seed=42)

# Configure the random forest on the features / label columns and train it.
rf = RandomForestRegressor(featuresCol="features", labelCol="label")
model = rf.fit(train_df)
training_predictions = model.transform(train_df)

# Evaluate with R^2 and print the training score.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
print("train r2 = %f" % (evaluator.evaluate(training_predictions)))

# Apply the model to the test split and print its R^2.
test_results = model.transform(test_df)
print("test r2 = %f" % (evaluator.evaluate(test_results)))

# LogisticRegression

In [None]:
#匯入套件
import pyspark.sql.types as typ
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.pandas as ps
import os

# Load the dataset. Parquet files carry their own schema, so the CSV-only
# `header` / `inferSchema` options are not passed.
df = spark.read.parquet('added_ArrDelayMinutes_encoded_dataset.parquet')

# Assemble the feature columns into a single vector column `features`.
assembler = VectorAssembler(
    inputCols=[
        'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime',
        'Distance', 'AWND', 'PRCP', 'TMAX', 'TMIN', 'WSF2', 'WSF5', 'SNOW',
        'WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09', 'WT10',
        'WT11', 'WT18', 'MFR Year', 'Origin', 'Dest', 'Tail Number', 'Manufacturer Name',
        'Model',
        'Airline_AA', 'Airline_AS', 'Airline_B6', 'Airline_DL', 'Airline_F9', 'Airline_G4',
        'Airline_HA', 'Airline_NK', 'Airline_UA', 'Airline_VX', 'Airline_WN',
        'TypeOfAircraft_Balloon', 'TypeOfAircraft_Fixed Wing Multi-Engine',
        'TypeOfAircraft_Fixed Wing Single-Engine', 'TypeOfAircraft_Rotorcraft',
        'TypeOfEngine_4 Cycle', 'TypeOfEngine_None', 'TypeOfEngine_Reciprocating',
        'TypeOfEngine_Turbo-fan', 'TypeOfEngine_Turbo-jet', 'TypeOfEngine_Turbo-prop',
        'TypeOfEngine_Turbo-shaft',
    ],
    outputCol='features')
df = assembler.transform(df)
data = df.select(['features', 'ArrDelayMinutes'])

# Index ArrDelayMinutes into class ids in a `label` column for classification.
# NOTE(review): every distinct ArrDelayMinutes value becomes its own class;
# confirm the column is already bucketed into a small number of categories.
label_indexer = StringIndexer(inputCol='ArrDelayMinutes', outputCol='label').fit(data)
data = label_indexer.transform(data)
data = data.select(['features', 'label'])

# 80 / 20 train / test split with a fixed seed.
train, test = data.randomSplit([0.80, 0.20], seed=42)

# Configure the classifier with default parameters, train it, and predict on test.
lr = LogisticRegression()
model = lr.fit(train)
prediction = model.transform(test)

# Evaluate with classification accuracy and print it. The metric is accuracy,
# not R^2, so the printed label says so.
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = evaluator.evaluate(prediction)
print("test accuracy = %f" % (accuracy))

# LinearRegression

In [None]:
#匯入套件
import numpy as np
import pandas as pd
import pyspark
import sys
import pyspark.sql.functions as fn
import pyspark.pandas as ps
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics

# Spark / pandas-on-Spark configuration.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
ps.set_option("compute.default_index_type", "distributed")

# Load the dataset. Parquet files carry their own schema, so the CSV-only
# `header` / `inferSchema` options are not passed.
df = spark.read.parquet('added_WeatherDelay_encoded_dataset.parquet')

# Assemble the feature columns into a single vector column `features`.
featuresCols = [
    'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime',
    'Distance', 'AWND', 'PRCP', 'TMAX', 'TMIN', 'WSF2', 'WSF5', 'SNOW',
    'WT01', 'WT02', 'WT03', 'WT04', 'WT05', 'WT06', 'WT07', 'WT08', 'WT09', 'WT10',
    'WT11', 'WT18', 'MFR Year', 'Origin', 'Dest', 'Tail Number', 'Manufacturer Name',
    'Model',
    'Airline_AA', 'Airline_AS', 'Airline_B6', 'Airline_DL', 'Airline_F9', 'Airline_G4',
    'Airline_HA', 'Airline_NK', 'Airline_UA', 'Airline_VX', 'Airline_WN',
    'TypeOfAircraft_Balloon', 'TypeOfAircraft_Fixed Wing Multi-Engine',
    'TypeOfAircraft_Fixed Wing Single-Engine', 'TypeOfAircraft_Rotorcraft',
    'TypeOfEngine_4 Cycle', 'TypeOfEngine_None', 'TypeOfEngine_Reciprocating',
    'TypeOfEngine_Turbo-fan', 'TypeOfEngine_Turbo-jet', 'TypeOfEngine_Turbo-prop',
    'TypeOfEngine_Turbo-shaft',
]
assembler = VectorAssembler(inputCols=featuresCols,
                            outputCol='features')
df = assembler.transform(df)

# Regression target: use the numeric WeatherDelay value itself as `label`.
# A StringIndexer must not be used here: it numbers the distinct values by
# frequency, so the model would be fitted to category ids, not to the delay.
data = df.select('features', df['WeatherDelay'].cast('double').alias('label'))

# 80 / 20 train / test split with a fixed seed.
train_df, test_df = data.randomSplit([0.80, 0.20], seed=42)

# Configure the linear regression with its regularization strength and train it.
reg = 0.01
lin_Reg = LinearRegression(labelCol='label', regParam=reg)
lr_model = lin_Reg.fit(train_df)

# Print the intercept and the coefficient vector. They are printed explicitly
# because bare expressions in the middle of a cell are never displayed.
print("model intercept = %f" % (lr_model.intercept))
print("model coefficients = %s" % (lr_model.coefficients,))

# Metrics on the training split: mse, rmse, r2.
training_predictions = lr_model.evaluate(train_df)
print("train mse = %f" % (training_predictions.meanSquaredError))
print("train rmse = %f" % (training_predictions.rootMeanSquaredError))
print("train r2 = %f" % (training_predictions.r2))

# Metrics on the held-out test split: r2, rmse, mse.
test_results = lr_model.evaluate(test_df)
print("test r2 = %f" % (test_results.r2))
print("test rmse = %f" % (test_results.rootMeanSquaredError))
print("test mse = %f" % (test_results.meanSquaredError))