In this project, we investigate COVID-19 cases in South Korea. Our preliminary goal is to classify the confirmed cases by their main infection cause and by region of the country.

Data Source : https://github.com/hyunjoonbok/PySpark/blob/master/data/Case.csv

# Import Libraries and Initialization

In [26]:
#Import required libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import time
import pandas as pd
import numpy as np
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.functions import max,min

In [27]:
# Start (or reuse) a Spark session, then load the case table using the
# first line of the file as column names.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)
cases = spark.read.option("header", True).csv('Case.csv')

# Data Cleaning

In [28]:
# Remove cases without location information.
# '-' marks a missing coordinate. A row has a usable location only when BOTH
# coordinates are present, so the two tests are combined with `&`; combining
# them with `|` would keep rows that have just one of the two coordinates.
cases = cases.filter((F.col('longitude') != '-') & (F.col('latitude') != '-'))

In [29]:
# Preview the first 20 rows of the filtered table.
cases.show(n=20)

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| TRUE|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| TRUE|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| TRUE| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| TRUE|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| TRUE|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| TRUE|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000008|   Seoul|  Dongdaemun-gu| TRUE|       Dongan Church|       17|37.592888|127.056766|
| 1000010|   Seoul|      Gwanak-gu| TRUE|     Wangsung Churc

In [30]:
# case_id is not useful for this analysis, so remove it.
# The column is dropped by name (written with its leading space, exactly as
# the cell originally referenced it). Dropping by name does nothing when the
# column is already gone, so re-running this cell no longer raises.
cases = cases.drop(' case_id')

In [31]:
# Add a Row_ID column holding a generated, increasing identifier per row.
cases = cases.withColumn(
    colName='Row_ID',
    col=F.monotonically_increasing_id(),
)

In [32]:
# Put Row_ID first and keep only the columns used below
# (any column not listed here is left out of the result).
column_order = [
    'Row_ID',
    'province',
    'city',
    'infection_case',
    'confirmed',
    'latitude',
    'longitude',
]
cases = cases.select(*column_order)

In [33]:
# Count the rows that were dropped for lacking a location by comparing a
# fresh, unfiltered read of the file with the cleaned table.
cases_initial = spark.read.option("header", True).csv('Case.csv')
removed_rows = cases_initial.count() - cases.count()
print(" Number of cases removed due to the missing location is ", removed_rows)

 Number of cases removed due to the missing location is  109


# Total number of cases and finding cases based on the infection cause

In [34]:
# Total confirmed count over `cases`, i.e. only the rows that survived the
# location filter. 'confirmed' is converted to an integer column first.
cases = cases.withColumn("confirmed", F.col("confirmed").cast(IntegerType()))
cases.agg(F.sum("confirmed")).show()

+--------------+
|sum(confirmed)|
+--------------+
|          6521|
+--------------+



In [35]:
# Confirmed cases whose infection_case text contains the substring 'Church'.
church_casess = cases.where(F.col('infection_case').contains('Church'))
church_casess = church_casess.withColumn(
    "confirmed", F.col("confirmed").cast(IntegerType())
)
church_casess.agg(F.sum("confirmed")).show()


+--------------+
|sum(confirmed)|
+--------------+
|          4755|
+--------------+



In [36]:
# Overseas-inflow cases. These are taken from the unfiltered table
# (cases_initial) rather than from the location-filtered `cases`.
overseas_cases = cases_initial.where(F.col('infection_case') == 'overseas inflow')
overseas_cases = overseas_cases.withColumn(
    "confirmed", F.col("confirmed").cast(IntegerType())
)
overseas_cases.agg(F.sum("confirmed")).show()

+--------------+
|sum(confirmed)|
+--------------+
|           949|
+--------------+



In [37]:
# Preview the first 20 overseas-inflow rows.
overseas_cases.show(n=20)

