## Tasks to be performed 
* Read the data, show it, and count the number of records.
* Describe the data with the describe function.
* If there are any duplicate rows, drop them.
* Use the limit function to show a limited number of records.
* If a column name is not suitable, change it (optional).
* Select a subset of the columns.
* If there are any null values, fill them with a chosen value or drop them.
* Filter the data on different columns or variables and analyze the result.

For example, we can filter a data frame on multiple conditions using AND (`&`), OR (`|`), and NOT (`~`). For instance, we may want to find all the different `infection_case` values in Daegu Province with more than 10 confirmed cases.
* Sort the number of confirmed cases. Confirmed column is there in the dataset. Check with descending sort also.
* In case of any wrong data type, cast that data type from integer to string or string to integer.
* Use group by on the province and city columns and aggregate with the sum of confirmed cases. For example:
  df.groupBy(["province","city"]).agg(functions.sum("confirmed"))

For joins, we need one more file; you can use the region file.
Use different join methods. For example:
cases.join(regions, ['province','city'], how='left')

In [51]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
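# Note: the wildcard import shadows Python built-ins such as sum, min, and max;
# later cells rely on it for col, desc, and sum.
from pyspark.sql.functions import *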

In [52]:
spark = SparkSession.builder.appName("spark-assignment").getOrCreate()

In [53]:
spark

In [54]:
case_df = spark.read.csv(r"C:\Users\Azhar\Desktop\Spark Assignment\Case.csv", header=True, inferSchema=True)
region_df = spark.read.csv(r"C:\Users\Azhar\Desktop\Spark Assignment\Region.csv", header=True, inferSchema=True)
time_prov_df = spark.read.csv(r"C:\Users\Azhar\Desktop\Spark Assignment\TimeProvince.csv", header=True, inferSchema=True)

### Read the data, show it and Count the number of records.

In [55]:
case_df.show(10)

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [56]:
region_df.show(10)

+-----+--------+------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|        city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|       Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul|  Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul| Gangdong-gu|37.530492|127.123837|     

In [57]:
time_prov_df.show(10)

+----------+----+-----------+---------+--------+--------+
|      date|time|   province|confirmed|released|deceased|
+----------+----+-----------+---------+--------+--------+
|2020-01-20|  16|      Seoul|        0|       0|       0|
|2020-01-20|  16|      Busan|        0|       0|       0|
|2020-01-20|  16|      Daegu|        0|       0|       0|
|2020-01-20|  16|    Incheon|        1|       0|       0|
|2020-01-20|  16|    Gwangju|        0|       0|       0|
|2020-01-20|  16|    Daejeon|        0|       0|       0|
|2020-01-20|  16|      Ulsan|        0|       0|       0|
|2020-01-20|  16|     Sejong|        0|       0|       0|
|2020-01-20|  16|Gyeonggi-do|        0|       0|       0|
|2020-01-20|  16| Gangwon-do|        0|       0|       0|
+----------+----+-----------+---------+--------+--------+
only showing top 10 rows



In [58]:
case_df.count()

174

In [59]:
region_df.count()

244

In [60]:
time_prov_df.count()

2771

### Describe the data with a describe function.

In [61]:
case_df.describe()

DataFrame[summary: string,  case_id: string, province: string, city: string, infection_case: string, confirmed: string, latitude: string, longitude: string]

In [62]:
region_df.describe()

DataFrame[summary: string, code: string, province: string, city: string, latitude: string, longitude: string, elementary_school_count: string, kindergarten_count: string, university_count: string, academy_ratio: string, elderly_population_ratio: string, elderly_alone_ratio: string, nursing_home_count: string]

In [63]:
time_prov_df.describe()

DataFrame[summary: string, time: string, province: string, confirmed: string, released: string, deceased: string]

### If there is any duplicate value drop it.

In [64]:
case_df1 = case_df.dropDuplicates()

In [65]:
region_df1 = region_df.dropDuplicates()

In [66]:
time_prov_df1 = time_prov_df.dropDuplicates()

In [67]:
case_df1.count()

174

In [68]:
region_df1.count()

244

In [69]:
time_prov_df1.count()

2771

### Use limit function for showcasing a limited number of records.

In [70]:
case_df1_limit = case_df1.limit(12)

In [71]:
case_df1_limit.show()

+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| 1500007|          Daejeon|from other city| true|Seosan-si Laboratory|        2|        -|         -|
| 1500004|          Daejeon|         Seo-gu| true|     Dreaming Church|        4|36.346869|127.368594|
| 3000002|       Gangwon-do|from other city| true|Uijeongbu St. Mar...|       10|        -|         -|
| 1000023|            Seoul|        Jung-gu| true|   KB Life Insurance|       13|37.560899|126.966998|
| 1100007|            Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|
| 4100002|Chungcheongnam-do|from other city| true|Door-to-door sale...|       10|        -|         -|
| 5000002|     Jeollabuk-do|from other city| true|Door-to-door sale...|  

### If you find the column name is not suitable, change the column name.[optional]

In [72]:
renamed_case_df1 = case_df1.withColumnRenamed('province', 'country')

In [73]:
renamed_case_df1.show(2)

+--------+-------+---------------+-----+--------------------+---------+---------+----------+
| case_id|country|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+-------+---------------+-----+--------------------+---------+---------+----------+
| 1500007|Daejeon|from other city| true|Seosan-si Laboratory|        2|        -|         -|
| 1500004|Daejeon|         Seo-gu| true|     Dreaming Church|        4|36.346869|127.368594|
+--------+-------+---------------+-----+--------------------+---------+---------+----------+
only showing top 2 rows



### Select the subset of the columns.

In [74]:
case_df1.show(2)

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1500007| Daejeon|from other city| true|Seosan-si Laboratory|        2|        -|         -|
| 1500004| Daejeon|         Seo-gu| true|     Dreaming Church|        4|36.346869|127.368594|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
only showing top 2 rows



In [80]:
case_df1.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [85]:
selected_data = case_df1.select(" case_id",'province')
selected_data.show(5)

+--------+----------+
| case_id|  province|
+--------+----------+
| 1500007|   Daejeon|
| 1500004|   Daejeon|
| 3000002|Gangwon-do|
| 1000023|     Seoul|
| 1100007|     Busan|
+--------+----------+
only showing top 5 rows



### If there are any null values, fill them with a chosen value or drop them.

In [87]:
dropped_case_df1 = case_df1.dropna()

In [88]:
dropped_case_df1.count()

174

In [89]:
case_df1.show(5)

+--------+----------+---------------+-----+--------------------+---------+---------+----------+
| case_id|  province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+----------+---------------+-----+--------------------+---------+---------+----------+
| 1500007|   Daejeon|from other city| true|Seosan-si Laboratory|        2|        -|         -|
| 1500004|   Daejeon|         Seo-gu| true|     Dreaming Church|        4|36.346869|127.368594|
| 3000002|Gangwon-do|from other city| true|Uijeongbu St. Mar...|       10|        -|         -|
| 1000023|     Seoul|        Jung-gu| true|   KB Life Insurance|       13|37.560899|126.966998|
| 1100007|     Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|
+--------+----------+---------------+-----+--------------------+---------+---------+----------+
only showing top 5 rows



### Filter the data on different columns or variables and analyze the result.

In [102]:
filtered_case_df1 = case_df1.filter((case_df1.province == 'Daegu') & (case_df1.confirmed > 10))
filtered_case_df1.show(10)

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1200008|   Daegu|           -|false|     overseas inflow|       41|        -|         -|
| 1200005|   Daegu|     Dong-gu| true|     Fatima Hospital|       39| 35.88395|128.624059|
| 1200004|   Daegu|Dalseong-gun| true|Daesil Convalesce...|      101|35.857393|128.466653|
| 1200009|   Daegu|           -|false|contact with patient|      917|        -|         -|
| 1200002|   Daegu|Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35.857375|128.466651|
| 1200010|   Daegu|           -|false|                 etc|      747|        -|         -|
| 1200001|   Daegu|      Nam-gu| true|  Shincheonji Church|     4511| 35.84008|  128.5667|
| 1200003|   Daegu|      Seo-gu| true|Hansarang Convale...|      124|35.885592|128.556649|

In [99]:
filtered_case_df1 = case_df1.filter((case_df1.province != 'Seoul') | (case_df1.confirmed < 5))
filtered_case_df1.show(10)

+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|         province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+-----------------+---------------+-----+--------------------+---------+---------+----------+
| 1500007|          Daejeon|from other city| true|Seosan-si Laboratory|        2|        -|         -|
| 1500004|          Daejeon|         Seo-gu| true|     Dreaming Church|        4|36.346869|127.368594|
| 3000002|       Gangwon-do|from other city| true|Uijeongbu St. Mar...|       10|        -|         -|
| 1100007|            Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|
| 4100002|Chungcheongnam-do|from other city| true|Door-to-door sale...|       10|        -|         -|
| 5000002|     Jeollabuk-do|from other city| true|Door-to-door sale...|        3|        -|         -|
| 1000030|            Seoul|     Gangseo-gu| true|SJ Investment Cal...|  

In [101]:
drop_filter = filtered_case_df1.drop('latitude')
drop_filter.show(2)

+--------+--------+---------------+-----+--------------------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| longitude|
+--------+--------+---------------+-----+--------------------+---------+----------+
| 1500007| Daejeon|from other city| true|Seosan-si Laboratory|        2|         -|
| 1500004| Daejeon|         Seo-gu| true|     Dreaming Church|        4|127.368594|
+--------+--------+---------------+-----+--------------------+---------+----------+
only showing top 2 rows



### Sort the number of confirmed cases. Check with descending sort also.

In [109]:
case_df1.orderBy('confirmed').show(10)

+--------+------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|    province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+------------+---------------+-----+--------------------+---------+---------+----------+
| 1000030|       Seoul|     Gangseo-gu| true|SJ Investment Cal...|        0|37.559649|126.835102|
| 7000002|     Jeju-do|              -|false|contact with patient|        0|        -|         -|
| 3000007|  Gangwon-do|              -|false|contact with patient|        0|        -|         -|
| 1100007|       Busan|from other city| true|Cheongdo Daenam H...|        1|        -|         -|
| 1300005|     Gwangju|              -|false|                 etc|        1|        -|         -|
| 1000025|       Seoul|     Gangnam-gu| true|Gangnam Dongin Ch...|        1|37.522331|127.057388|
| 5000003|Jeollabuk-do|from other city| true|  Shincheonji Church|        1|        -|         -|
| 1700006|      Sejo

In [108]:
case_df1.orderBy(desc('confirmed')).show(10)

+--------+----------------+---------------+-----+--------------------+---------+---------+----------+
| case_id|        province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+----------------+---------------+-----+--------------------+---------+---------+----------+
| 1200001|           Daegu|         Nam-gu| true|  Shincheonji Church|     4511| 35.84008|  128.5667|
| 1200009|           Daegu|              -|false|contact with patient|      917|        -|         -|
| 1200010|           Daegu|              -|false|                 etc|      747|        -|         -|
| 6000001|Gyeongsangbuk-do|from other city| true|  Shincheonji Church|      566|        -|         -|
| 2000020|     Gyeonggi-do|              -|false|     overseas inflow|      305|        -|         -|
| 1000036|           Seoul|              -|false|     overseas inflow|      298|        -|         -|
| 1200002|           Daegu|   Dalseong-gun| true|Second Mi-Ju Hosp...|      196|35

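### In case of any wrong data type, cast that data type from integer to string or string to integer.

In [110]: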
case_df1.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [112]:
case_df2 = case_df1.withColumn('latitude', col('latitude').cast('integer'))
case_df2.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: integer (nullable = true)
 |-- longitude: string (nullable = true)



### Use group by on the province and city columns and aggregate with the sum of confirmed cases.

In [116]:
grouped_df = case_df1.groupBy('province', 'city').agg(sum('confirmed').alias('total_confirmed'))
grouped_df.show()

+----------------+---------------+---------------+
|        province|           city|total_confirmed|
+----------------+---------------+---------------+
|Gyeongsangnam-do|       Jinju-si|              9|
|           Seoul|     Gangnam-gu|             18|
|           Seoul|        Guro-gu|            139|
|    Jeollabuk-do|from other city|              6|
|         Daejeon|              -|            100|
|Gyeongsangnam-do|Changnyeong-gun|              7|
|           Seoul|              -|            561|
|         Jeju-do|from other city|              1|
|Gyeongsangbuk-do|              -|            345|
|Gyeongsangnam-do|   Geochang-gun|             18|
|Gyeongsangbuk-do|        Gumi-si|             10|
|         Incheon|from other city|            117|
|           Busan|              -|             85|
|           Daegu|         Seo-gu|            124|
|     Gyeonggi-do|   Uijeongbu-si|             50|
|           Busan|     Suyeong-gu|              5|
|           Seoul|     Yongsan-

### Use different join methods with the region file.

In [117]:
joined_df = case_df1.join(region_df1,['province','city'], how='left')
joined_df.show(10)

+-----------------+---------------+--------+-----+--------------------+---------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|         province|           city| case_id|group|      infection_case|confirmed| latitude| longitude| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----------------+---------------+--------+-----+--------------------+---------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|          Daejeon|from other city| 1500007| true|Seosan-si Laboratory|        2|        -|         -| null|     null|      null|                   null|              null|            null|       

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def casehighlow(confirmed):
    if confirmed < 50:
        return 'low'
    else:
        return 'high'

casehighlow_udf = udf(casehighlow, StringType())

case_df1 = case_df1.withColumn('case_category', casehighlow_udf(case_df1['confirmed']))

case_df1.show()
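
A Python UDF receives `None` for null cells, and `None < 50` raises a `TypeError` in Python 3. The sketch below is a null-safe variant of the function that can be checked without Spark. The `threshold` parameter is an addition made here, not part of the original cell.

```python
from typing import Optional


def casehighlow(confirmed: Optional[int], threshold: int = 50) -> Optional[str]:
    """Label a confirmed-case count as 'low' or 'high'; pass nulls through."""
    if confirmed is None:
        return None
    return "low" if confirmed < threshold else "high"


# Quick check in plain Python before wrapping the function as a UDF
print(casehighlow(4), casehighlow(50), casehighlow(None))
```

It can be wrapped with `udf(casehighlow, StringType())` exactly as in the cell above. For a simple threshold like this, the built-in `when(...).otherwise(...)` column functions are an alternative that avoids the overhead of calling into Python for every row.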
