# Install pyspark library

In [1]:
!pip install pyspark



In [2]:
#import pyspark library
import pyspark

In [3]:
#import spark session library
from pyspark.sql import SparkSession

In [4]:
# Build (or reuse) a local SparkSession that runs on every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("linear_regression")
    .getOrCreate()
)

23/11/30 11:11:44 WARN Utils: Your hostname, kaarthiks-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.219 instead (on interface en0)
23/11/30 11:11:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/30 11:11:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Create DataFrame

In [6]:
import os

# To create a DataFrame from external datasets (every CSV matched by the glob, first row = header).
# The location can be overridden with the FLIGHTS_DATA_GLOB environment variable so the notebook
# is not tied to one machine; the default keeps the original path.
FLIGHTS_DATA_GLOB = os.environ.get("FLIGHTS_DATA_GLOB", "/Users/kaarthiksekar/Downloads/data/*")
df = spark.read.option("header", "true").csv(FLIGHTS_DATA_GLOB)

                                                                                

In [7]:
# Preview the raw data: first 20 rows with long values truncated (the show() defaults, spelled out).
df.show(n=20, truncate=True)

23/11/30 11:20:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/11/30 11:20:54 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Year, Quarter, Month, DayofMonth, DayOfWeek, FlightDate, UniqueCarrier, AirlineID, Carrier, TailNum, FlightNum, OriginAirportID, OriginAirportSeqID, OriginCityMarketID, Origin, OriginCityName, OriginState, OriginStateFips, OriginStateName, OriginWac, DestAirportID, DestAirportSeqID, DestCityMarketID, Dest, DestCityName, DestState, DestStateFips, DestStateName, DestWac, CRSDepTime, DepTime, DepDelay, DepDelayMinutes, DepDel15, DepartureDelayGroups, DepTimeBlk, TaxiOut, WheelsOff, WheelsOn, TaxiIn, CRSArrTime, ArrTime, ArrDelay, ArrDelayMinutes, ArrDel15, ArrivalDelayGroups, ArrTimeBlk, Cancelled, CancellationCode, Diverted, CRSElapsedTime, ActualElapsedTime, AirTime, Flights, Distance, DistanceGroup, CarrierDe

+----+-------+-----+----------+---------+----------+-------------+---------+-------+-------+---------+---------------+------------------+------------------+------+--------------+-----------+---------------+---------------+---------+-------------+----------------+----------------+----+---------------+---------+-------------+-------------+-------+----------+-------+--------+---------------+--------+--------------------+----------+-------+---------+--------+------+----------+-------+--------+---------------+--------+------------------+----------+---------+----------------+--------+--------------+-----------------+-------+-------+--------+-------------+------------+------------+--------+-------------+-----------------+------------+-------------+---------------+------------------+--------------+--------------------+-----------+-----------+-----------+-------------+----------------+------------+--------------+----------------+-------------+-----------+-----------+-------------+--------------

# Data Preparation

In [8]:
# Keep only the columns the model needs: the route (Origin, Dest), the label (AirTime)
# and the predictor (Distance).
AirlineDF = df.select(["Origin", "Dest", "AirTime", "Distance"])

In [9]:
# Mark the projected frame for caching. cache() is lazy: nothing is stored until the next action
# (the show() below). After that, later jobs built on this frame can read the cached rows instead of
# re-parsing the CSV files. Caching speeds up repeated reads only; it does not affect writes.
# The trailing ';' suppresses the DataFrame repr that Jupyter would otherwise echo.
AirlineDF.cache();

DataFrame[Origin: string, Dest: string, AirTime: string, Distance: string]

In [10]:
# Peek at the first 20 rows of the projected frame.
AirlineDF.show(20)