+--------+-----------------+----+-----+---------------+---------+--------+---------+
| case_id|         province|city|group| infection_case|confirmed|latitude|longitude|
+--------+-----------------+----+-----+---------------+---------+--------+---------+
| 1000036|            Seoul|   -|FALSE|overseas inflow|      298|       -|        -|
| 1100008|            Busan|   -|FALSE|overseas inflow|       36|       -|        -|
| 1200008|            Daegu|   -|FALSE|overseas inflow|       41|       -|        -|
| 1300003|          Gwangju|   -|FALSE|overseas inflow|       23|       -|        -|
| 1400005|          Incheon|   -|FALSE|overseas inflow|       68|       -|        -|
| 1500008|          Daejeon|   -|FALSE|overseas inflow|       15|       -|        -|
| 1600002|            Ulsan|   -|FALSE|overseas inflow|       25|       -|        -|
| 1700004|           Sejong|   -|FALSE|overseas inflow|        5|       -|        -|
| 2000020|      Gyeonggi-do|   -|FALSE|overseas inflow|      305|

In [38]:
# Finding safe and unsafe provinces and cities.
# Rows are grouped by (province, city) and the confirmed counts are averaged.
cases = cases.withColumn("confirmed", F.col("confirmed").cast(IntegerType()))
cases2 = cases.groupBy('province', 'city').avg("confirmed")

# NOTE: despite its name, 'Safe' is 1 when the average is ABOVE 10, i.e. it
# flags the UNSAFE groups. The column name is kept unchanged because a later
# cell removes the column by this name.
cases2 = cases2.withColumn('Safe', (cases2['avg(confirmed)'] > 10).cast('integer'))
cases2_UnSAFE = cases2.filter(cases2['Safe'] == 1)

# Report both counts explicitly. As two bare expressions only the last one
# would be displayed, and the unsafe count would be computed and thrown away.
print("Unsafe (province, city) groups:", cases2_UnSAFE.count())
print("All (province, city) groups:", cases2.count())

45

In [40]:
# Rank the flagged places from highest to lowest average confirmed count,
# then remove the flag column before displaying up to 100 rows.
cases2_UnSAFE = cases2_UnSAFE.orderBy('avg(confirmed)', ascending=False)
cases2_UnSAFE = cases2_UnSAFE.drop('Safe')
cases2_UnSAFE.show(100)

+-----------------+-------------+------------------+
|         province|         city|    avg(confirmed)|
+-----------------+-------------+------------------+
|            Daegu|       Nam-gu|            4511.0|
|            Daegu| Dalseong-gun|             148.5|
|            Seoul|   Yongsan-gu|             139.0|
|            Daegu|       Seo-gu|             124.0|
| Gyeongsangbuk-do| Cheongdo-gun|             119.0|
|Chungcheongnam-do|   Cheonan-si|             103.0|
|            Seoul|    Gwanak-gu|              74.5|
| Gyeongsangbuk-do|  Bonghwa-gun|              68.0|
|      Gyeonggi-do|   Bucheon-si|              67.0|
|      Gyeonggi-do| Uijeongbu-si|              50.0|
|            Seoul|      Guro-gu|46.333333333333336|
|      Gyeonggi-do|  Seongnam-si|              44.5|
|            Seoul|    Dobong-gu|              43.0|
| Gyeongsangbuk-do|   Yechun-gun|              40.0|
|            Busan|   Dongnae-gu|              39.0|
|            Daegu|      Dong-gu|             

In [41]:
# "Unsafest" cities: those whose average confirmed count is strictly above 100.
case2_Unsafest = cases2_UnSAFE.where(F.col('avg(confirmed)') > 100)
case2_Unsafest.show()

+-----------------+------------+--------------+
|         province|        city|avg(confirmed)|
+-----------------+------------+--------------+
|            Daegu|      Nam-gu|        4511.0|
|            Daegu|Dalseong-gun|         148.5|
|            Seoul|  Yongsan-gu|         139.0|
|            Daegu|      Seo-gu|         124.0|
| Gyeongsangbuk-do|Cheongdo-gun|         119.0|
|Chungcheongnam-do|  Cheonan-si|         103.0|
+-----------------+------------+--------------+



In [42]:
# Finding the unsafest city.
# The safety ranking is keyed by (province, city), so the lookup filters on
# both; a city name on its own is not the grouping key. Rows are ordered by
# confirmed count (descending) so that first() is the largest outbreak rather
# than an arbitrary row.
cases_NamGU = (
    cases
    .filter((F.col('province') == 'Daegu') & (F.col('city') == 'Nam-gu'))
    .orderBy(F.col('confirmed').cast('int').desc())
)
cases_NamGU.show()

