# Install pyspark library

In [None]:
!pip install pyspark

In [1]:
#import pyspark library
import pyspark

In [2]:
#import spark session library
from pyspark.sql import SparkSession

In [3]:
# Build the SparkSession for this notebook, or reuse one that is already running
spark = (
    SparkSession.builder
    .appName("K_Means")
    .getOrCreate()
)

# Create DataFrame

In [4]:
# Load the external CSV dataset into a DataFrame; the first line holds the column names
df = spark.read.csv("airlines1.csv", header=True)

# Data Preparation

In [5]:
# Keep only the columns needed for clustering
AirlineDF = df.select(
    ["Origin", "Dest", "AirTime", "Distance"]
)

In [6]:
# Mark the DataFrame for caching so that repeated actions can reuse it.
# NOTE: cache() is lazy - nothing is stored until the first action (show/count/...) runs.
AirlineDF.cache()

DataFrame[Origin: string, Dest: string, AirTime: string, Distance: string]

In [7]:
# Preview the first rows of the selected columns (values are still strings at this point)
AirlineDF.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|  153.0|   991.0|
|   MKE| MCO|  141.0|  1066.0|
|   GJT| DFW|  103.0|   773.0|
|   LAX| DTW|  220.0|  1979.0|
|   EWR| CLT|   80.0|   529.0|
|   DFW| SHV|   28.0|   190.0|
|   BOS| CLE|   94.0|   563.0|
|   ATL| CAE|   35.0|   192.0|
|   ORD| CLE|   59.0|   316.0|
|   MDW| DAL|  114.0|   793.0|
|   SAN| LAX|   null|   109.0|
|   ELP| DAL|   77.0|   562.0|
|   SJU| MIA|   null|  1045.0|
|   ABQ| LAX|   95.0|   677.0|
|   ORD| LGA|   99.0|   733.0|
|   GSO| BWI|   null|   278.0|
|   DTW| MBS|   24.0|    98.0|
|   SLC| SEA|  102.0|   689.0|
|   LAX| IAD|  255.0|  2288.0|
|   SMF| LAX|   null|   373.0|
+------+----+-------+--------+
only showing top 20 rows



In [8]:
# Check the data type of each column
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- Distance: string (nullable = true)



In [9]:
from pyspark.sql.types import IntegerType

In [10]:
# Distance was read from the CSV as a string; convert it to an integer column
AirlineDF = AirlineDF.withColumn(
    "Distance",
    AirlineDF.Distance.cast(IntegerType()),
)

In [11]:
# AirTime was read from the CSV as a string; convert it to an integer column
AirlineDF = AirlineDF.withColumn(
    "AirTime",
    AirlineDF.AirTime.cast(IntegerType()),
)

In [12]:
# Confirm that AirTime and Distance are now integer columns
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)



In [13]:
# Preview the rows again after the cast to integer
AirlineDF.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|    153|     991|
|   MKE| MCO|    141|    1066|
|   GJT| DFW|    103|     773|
|   LAX| DTW|    220|    1979|
|   EWR| CLT|     80|     529|
|   DFW| SHV|     28|     190|
|   BOS| CLE|     94|     563|
|   ATL| CAE|     35|     192|
|   ORD| CLE|     59|     316|
|   MDW| DAL|    114|     793|
|   SAN| LAX|   null|     109|
|   ELP| DAL|     77|     562|
|   SJU| MIA|   null|    1045|
|   ABQ| LAX|     95|     677|
|   ORD| LGA|     99|     733|
|   GSO| BWI|   null|     278|
|   DTW| MBS|     24|      98|
|   SLC| SEA|    102|     689|
|   LAX| IAD|    255|    2288|
|   SMF| LAX|   null|     373|
+------+----+-------+--------+
only showing top 20 rows



In [14]:
# Check the data type of each column
# NOTE(review): this repeats the schema check made right after the casts; one of the two can be dropped.
AirlineDF.printSchema()

root
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- Distance: integer (nullable = true)



# Check whether there are any null values in the DataFrame

In [15]:
# import the SQL function col
from pyspark.sql.functions import col

In [16]:
# Print the number of null values in each column, one count per line
for column_name in ("Origin", "Dest", "AirTime", "Distance"):
    print(AirlineDF.where(col(column_name).isNull()).count())

