In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q ftp://mirror.klaus-uwe.me/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz

In [3]:
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [6]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [7]:
# Check the pyspark version
import pyspark
print(pyspark.__version__)

2.4.7


In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster').getOrCreate()

In [10]:
from pyspark.ml.clustering import KMeans

dataset = spark.read.csv("raw_data1.csv",header=True,inferSchema=True)

dataset.show()


+--------------+--------------------+--------------+--------------------+-----------+------+--------------------+-----------------+--------------+----------+--------------+--------------------+-----------------------------------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------+
|Patient_Number|State_Patient Number|Date_Announced|Estimated_Onset_Date|Age_Bracket|Gender|       Detected_City|Detected_District|Detected_State|State_code|Current_Status|               Notes|Contracted_from_which_Patient (Suspected)|Nationality|Type_of_transmission|Status_Change_Date|            Source_1|            Source_2|            Source_3|        Backup_Notes|Num_Cases|
+--------------+--------------------+--------------+--------------------+-----------+------+--------------------+-----------------+--------------+----------+--------------+--------------------+-----------------------------------------+-

In [11]:
type(dataset)

pyspark.sql.dataframe.DataFrame

In [12]:
dataset.head()

Row(Patient_Number='1', State_Patient Number='KL-TS-P1', Date_Announced='30-01-2020', Estimated_Onset_Date=None, Age_Bracket='20', Gender='F', Detected_City='Thrissur', Detected_District='Thrissur', Detected_State='Kerala', State_code='KL', Current_Status='Recovered', Notes='Travelled from Wuhan', Contracted_from_which_Patient (Suspected)=None, Nationality='India', Type_of_transmission='Imported', Status_Change_Date='14-02-2020', Source_1='https://twitter.com/vijayanpinarayi/status/1222819465143832577', Source_2='https://weather.com/en-IN/india/news/news/2020-02-14-kerala-defeats-coronavirus-indias-three-covid-19-patients-successfully', Source_3=None, Backup_Notes='Student from Wuhan', Num_Cases=1)

In [13]:
 dataset.collect()