+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   JFK| LAX| 338.00| 2475.00|
|   JFK| LAX| 349.00| 2475.00|
|   JFK| LAX| 370.00| 2475.00|
|   JFK| LAX| 350.00| 2475.00|
|   JFK| LAX| 335.00| 2475.00|
|   JFK| LAX| 336.00| 2475.00|
|   JFK| LAX| 380.00| 2475.00|
|   JFK| LAX| 359.00| 2475.00|
|   JFK| LAX| 368.00| 2475.00|
|   JFK| LAX| 356.00| 2475.00|
|   JFK| LAX| 353.00| 2475.00|
|   JFK| LAX| 332.00| 2475.00|
|   JFK| LAX| 339.00| 2475.00|
|   JFK| LAX| 339.00| 2475.00|
|   JFK| LAX| 335.00| 2475.00|
|   JFK| LAX| 340.00| 2475.00|
|   JFK| LAX| 327.00| 2475.00|
|   JFK| LAX| 308.00| 2475.00|
|   JFK| LAX| 315.00| 2475.00|
|   JFK| LAX| 323.00| 2475.00|
+------+----+-------+--------+
only showing top 20 rows



                                                                                

In [11]:
# Check the data type of each column. The CSV was read without schema inference, so every column
# is still a string at this point.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- Distance: string (nullable = true)



# We need to change the data types of the columns

In [12]:
#import sql library for data types 
from pyspark.sql.types import IntegerType

In [13]:
# Distance is text such as "2475.00"; cast it to an integer number of miles.
AirlineDF = AirlineDF.withColumn("Distance", AirlineDF.Distance.cast(IntegerType()))

In [14]:
# AirTime (the label) is text such as "338.00"; cast it to an integer number of minutes.
AirlineDF = AirlineDF.withColumn("AirTime", AirlineDF.AirTime.cast(IntegerType()))

In [15]:
# Verify the casts: AirTime and Distance should now be integer columns.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)



# Check whether there are any null values in the DataFrame

In [16]:
#import libray for sql function col
from pyspark.sql.functions import col

In [17]:
from pyspark.sql import functions as F

# Count the null values in each column with ONE Spark job (a single scan) instead of one count()
# job per column. F.when(...) yields NULL when its condition is false and F.count skips NULLs,
# so each aggregate counts exactly the rows where that column is null.
NULL_CHECK_COLUMNS = ["Origin", "Dest", "AirTime", "Distance"]
null_counts = AirlineDF.select(
    [F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
     for column_name in NULL_CHECK_COLUMNS]
).first()
for column_name in NULL_CHECK_COLUMNS:
    print(null_counts[column_name])

0
0
183789
0


# We need to handle the null values

In [18]:
# Rows with no AirTime (the label) cannot be used for training, so remove them.
AirlineDF = AirlineDF.na.drop(subset=["AirTime"])

In [19]:
from pyspark.sql import functions as F

# Re-check after dropping rows: every count should now be 0.
# One aggregation pass counts the nulls of all columns at once (F.count skips the NULLs that
# F.when produces for non-null rows).
NULL_CHECK_COLUMNS = ["Origin", "Dest", "AirTime", "Distance"]
null_counts = AirlineDF.select(
    [F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
     for column_name in NULL_CHECK_COLUMNS]
).first()
for column_name in NULL_CHECK_COLUMNS:
    print(null_counts[column_name])

# Fail loudly if cleaning left any nulls behind, instead of relying on eyeballing the printout.
assert all(null_counts[column_name] == 0 for column_name in NULL_CHECK_COLUMNS), "null values remain"

0
0
0
0


# We need to convert distance from miles to km

In [20]:
from pyspark.sql import functions as F

# Statute mile -> kilometre conversion factor.
KM_PER_MILE = 1.60934

# Convert 'mile' to 'km', rounded to whole kilometres.
# The functions module is used via the F alias so Python's built-in round() is not shadowed.
# The dtype guard makes the cell idempotent. After the first run Distance is a double holding km,
# and re-running the cell must not multiply by KM_PER_MILE a second time.
if dict(AirlineDF.dtypes)["Distance"] != "double":
    AirlineDF = AirlineDF.withColumn("Distance", F.round(F.col("Distance") * KM_PER_MILE, 0))