0
0
10529
0


# Handle null values

In [17]:
# Drop every row whose AirTime is null
AirlineDF = AirlineDF.na.drop(subset=["AirTime"])

In [18]:
# Re-check the null counts after dropping rows, one count per line
for column_name in ("Origin", "Dest", "AirTime"):
    print(AirlineDF.where(col(column_name).isNull()).count())

0
0
0


# Convert distance from miles to km

In [19]:
# Import the functions module under an alias instead of `from ... import round`,
# so that Python's built-in round() is not shadowed for the rest of the notebook.
from pyspark.sql import functions as F

# Conversion factor: 1 mile = 1.60934 km
KM_PER_MILE = 1.60934

# Convert 'Distance' from miles to km, rounded to the nearest whole km
AirlineDF1 = AirlineDF.withColumn('Distance', F.round(AirlineDF.Distance * KM_PER_MILE, 0))

AirlineDF1.show()

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   MSP| SLC|    153|  1595.0|
|   MKE| MCO|    141|  1716.0|
|   GJT| DFW|    103|  1244.0|
|   LAX| DTW|    220|  3185.0|
|   EWR| CLT|     80|   851.0|
|   DFW| SHV|     28|   306.0|
|   BOS| CLE|     94|   906.0|
|   ATL| CAE|     35|   309.0|
|   ORD| CLE|     59|   509.0|
|   MDW| DAL|    114|  1276.0|
|   ELP| DAL|     77|   904.0|
|   ABQ| LAX|     95|  1090.0|
|   ORD| LGA|     99|  1180.0|
|   DTW| MBS|     24|   158.0|
|   SLC| SEA|    102|  1109.0|
|   LAX| IAD|    255|  3682.0|
|   MSY| ORD|    119|  1347.0|
|   SGF| ATL|     85|   906.0|
|   PHX| LGB|     62|   571.0|
|   BUR| SFO|     53|   525.0|
+------+----+-------+--------+
only showing top 20 rows



# Vectorize the features

In [20]:
from pyspark.ml.feature import *

In [21]:
from pyspark.ml.feature import VectorAssembler

In [22]:
# Assemble the two numeric columns into a single 'features' vector column for KMeans.
# handleInvalid="skip" drops any row with a null/NaN input. The previous setting, "keep",
# would emit a NaN feature vector for such a row, and that vector would then be fed to
# KMeans. Rows with a null AirTime are already dropped earlier in the notebook, so on the
# current data this is only a safeguard and the result is unchanged.
vectorizer = VectorAssembler(
    inputCols=["AirTime", "Distance"],
    outputCol="features",
    handleInvalid="skip",
)

dataset = vectorizer.transform(AirlineDF1)

In [23]:
# Preview the assembled dataset, including the new 'features' vector column
dataset.show()