[Row(Patient_Number='1', State_Patient Number='KL-TS-P1', Date_Announced='30-01-2020', Estimated_Onset_Date=None, Age_Bracket='20', Gender='F', Detected_City='Thrissur', Detected_District='Thrissur', Detected_State='Kerala', State_code='KL', Current_Status='Recovered', Notes='Travelled from Wuhan', Contracted_from_which_Patient (Suspected)=None, Nationality='India', Type_of_transmission='Imported', Status_Change_Date='14-02-2020', Source_1='https://twitter.com/vijayanpinarayi/status/1222819465143832577', Source_2='https://weather.com/en-IN/india/news/news/2020-02-14-kerala-defeats-coronavirus-indias-three-covid-19-patients-successfully', Source_3=None, Backup_Notes='Student from Wuhan', Num_Cases=1),
 Row(Patient_Number='2', State_Patient Number='KL-AL-P1', Date_Announced='02-02-2020', Estimated_Onset_Date=None, Age_Bracket=None, Gender=None, Detected_City='Alappuzha', Detected_District='Alappuzha', Detected_State='Kerala', State_code='KL', Current_Status='Recovered', Notes='Travelled 

In [14]:
dataset.describe().show()

+-------+--------------------+--------------------+--------------+--------------------+------------------+------+-------------+-----------------+--------------------+----------+--------------+--------------------+-----------------------------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|summary|      Patient_Number|State_Patient Number|Date_Announced|Estimated_Onset_Date|       Age_Bracket|Gender|Detected_City|Detected_District|      Detected_State|State_code|Current_Status|               Notes|Contracted_from_which_Patient (Suspected)|         Nationality|Type_of_transmission|Status_Change_Date|            Source_1|            Source_2|            Source_3|        Backup_Notes|          Num_Cases|
+-------+--------------------+--------------------+--------------+--------------------+------------------+------+-------------+-----------------+-------------

In [15]:
from pyspark.ml.feature import StringIndexer

In [16]:
dataset.columns

['Patient_Number',
 'State_Patient Number',
 'Date_Announced',
 'Estimated_Onset_Date',
 'Age_Bracket',
 'Gender',
 'Detected_City',
 'Detected_District',
 'Detected_State',
 'State_code',
 'Current_Status',
 'Notes',
 'Contracted_from_which_Patient (Suspected)',
 'Nationality',
 'Type_of_transmission',
 'Status_Change_Date',
 'Source_1',
 'Source_2',
 'Source_3',
 'Backup_Notes',
 'Num_Cases']

In [17]:
string_indexer = StringIndexer(inputCol ='Patient_Number', outputCol='features')

In [18]:
final_data = string_indexer.fit(dataset).transform(dataset)
final_data.show()

+--------------+--------------------+--------------+--------------------+-----------+------+--------------------+-----------------+--------------+----------+--------------+--------------------+-----------------------------------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------+--------+
|Patient_Number|State_Patient Number|Date_Announced|Estimated_Onset_Date|Age_Bracket|Gender|       Detected_City|Detected_District|Detected_State|State_code|Current_Status|               Notes|Contracted_from_which_Patient (Suspected)|Nationality|Type_of_transmission|Status_Change_Date|            Source_1|            Source_2|            Source_3|        Backup_Notes|Num_Cases|features|
+--------------+--------------------+--------------+--------------------+-----------+------+--------------------+-----------------+--------------+----------+--------------+--------------------+-------------------------

In [31]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [32]:
dataset.columns

['Patient_Number',
 'State_Patient Number',
 'Date_Announced',
 'Estimated_Onset_Date',
 'Age_Bracket',
 'Gender',
 'Detected_City',
 'Detected_District',
 'Detected_State',
 'State_code',
 'Current_Status',
 'Notes',
 'Contracted_from_which_Patient (Suspected)',
 'Nationality',
 'Type_of_transmission',
 'Status_Change_Date',
 'Source_1',
 'Source_2',
 'Source_3',
 'Backup_Notes',
 'Num_Cases']

In [39]:
dataset.printSchema()

root
 |-- Patient_Number: string (nullable = true)
 |-- State_Patient Number: string (nullable = true)
 |-- Date_Announced: string (nullable = true)
 |-- Estimated_Onset_Date: string (nullable = true)
 |-- Age_Bracket: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Detected_City: string (nullable = true)
 |-- Detected_District: string (nullable = true)
 |-- Detected_State: string (nullable = true)
 |-- State_code: string (nullable = true)
 |-- Current_Status: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- Contracted_from_which_Patient (Suspected): string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Type_of_transmission: string (nullable = true)
 |-- Status_Change_Date: string (nullable = true)
 |-- Source_1: string (nullable = true)
 |-- Source_2: string (nullable = true)
 |-- Source_3: string (nullable = true)
 |-- Backup_Notes: string (nullable = true)
 |-- Num_Cases: integer (nullable = true)



In [80]:
from pyspark.sql.types import IntegerType

dataset = dataset.withColumn('Detected_City',dataset['Detected_City'].cast(IntegerType()))
dataset = dataset.withColumn('Detected_District',dataset['Detected_District'].cast(IntegerType()))
dataset = dataset.withColumn('Detected_State',dataset['Detected_State'].cast(IntegerType()))
dataset = dataset.withColumn('Age_Bracket',dataset['Age_Bracket'].cast(IntegerType()))
dataset = dataset.withColumn('Contracted_from_which_Patient (Suspected)',dataset['Contracted_from_which_Patient (Suspected)'].cast(IntegerType()))


In [81]:
dataset.printSchema()

root
 |-- Patient_Number: string (nullable = true)
 |-- State_Patient Number: string (nullable = true)
 |-- Date_Announced: string (nullable = true)
 |-- Estimated_Onset_Date: string (nullable = true)
 |-- Age_Bracket: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Detected_City: integer (nullable = true)
 |-- Detected_District: integer (nullable = true)
 |-- Detected_State: integer (nullable = true)
 |-- State_code: string (nullable = true)
 |-- Current_Status: string (nullable = true)
 |-- Notes: string (nullable = true)
 |-- Contracted_from_which_Patient (Suspected): integer (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Type_of_transmission: string (nullable = true)
 |-- Status_Change_Date: string (nullable = true)
 |-- Source_1: string (nullable = true)
 |-- Source_2: string (nullable = true)
 |-- Source_3: string (nullable = true)
 |-- Backup_Notes: string (nullable = true)
 |-- Num_Cases: integer (nullable = true)



In [82]:
dataset.show()

+--------------+--------------------+--------------+--------------------+-----------+------+-------------+-----------------+--------------+----------+--------------+--------------------+-----------------------------------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+---------+
|Patient_Number|State_Patient Number|Date_Announced|Estimated_Onset_Date|Age_Bracket|Gender|Detected_City|Detected_District|Detected_State|State_code|Current_Status|               Notes|Contracted_from_which_Patient (Suspected)|Nationality|Type_of_transmission|Status_Change_Date|            Source_1|            Source_2|            Source_3|        Backup_Notes|Num_Cases|
+--------------+--------------------+--------------+--------------------+-----------+------+-------------+-----------------+--------------+----------+--------------+--------------------+-----------------------------------------+-----------+----------

In [97]:
vec_assembler = VectorAssembler(inputCols = ('Age_Bracket','Detected_City','Contracted_from_which_Patient (Suspected)'), outputCol='features')
#vec_assembler = VectorAssembler(inputCols = ('Patient_Number'), outputCol = 'features')

In [84]:
final_data = vec_assembler.transform(dataset)


In [85]:
from pyspark.ml.feature import StandardScaler

In [86]:
scaler = StandardScaler(inputCol ='features', outputCol = 'scaledFeatures', withStd = True, withMean = False)

In [87]:
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

Py4JJavaError: ignored