AirlineDF.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   JFK| LAX|    338|  3983.0|
|   JFK| LAX|    349|  3983.0|
|   JFK| LAX|    370|  3983.0|
|   JFK| LAX|    350|  3983.0|
|   JFK| LAX|    335|  3983.0|
|   JFK| LAX|    336|  3983.0|
|   JFK| LAX|    380|  3983.0|
|   JFK| LAX|    359|  3983.0|
|   JFK| LAX|    368|  3983.0|
|   JFK| LAX|    356|  3983.0|
|   JFK| LAX|    353|  3983.0|
|   JFK| LAX|    332|  3983.0|
|   JFK| LAX|    339|  3983.0|
|   JFK| LAX|    339|  3983.0|
|   JFK| LAX|    335|  3983.0|
|   JFK| LAX|    340|  3983.0|
|   JFK| LAX|    327|  3983.0|
|   JFK| LAX|    308|  3983.0|
|   JFK| LAX|    315|  3983.0|
|   JFK| LAX|    323|  3983.0|
+------+----+-------+--------+
only showing top 20 rows



In [21]:
# After the km conversion Distance is a double; AirTime remains an integer.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: double (nullable = true)



## Vectorize the features

In [22]:
# Explicit import instead of a wildcard: VectorAssembler is the only pyspark.ml.feature class used below.
from pyspark.ml.feature import VectorAssembler

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
# Spark ML estimators read their inputs from a single vector column, so pack the lone predictor
# (Distance) into a one-element "features" vector.
vectorizer = VectorAssembler(inputCols=["Distance"], outputCol="features")

df_vect = vectorizer.transform(AirlineDF)

In [25]:
# Each row now also carries the assembled "features" vector built from Distance.
df_vect.show(n=20)

+------+----+-------+--------+--------+
|Origin|Dest|AirTime|Distance|features|
+------+----+-------+--------+--------+
|   JFK| LAX|    338|  3983.0|[3983.0]|
|   JFK| LAX|    349|  3983.0|[3983.0]|
|   JFK| LAX|    370|  3983.0|[3983.0]|
|   JFK| LAX|    350|  3983.0|[3983.0]|
|   JFK| LAX|    335|  3983.0|[3983.0]|
|   JFK| LAX|    336|  3983.0|[3983.0]|
|   JFK| LAX|    380|  3983.0|[3983.0]|
|   JFK| LAX|    359|  3983.0|[3983.0]|
|   JFK| LAX|    368|  3983.0|[3983.0]|
|   JFK| LAX|    356|  3983.0|[3983.0]|
|   JFK| LAX|    353|  3983.0|[3983.0]|
|   JFK| LAX|    332|  3983.0|[3983.0]|
|   JFK| LAX|    339|  3983.0|[3983.0]|
|   JFK| LAX|    339|  3983.0|[3983.0]|
|   JFK| LAX|    335|  3983.0|[3983.0]|
|   JFK| LAX|    340|  3983.0|[3983.0]|
|   JFK| LAX|    327|  3983.0|[3983.0]|
|   JFK| LAX|    308|  3983.0|[3983.0]|
|   JFK| LAX|    315|  3983.0|[3983.0]|
|   JFK| LAX|    323|  3983.0|[3983.0]|
+------+----+-------+--------+--------+
only showing top 20 rows



In [26]:
# List the assembler's parameters: inputCols/outputCol were set above; handleInvalid keeps its default ("error").
print(vectorizer.explainParams())

handleInvalid: How to handle invalid data (NULL and NaN values). Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the output). Column lengths are taken from the size of ML Attribute Group, which can be set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'). (default: error)
inputCols: input column names. (current: ['Distance'])
outputCol: output column name. (default: VectorAssembler_b901739542d0__output, current: features)


# Train/Test Data Splitting

In [27]:
# 80/20 train/test split. A fixed seed makes the split, and therefore every metric downstream,
# reproducible across kernel restarts, provided the input data and its partitioning are unchanged.
SPLIT_SEED = 42
flights_train, flights_test = df_vect.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [28]:
# Preview the first 20 rows of the training split.
flights_train.show(n=20)

[Stage 30:>                                                         (0 + 1) / 1]

