In [1]:
import pyspark
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession

#In short all the funcitons in mllib library only work on RDDs
from pyspark.mllib.feature import StandardScaler,PCA #Only applicable to RDDs, therefore have to convert the DF in RDD 
from pyspark.mllib.stat import Statistics #Can only be applied on RDDs

#As here we are using ml library therefore, this function can be applied directly to DFs
from pyspark.ml.feature import VectorAssembler

In [2]:
import findspark
findspark.init()

In [3]:
spark = SparkSession.builder \
.master("local") \
.config("spark.driver.extraClassPath","C:/Users/AnshumaanChauhan/Documents/spark-3.3.0-bin-hadoop3/spark-3.3.0-bin-hadoop3/jars/mysql-connector-java-5.1.48.jar") \
.appName("Scalability Check of Systems for ML applications") \
.getOrCreate()

In [4]:
from pyspark.sql.functions import col, count, isnan, when

In [83]:
dataset = spark.read.csv('C:\\Users\AnshumaanChauhan\\Documents\\Systems for DS Umass\\Project\\archive (5)\\DelayedFlights.csv',
                         header=True)

In [67]:
#Checking which column has null or nan values how many times 
dataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataset.columns]).show(vertical=True)

-RECORD 0-------------------
 _c0               | 0      
 Year              | 0      
 Month             | 0      
 DayofMonth        | 0      
 DayOfWeek         | 0      
 DepTime           | 0      
 CRSDepTime        | 0      
 ArrTime           | 7110   
 CRSArrTime        | 0      
 UniqueCarrier     | 0      
 FlightNum         | 0      
 TailNum           | 5      
 ActualElapsedTime | 8387   
 CRSElapsedTime    | 198    
 AirTime           | 8387   
 ArrDelay          | 8387   
 DepDelay          | 0      
 Origin            | 0      
 Dest              | 0      
 Distance          | 0      
 TaxiIn            | 7110   
 TaxiOut           | 455    
 Cancelled         | 0      
 CancellationCode  | 0      
 Diverted          | 0      
 CarrierDelay      | 689270 
 WeatherDelay      | 689270 
 NASDelay          | 689270 
 SecurityDelay     | 689270 
 LateAircraftDelay | 689270 



In [68]:
dataset.select(col("ArrDelay")).where((col("ArrDelay").isNull() | isnan(col("ArrDelay"))) & (col('Diverted')==1)).count()

7754

For all the flights that are diverted their ActualElaspedTime, ArrDelay and AirTime is null, because they did not land at correct destination

In [69]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("TaxiIn").isNull() | isnan(col("TaxiIn")))).count()

7110

In [70]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("TaxiIn").isNull() | isnan(col("TaxiIn"))) & (col("Diverted")==0)).count()

633

In [71]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("TaxiIn").isNull() | isnan(col("TaxiIn"))) & (col("Diverted")==0) & (col("CancellationCode")=='N')).count()

0

As we can infer above the TaxiIn are ArrTime are Null a few times. But it is the case when the flight is either cancelled or diverted. So it is fine if we not exclude the null values

In [72]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("TaxiOut").isNull() | isnan(col("TaxiOut")))).show(vertical=True)

-RECORD 0--------------------
 _c0               | 5484245 
 Year              | 2008    
 Month             | 10      
 DayofMonth        | 25      
 DayOfWeek         | 6       
 DepTime           | 1323.0  
 CRSDepTime        | 1255    
 ArrTime           | null    
 CRSArrTime        | 1442    
 UniqueCarrier     | XE      
 FlightNum         | 2347    
 TailNum           | N26549  
 ActualElapsedTime | null    
 CRSElapsedTime    | 107.0   
 AirTime           | null    
 ArrDelay          | null    
 DepDelay          | 28.0    
 Origin            | CLT     
 Dest              | EWR     
 Distance          | 529     
 TaxiIn            | null    
 TaxiOut           | null    
 Cancelled         | 1       
 CancellationCode  | B       
 Diverted          | 0       
 CarrierDelay      | null    
 WeatherDelay      | null    
 NASDelay          | null    
 SecurityDelay     | null    
 LateAircraftDelay | null    