# Read the field by name instead of by position, so the result does not
# depend on the column order chosen in an earlier cell.
print('The main reason for high infection in the unsafest city is' , cases_NamGU.first()['infection_case'])

+------+--------+------+------------------+---------+--------+---------+
|Row_ID|province|  city|    infection_case|confirmed|latitude|longitude|
+------+--------+------+------------------+---------+--------+---------+
|    27|   Daegu|Nam-gu|Shincheonji Church|     4511|35.84008| 128.5667|
+------+--------+------+------------------+---------+--------+---------+

The main reason for high infection in the unsafest city is Shincheonji Church


# Infection by Region

In [50]:
# Find the bounding box of all case locations and its centre point, used to
# split the country into North/South and East/West.
# The coordinates are cast to float BEFORE taking min/max. Aggregating first
# and casting afterwards compares the stored (uncast) values, which for text
# columns is a character-by-character comparison, not a numeric one.
min_long, max_long, min_lat, max_lat = cases.select(
    F.min(F.col('longitude').cast("float")),
    F.max(F.col('longitude').cast("float")),
    F.min(F.col('latitude').cast("float")),
    F.max(F.col('latitude').cast("float")),
).first()

# Extent of the box along each axis.
diff_long = max_long - min_long
diff_lat = max_lat - min_lat

# Centre of the box: the dividing lines between the four regions.
cen_lat = min_lat + diff_lat / 2
cen_long = min_long + diff_long / 2

In [55]:
# Add the Region column: the quadrant of each row relative to
# (cen_long, cen_lat).
# Points lying exactly on a dividing line are counted as East / North (>=).
# With strict comparisons on both sides such rows match no branch, end up
# with a null Region and are silently left out of every regional total.
lon_value = cases['longitude'].cast("float")
lat_value = cases['latitude'].cast("float")
cases = cases.withColumn(
    "Region",
    F.when((lon_value >= cen_long) & (lat_value >= cen_lat), F.lit('North_East'))
     .when((lon_value >= cen_long) & (lat_value < cen_lat), F.lit('South_East'))
     .when((lon_value < cen_long) & (lat_value < cen_lat), F.lit('South_West'))
     .when((lon_value < cen_long) & (lat_value >= cen_lat), F.lit('North_West')),
)
cases.show(500)

+------+-----------------+---------------+--------------------+---------+----------+-----------+----------+
|Row_ID|         province|           city|      infection_case|confirmed|  latitude|  longitude|    Region|
+------+-----------------+---------------+--------------------+---------+----------+-----------+----------+
|     0|            Seoul|     Yongsan-gu|       Itaewon Clubs|      139| 37.538621| 126.992652|North_West|
|     1|            Seoul|      Gwanak-gu|             Richway|      119|  37.48208| 126.901384|North_West|
|     2|            Seoul|        Guro-gu| Guro-gu Call Center|       95| 37.508163| 126.884387|North_West|
|     3|            Seoul|   Yangcheon-gu|Yangcheon Table T...|       43| 37.546061| 126.874209|North_West|
|     4|            Seoul|      Dobong-gu|     Day Care Center|       43| 37.679422| 127.044374|North_West|
|     5|            Seoul|        Guro-gu|Manmin Central Ch...|       41| 37.481059| 126.894343|North_West|
|     6|            Seoul|  

In [54]:
# Total confirmed cases in the South_East region.
South_east_cases = cases.where(F.col('Region') == 'South_East')
South_east_cases.agg(F.sum('confirmed')).show()

+--------------+
|sum(confirmed)|
+--------------+
|          5323|
+--------------+



In [53]:
# Total confirmed cases in the North_East region.
North_east_cases = cases.where(F.col('Region') == 'North_East')
North_east_cases.agg(F.sum('confirmed')).show()

+--------------+
|sum(confirmed)|
+--------------+
|           123|
+--------------+



In [52]:
# Total confirmed cases in the North_West region.
North_west_cases = cases.where(F.col('Region') == 'North_West')
North_west_cases.agg(F.sum('confirmed')).show()

+--------------+
|sum(confirmed)|
+--------------+
|          1041|
+--------------+



In [51]:
# Total confirmed cases in the South_West region.
South_west_cases = cases.where(F.col('Region') == 'South_West')
South_west_cases.agg(F.sum('confirmed')).show()

+--------------+
|sum(confirmed)|
+--------------+
|            34|
+--------------+