+------+----+-------+--------+--------+
|Origin|Dest|AirTime|Distance|features|
+------+----+-------+--------+--------+
|   ABE| CLT|     70|   774.0| [774.0]|
|   ABE| CLT|     72|   774.0| [774.0]|
|   ABE| CLT|     74|   774.0| [774.0]|
|   ABE| CLT|     77|   774.0| [774.0]|
|   ABE| CLT|     77|   774.0| [774.0]|
|   ABE| CLT|     77|   774.0| [774.0]|
|   ABE| CLT|     78|   774.0| [774.0]|
|   ABE| CLT|     79|   774.0| [774.0]|
|   ABE| CLT|     79|   774.0| [774.0]|
|   ABE| CLT|     82|   774.0| [774.0]|
|   ABE| CLT|     82|   774.0| [774.0]|
|   ABE| CLT|     83|   774.0| [774.0]|
|   ABE| CLT|     84|   774.0| [774.0]|
|   ABE| CLT|     84|   774.0| [774.0]|
|   ABE| CLT|     84|   774.0| [774.0]|
|   ABE| CLT|     86|   774.0| [774.0]|
|   ABE| CLT|     87|   774.0| [774.0]|
|   ABE| CLT|     87|   774.0| [774.0]|
|   ABE| CLT|     87|   774.0| [774.0]|
|   ABE| CLT|     88|   774.0| [774.0]|
+------+----+-------+--------+--------+
only showing top 20 rows



                                                                                

# Regression Model Training

In [29]:
from pyspark.ml.regression import LinearRegression

In [30]:
# Create the estimator with default settings and list every parameter with its default value.
lr = LinearRegression()
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 100)
predic

In [31]:
# Point the estimator at the label (AirTime) and the assembled feature vector, then fit it on
# the training split. The setters mutate lr and return it, so they chain straight into fit().
model = lr.setLabelCol("AirTime").setFeaturesCol("features").fit(flights_train)

23/11/30 11:24:04 WARN Instrumentation: [6a257743] regParam is zero, which might cause numerical instability and overfitting.
23/11/30 11:24:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/30 11:24:09 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [32]:
# Sanity check: fit() returned a fitted LinearRegressionModel.
type(model)

pyspark.ml.regression.LinearRegressionModel

# View model summary

In [33]:
# model.summary is the training summary, so this R2 is measured on flights_train, not on held-out data.
training_summary = model.summary
print("R2:", training_summary.r2)
print("Intercept: ", model.intercept, "Coefficients", model.coefficients)

R2: 0.9581936131096397
Intercept:  18.172679583008737 Coefficients [0.07330813037523519]


# Model Testing 

In [34]:
# Score the held-out split; transform() appends a "prediction" column next to the inputs.
df_pred = model.transform(flights_test)
df_pred.show()

# model.summary only reports metrics on the training split. evaluate() recomputes them on the
# test split so the generalisation error is actually measured.
test_summary = model.evaluate(flights_test)
print("Test R2:", test_summary.r2)
print("Test RMSE:", test_summary.rootMeanSquaredError)

+------+----+-------+--------+--------+------------------+
|Origin|Dest|AirTime|Distance|features|        prediction|
+------+----+-------+--------+--------+------------------+
|   ABE| CLT|     76|   774.0| [774.0]| 74.91317249344078|
|   ABE| CLT|     77|   774.0| [774.0]| 74.91317249344078|
|   ABE| CLT|     83|   774.0| [774.0]| 74.91317249344078|
|   ABE| CLT|     84|   774.0| [774.0]| 74.91317249344078|
|   ABE| CLT|     89|   774.0| [774.0]| 74.91317249344078|
|   ABE| CLT|     94|   774.0| [774.0]| 74.91317249344078|
|   ABE| PHL|     15|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     17|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     19|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     19|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     19|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     21|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     21|    89.0|  [89.0]|24.697103186404668|
|   ABE| PHL|     21|    89.0|  [89.0]|24.69710318640466

23/11/30 13:40:52 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 122605 ms exceeds timeout 120000 ms
23/11/30 13:40:52 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/30 13:40:57 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o