-RECORD 1--------------------
 _c0               | 5486876 
 Year     

In [73]:
dataset.select(*(col(c) for c in dataset.columns)).where(((col("TaxiOut").isNull() | isnan(col("TaxiOut")))) & (col("CancellationCode")=='N')).show(vertical=True)

(0 rows)



We can infer from the above queries that TaxiOut is null only in the case of a cancelled flight. So it is fine if these are converted to 0s after type casting

In [74]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("CRSElapsedTime").isNull() | isnan(col("CRSElapsedTime")))).show(vertical=True)

-RECORD 0-------------------
 _c0               | 454268 
 Year              | 2008   
 Month             | 1      
 DayofMonth        | 31     
 DayOfWeek         | 4      
 DepTime           | 745.0  
 CRSDepTime        | 715    
 ArrTime           | null   
 CRSArrTime        | 941    
 UniqueCarrier     | 9E     
 FlightNum         | 2001   
 TailNum           | 91469E 
 ActualElapsedTime | null   
 CRSElapsedTime    | null   
 AirTime           | null   
 ArrDelay          | null   
 DepDelay          | 30.0   
 Origin            | ATL    
 Dest              | AUS    
 Distance          | 813    
 TaxiIn            | null   
 TaxiOut           | 15.0   
 Cancelled         | 0      
 CancellationCode  | N      
 Diverted          | 1      
 CarrierDelay      | null   
 WeatherDelay      | null   
 NASDelay          | null   
 SecurityDelay     | null   
 LateAircraftDelay | null   
-RECORD 1-------------------
 _c0               | 454394 
 Year              | 2008   
 Month        

In [75]:
dataset.select(*(col(c) for c in dataset.columns)).where((col("CRSElapsedTime").isNull() | isnan(col("CRSElapsedTime"))) & (col("Diverted")==0)).count()

0

We can infer from the above queries that CRSElapsedTime is null only in the case of a diverted flight. So it is fine if these are converted to 0s after type casting

Last columns have null values if the flight is cancelled instead of getting delayed, we do not drop these columns. Instead when we type cast the column to numeric data type these will be converted to zero which makes sense

In [76]:
dataset.count()

1936758

In [82]:
dataset.select("_c0").distinct().count()

1936758

Means that _c0 column is just for indexing the entreis, therefore we will use this column for joining the 2 datasets

In [85]:
#Have to do some pre processing here as otherwise everything will be sent as Text there 
list_of_columns=dataset.columns
Categorical_columns=['UniqueCarrier','TailNum','Origin','Dest','CancellationCode']
#Because rest everything in minutes or numerical, last columns that are delays for specific reasons are null if the flight is cancelled
#They will be converted to 0 when we do the conversion from string to numeric 

for col_name in Categorical_columns:
    list_of_columns.remove(col_name) 

#Null values will be converted to 0.0
#Adding _c0 for the join operation, so that we have the correct join, and no duplicates are created due to the change of null to 0.0
Categorical_columns.append("_c0")

numeric_dataset= dataset.select(*(col(c).cast('float') for c in list_of_columns))
updated_dataset= numeric_dataset.join(dataset.select(*(col(c) for c in Categorical_columns)),"_c0")

In [None]:
del(numeric_dataset)
del(dataset)
numeric_dataset_dataset.unpersist()
dataset.unpersist()

As the null values are handled during the pre-processing step we do not need to treat them by replacing them with the mean value, or some other technique used for handling the null values. Can also use pyspark.ml.features.Imputer module if have dataset which contains null values. (Best case is drop those rows if not facing with the problem of data insufficiency)

In [87]:
updated_dataset.printSchema()