+------+----+-------+--------+--------------+
|Origin|Dest|AirTime|Distance|      features|
+------+----+-------+--------+--------------+
|   MSP| SLC|    153|  1595.0|[153.0,1595.0]|
|   MKE| MCO|    141|  1716.0|[141.0,1716.0]|
|   GJT| DFW|    103|  1244.0|[103.0,1244.0]|
|   LAX| DTW|    220|  3185.0|[220.0,3185.0]|
|   EWR| CLT|     80|   851.0|  [80.0,851.0]|
|   DFW| SHV|     28|   306.0|  [28.0,306.0]|
|   BOS| CLE|     94|   906.0|  [94.0,906.0]|
|   ATL| CAE|     35|   309.0|  [35.0,309.0]|
|   ORD| CLE|     59|   509.0|  [59.0,509.0]|
|   MDW| DAL|    114|  1276.0|[114.0,1276.0]|
|   ELP| DAL|     77|   904.0|  [77.0,904.0]|
|   ABQ| LAX|     95|  1090.0| [95.0,1090.0]|
|   ORD| LGA|     99|  1180.0| [99.0,1180.0]|
|   DTW| MBS|     24|   158.0|  [24.0,158.0]|
|   SLC| SEA|    102|  1109.0|[102.0,1109.0]|
|   LAX| IAD|    255|  3682.0|[255.0,3682.0]|
|   MSY| ORD|    119|  1347.0|[119.0,1347.0]|
|   SGF| ATL|     85|   906.0|  [85.0,906.0]|
|   PHX| LGB|     62|   571.0|  [6

# Divide data into multiple clusters

In [24]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [25]:
# Fit a k-means model with 5 clusters; the fixed seed keeps the initialisation repeatable
kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(dataset)

In [26]:
# Make predictions: apply the fitted model to the same dataset it was trained on,
# which adds a 'prediction' column holding each row's cluster index
predictions = model.transform(dataset)

In [27]:
# Create the evaluator used to score the clustering (Silhouette score);
# the score itself is computed in the next cell via evaluator.evaluate(predictions)
evaluator = ClusteringEvaluator()

In [28]:
# Score the clustered rows and report the result
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

Silhouette with squared euclidean distance = 0.7426365575241805


In [29]:
# Print the coordinates (AirTime, Distance) of each fitted cluster center, one per line
centers = model.clusterCenters()
print("Cluster Centers: ")
for centroid in centers:
    print(centroid)

Cluster Centers: 
[ 49.34561149 449.61860833]
[ 298.26466165 3907.4952381 ]
[ 90.66013693 997.68115253]
[ 207.17976611 2611.4819146 ]
[ 137.96114948 1639.54785397]


In [30]:
# Preview the rows together with the cluster index assigned to each one
predictions.show()

+------+----+-------+--------+--------------+----------+
|Origin|Dest|AirTime|Distance|      features|prediction|
+------+----+-------+--------+--------------+----------+
|   MSP| SLC|    153|  1595.0|[153.0,1595.0]|         4|
|   MKE| MCO|    141|  1716.0|[141.0,1716.0]|         4|
|   GJT| DFW|    103|  1244.0|[103.0,1244.0]|         2|
|   LAX| DTW|    220|  3185.0|[220.0,3185.0]|         3|
|   EWR| CLT|     80|   851.0|  [80.0,851.0]|         2|
|   DFW| SHV|     28|   306.0|  [28.0,306.0]|         0|
|   BOS| CLE|     94|   906.0|  [94.0,906.0]|         2|
|   ATL| CAE|     35|   309.0|  [35.0,309.0]|         0|
|   ORD| CLE|     59|   509.0|  [59.0,509.0]|         0|
|   MDW| DAL|    114|  1276.0|[114.0,1276.0]|         2|
|   ELP| DAL|     77|   904.0|  [77.0,904.0]|         2|
|   ABQ| LAX|     95|  1090.0| [95.0,1090.0]|         2|
|   ORD| LGA|     99|  1180.0| [99.0,1180.0]|         2|
|   DTW| MBS|     24|   158.0|  [24.0,158.0]|         0|
|   SLC| SEA|    102|  1109.0|[

# How to get the data belonging to one cluster

In [31]:
# Show the rows assigned to cluster 1.
# Filter first, while 'prediction' is still a column of the DataFrame, and only then
# project the columns to display. Compare against the integer label 1 rather than the
# string '1', since 'prediction' is an integer column.
(
    predictions
    .where(predictions.prediction == 1)
    .select("Origin", "Dest", "AirTime", "Distance")
    .show()
)

+------+----+-------+--------+
|Origin|Dest|AirTime|Distance|
+------+----+-------+--------+
|   LAX| IAD|    255|  3682.0|
|   HNL| ORD|    428|  6830.0|
|   IAD| LAS|    274|  3323.0|
|   DTW| SFO|    266|  3346.0|
|   EWR| LAX|    326|  3949.0|
|   DTW| SFO|    273|  3346.0|
|   JFK| SFO|    327|  4162.0|
|   SEA| JFK|    280|  3896.0|
|   LAX| LIH|    335|  4208.0|
|   LAS| EWR|    243|  3584.0|
|   SLC| BOS|    247|  3388.0|
|   LAX| IAD|    256|  3682.0|
|   CLE| LAX|    291|  3304.0|
|   ATL| SEA|    281|  3512.0|
|   SFO| JFK|    300|  4162.0|
|   BOS| SFO|    365|  4352.0|
|   SFO| EWR|    284|  4128.0|
|   JFK| LAS|    286|  3618.0|
|   LIH| OAK|    296|  3954.0|
|   PHX| BOS|    261|  3701.0|
+------+----+-------+--------+
only showing top 20 rows