root
 |-- _c0: float (nullable = true)
 |-- Year: float (nullable = true)
 |-- Month: float (nullable = true)
 |-- DayofMonth: float (nullable = true)
 |-- DayOfWeek: float (nullable = true)
 |-- DepTime: float (nullable = true)
 |-- CRSDepTime: float (nullable = true)
 |-- ArrTime: float (nullable = true)
 |-- CRSArrTime: float (nullable = true)
 |-- FlightNum: float (nullable = true)
 |-- ActualElapsedTime: float (nullable = true)
 |-- CRSElapsedTime: float (nullable = true)
 |-- AirTime: float (nullable = true)
 |-- ArrDelay: float (nullable = true)
 |-- DepDelay: float (nullable = true)
 |-- Distance: float (nullable = true)
 |-- TaxiIn: float (nullable = true)
 |-- TaxiOut: float (nullable = true)
 |-- Cancelled: float (nullable = true)
 |-- Diverted: float (nullable = true)
 |-- CarrierDelay: float (nullable = true)
 |-- WeatherDelay: float (nullable = true)
 |-- NASDelay: float (nullable = true)
 |-- SecurityDelay: float (nullable = true)
 |-- LateAircraftDelay: float (nullable 

In [89]:
#Loading dataset into MySQL 
updated_dataset.select(*(col(c) for c in dataset.columns)).write.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/Sys") \
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "dataset") \
.option("user", "root").option("password", "MySQL").save()

In [28]:
#Loading dataset from MySQL (after doing simple anaysis in MySQL)
updated_dataset = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://localhost:3306/Sys") \
.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "dataset") \
.option("user", "root").option("password", "MySQL").load()

In [6]:
updated_dataset.printSchema()

root
 |-- _c0: double (nullable = true)
 |-- Year: double (nullable = true)
 |-- Month: double (nullable = true)
 |-- DayofMonth: double (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- CRSArrTime: double (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: double (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (

For a better formatted description we could use toPandas() function 

In [92]:
updated_dataset.describe().toPandas()

Unnamed: 0,summary,_c0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,...,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,label
0,count,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1936758.0,1929648.0,1936758.0,...,1936303.0,1936758.0,1936758,1936758.0,1247488.0,1247488.0,1247488.0,1247488.0,1247488.0,1936758.0
1,mean,3341651.15150628,2008.0,6.11110629206127,15.753470490376186,3.984827221573372,1518.5341168075722,1467.4726439751378,1610.140628757162,1634.224640868916,...,18.232202811233574,0.00032683484462178546,,0.0040035977649246,19.17939892006977,3.7035706956700185,15.021635478657911,0.0901371395957315,25.296466178432176,0.640007682942319
2,stddev,2066064.9577971771,0.0,3.4825463936659657,8.776272060384185,1.9959662750518496,450.4852547937437,424.76679957727015,548.1781425365509,464.6347119906739,...,14.338534198201804,0.0180756242576747,,0.0631472171101066,43.54620724547411,21.492900484108738,33.8330521636536,2.022714047505632,42.054861520954,0.5050477568478486
3,min,0.0,2008.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,...,0.0,0.0,A,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,max,7009730.0,2008.0,12.0,31.0,7.0,2400.0,2359.0,2400.0,2400.0,...,422.0,1.0,N,1.0,2436.0,1352.0,1357.0,392.0,1316.0,3.0


In [24]:
updated_dataset.select("Month","Diverted").where(col("Diverted")==1).groupBy("Month").count().sort(col('count').desc()).collect()

[Row(Month=12.0, count=1397),
 Row(Month=6.0, count=1026),
 Row(Month=2.0, count=909),
 Row(Month=7.0, count=774),
 Row(Month=3.0, count=726),
 Row(Month=8.0, count=674),
 Row(Month=1.0, count=612),
 Row(Month=4.0, count=481),
 Row(Month=5.0, count=361),
 Row(Month=11.0, count=321),
 Row(Month=10.0, count=285),
 Row(Month=9.0, count=188)]

In [25]:
updated_dataset.select("Month","Cancelled").where(col("Cancelled")==1).groupBy("Month").count().sort(col('count').desc()).collect()

[Row(Month=12.0, count=480),
 Row(Month=11.0, count=94),
 Row(Month=10.0, count=59)]

In [32]:
updated_dataset.select("Month","label").where(col("label")==1).groupBy("Month").count().sort(col('count').desc()).collect()

[Row(Month=12.0, count=138291),
 Row(Month=6.0, count=133275),
 Row(Month=3.0, count=127628),
 Row(Month=2.0, count=125591),
 Row(Month=1.0, count=117727),
 Row(Month=7.0, count=116394),
 Row(Month=8.0, count=97880),
 Row(Month=4.0, count=94917),
 Row(Month=5.0, count=92081),
 Row(Month=11.0, count=62012),
 Row(Month=10.0, count=55154),
 Row(Month=9.0, count=54062)]

In [6]:
new_updated_dataset=updated_dataset[:100,:]

TypeError: Invalid argument, not a string or column: slice(None, 100, None) of type <class 'slice'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [7]:
#Can convert this Dataframe into RDD 
updated_dataset_rdd= updated_dataset.rdd

In [9]:
updated_dataset_rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (10.78.80.241 executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Unknown Source)
	at java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.write(Unknown Source)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
	at java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$2265/473955585.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler$$Lambda$2263/1540395592.apply(Unknown Source)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDD$$Lambda$1835/961995452.apply(Unknown Source)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Unknown Source)
	at java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.write(Unknown Source)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
	at java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:593)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


Everything in form Row(.....),Row(...) and so on. Here we cannot apply any function on the given data, so we need to get the actual data from the RDD, which can be done using the map function

In [10]:
dataset_rdd= updated_dataset_rdd.map(lambda row:row[0:])

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:63788)
Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1198, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\AnshumaanChauhan\anaconda3\envs\py_tf_cpu\lib\site-packages\py4j\java_gateway.py", line 1201, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending

During handling of the above exception, another exception occurred:

Traceback (most recent call 

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:63788)

In [None]:
dataset_rdd.collect()

In [None]:
#Checking the relationship between variables, that is correlation and independence 
pearsonCorrMatrix = Statistics.corr(dataset_rdd,method='pearson')
#To visualize this better we will use Pandas dataframe 
DF_Corr= pd.DataFrame(pearsonCorrMatrix)

In [None]:
#Convert categorical features into numerical columns 

In [None]:
#Drop the redundant features - Diverted and Cancellation
dataset_rdd.drop([''])

In [14]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
fig = plt.figure(figsize=(15,15))

In [None]:
#Standard Scaling

features_to_be_scaled = dataset_rdd.drop(['List of features to be not scaled'])
scaler= StandardScaler().fit(features_to_be_scaled)
scaled_features = scaler.transform(features_to_be_scaled)

In [None]:
scaled_features.collect()

In [None]:
#Vector Assembler will create a list for each entry in the RDD
#We will here split the dataset into X and Y, where Y will be the label whether the flight was delayed/cancelled/diverted

input_features= dataset_rdd.drop('label')

#Output of a vector assembler is a single column therefore outputCol instead of outputCols
#Initialzing the VectorAssembler
assembler= VectorAssembler(inputCols=input_features.columns,outputCol="features")

final_dataset = assembler.transform(dataset_rdd)

In [None]:
final_dataset.select("features","label").show(truncated=False)

In [None]:
splits = final_dataset.randomSplit([0.7, 0.3])
train_data = splits[0]
test_data = splits[1]

In [None]:
# X=final_dataset.select('features')
# Y=final_dataset.select('label')

In [None]:
# #Principal Component Analysis

# Pca=PCA(k=4)
# #K should be less than the number of features 
# pca_model = pca.fit(X)
# result= pca_model.transform(X)

In [None]:
# result.collect()

Can see that the result is in form of DenseVectors and we need to convert it in DataFrames

In [None]:
# train_data = result.map(lambda x: (x, )).toDF(["PCA Components"])

In [None]:
# train_data.show(truncate=False)

In [None]:
#Training Linear Regression
from pyspark.ml.regression import LinearRegression
lr_classifier = LinearRegression(featuresCol='features', labelCol='label')
lr_model=lr_classifier.fit(train_data)

In [None]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [None]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","MV","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="MV",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)