In [99]:
from pyspark.sql import SparkSession

# Build (or reuse, if one already exists) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName('new_session')
    .getOrCreate()
)

In [100]:
# Read the case-level CSV: the header row supplies column names and Spark infers column types.
cases = spark.read.load(
    'archive/case.csv',
    format='csv',
    sep=',',
    header='true',
    inferSchema='true',
)

In [101]:
# Select the columns used in the following cells.
# The CSV header has a leading space before case_id (" case_id"); normalise the
# name here so downstream code can refer to plain 'case_id' (the SQL section
# below applies the same rename).
cases = (
    cases
    .withColumnRenamed(' case_id', 'case_id')
    .select('case_id', 'province', 'city', 'confirmed')
)
cases.show()

+--------+--------+---------------+---------+
| case_id|province|           city|confirmed|
+--------+--------+---------------+---------+
| 1000001|   Seoul|     Yongsan-gu|      139|
| 1000002|   Seoul|      Gwanak-gu|      119|
| 1000003|   Seoul|        Guro-gu|       95|
| 1000004|   Seoul|   Yangcheon-gu|       43|
| 1000005|   Seoul|      Dobong-gu|       43|
| 1000006|   Seoul|        Guro-gu|       41|
| 1000007|   Seoul|from other city|       36|
| 1000008|   Seoul|  Dongdaemun-gu|       17|
| 1000009|   Seoul|from other city|       25|
| 1000010|   Seoul|      Gwanak-gu|       30|
| 1000011|   Seoul|   Eunpyeong-gu|       14|
| 1000012|   Seoul|   Seongdong-gu|       13|
| 1000013|   Seoul|      Jongno-gu|       10|
| 1000014|   Seoul|     Gangnam-gu|        7|
| 1000015|   Seoul|        Jung-gu|        7|
| 1000016|   Seoul|   Seodaemun-gu|        5|
| 1000017|   Seoul|      Jongno-gu|        7|
| 1000018|   Seoul|     Gangnam-gu|        6|
| 1000019|   Seoul|from other city

In [102]:
# Renaming columns: reload the full case table (all columns) and rename
# infection_case to infection_source.
cases1 = (
    spark.read
    .csv('archive/case.csv', sep=',', header='true', inferSchema='true')
    .withColumnRenamed('infection_case', 'infection_source')
)
cases1.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|    infection_source|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [103]:
# Sorting: show cases ordered by confirmed count, largest first.
from pyspark.sql import functions as F
cases.orderBy(F.col('confirmed').desc()).show()

+--------+-----------------+---------------+---------+
| case_id|         province|           city|confirmed|
+--------+-----------------+---------------+---------+
| 1200001|            Daegu|         Nam-gu|     4511|
| 1200009|            Daegu|              -|      917|
| 1200010|            Daegu|              -|      747|
| 6000001| Gyeongsangbuk-do|from other city|      566|
| 2000020|      Gyeonggi-do|              -|      305|
| 1000036|            Seoul|              -|      298|
| 1200002|            Daegu|   Dalseong-gun|      196|
| 6000012| Gyeongsangbuk-do|              -|      190|
| 1000037|            Seoul|              -|      162|
| 1000001|            Seoul|     Yongsan-gu|      139|
| 6000013| Gyeongsangbuk-do|              -|      133|
| 1200003|            Daegu|         Seo-gu|      124|
| 1000002|            Seoul|      Gwanak-gu|      119|
| 6000002| Gyeongsangbuk-do|   Cheongdo-gun|      119|
| 4100001|Chungcheongnam-do|     Cheonan-si|      103|
| 1200004|

+--------+--------+---------------+---------+
| case_id|province|           city|confirmed|
+--------+--------+---------------+---------+
| 1000001|   Seoul|     Yongsan-gu|      139|
| 1000002|   Seoul|      Gwanak-gu|      119|
| 1000003|   Seoul|        Guro-gu|       95|
| 1000004|   Seoul|   Yangcheon-gu|       43|
| 1000005|   Seoul|      Dobong-gu|       43|
| 1000006|   Seoul|        Guro-gu|       41|
| 1000007|   Seoul|from other city|       36|
| 1000008|   Seoul|  Dongdaemun-gu|       17|
| 1000009|   Seoul|from other city|       25|
| 1000010|   Seoul|      Gwanak-gu|       30|
| 1000011|   Seoul|   Eunpyeong-gu|       14|
| 1000012|   Seoul|   Seongdong-gu|       13|
| 1000013|   Seoul|      Jongno-gu|       10|
| 1000014|   Seoul|     Gangnam-gu|        7|
| 1000015|   Seoul|        Jung-gu|        7|
| 1000016|   Seoul|   Seodaemun-gu|        5|
| 1000017|   Seoul|      Jongno-gu|        7|
| 1000018|   Seoul|     Gangnam-gu|        6|
| 1000019|   Seoul|from other city

In [105]:
# Filter: cases in Daegu province with more than 10 confirmed patients.
# NOTE: `cases` was narrowed to case_id/province/city/confirmed earlier, so
# infection_case is not available in this frame.
cases.filter((cases.confirmed > 10) & (cases.province == 'Daegu')).show()

+--------+--------+------------+---------+
| case_id|province|        city|confirmed|
+--------+--------+------------+---------+
| 1200001|   Daegu|      Nam-gu|     4511|
| 1200002|   Daegu|Dalseong-gun|      196|
| 1200003|   Daegu|      Seo-gu|      124|
| 1200004|   Daegu|Dalseong-gun|      101|
| 1200005|   Daegu|     Dong-gu|       39|
| 1200008|   Daegu|           -|       41|
| 1200009|   Daegu|           -|      917|
| 1200010|   Daegu|           -|      747|
+--------+--------+------------+---------+



In [106]:
# Group by (province, city) and report the total and the largest single
# confirmed count within each group.
from pyspark.sql import functions as F
(
    cases
    .groupBy('province', 'city')
    .agg(F.sum('confirmed'), F.max('confirmed'))
    .show()
)

+----------------+---------------+--------------+--------------+
|        province|           city|sum(confirmed)|max(confirmed)|
+----------------+---------------+--------------+--------------+
|Gyeongsangnam-do|       Jinju-si|             9|             9|
|           Seoul|        Guro-gu|           139|            95|
|           Seoul|     Gangnam-gu|            18|             7|
|         Daejeon|              -|           100|            55|
|    Jeollabuk-do|from other city|             6|             3|
|Gyeongsangnam-do|Changnyeong-gun|             7|             7|
|           Seoul|              -|           561|           298|
|         Jeju-do|from other city|             1|             1|
|Gyeongsangbuk-do|              -|           345|           190|
|Gyeongsangnam-do|   Geochang-gun|            18|            10|
|Gyeongsangbuk-do|        Gumi-si|            10|            10|
|         Incheon|from other city|           117|            53|
|           Busan|       

In [107]:
# IntegerType was never imported, so without this the cell raises NameError on a fresh kernel.
from pyspark.sql.types import IntegerType

# Per (province, city): total confirmed and the mean cast to int.
# The cast truncates the fraction (e.g. 139 over 3 rows shows as 46).
cases.groupBy(['province', 'city']).agg(
    F.sum('confirmed'),
    F.mean('confirmed').cast(IntegerType()),
).show()

+----------------+---------------+--------------+---------------------------+
|        province|           city|sum(confirmed)|CAST(avg(confirmed) AS INT)|
+----------------+---------------+--------------+---------------------------+
|Gyeongsangnam-do|       Jinju-si|             9|                          9|
|           Seoul|        Guro-gu|           139|                         46|
|           Seoul|     Gangnam-gu|            18|                          4|
|         Daejeon|              -|           100|                         25|
|    Jeollabuk-do|from other city|             6|                          2|
|Gyeongsangnam-do|Changnyeong-gun|             7|                          7|
|           Seoul|              -|           561|                        140|
|         Jeju-do|from other city|             1|                          1|
|Gyeongsangbuk-do|              -|           345|                        115|
|Gyeongsangnam-do|   Geochang-gun|            18|               

In [108]:
# Total confirmed per city name only. Placeholder city values such as '-' and
# 'from other city' appear under several provinces, so those rows are merged
# across provinces in this aggregation.
(
    cases
    .groupBy(['city'])
    .agg(F.sum('confirmed').alias('totalConfirmed'))
    .show()
)

+---------------+--------------+
|           city|totalConfirmed|
+---------------+--------------+
|     Gangnam-gu|            18|
|     Cheonan-si|           103|
|from other city|          1217|
|      Anyang-si|            39|
|      Gwanak-gu|           149|
|     Yongsan-gu|           139|
|        Dong-gu|            44|
|         Sejong|            39|
|     Gangseo-gu|             0|
|       Wonju-si|             4|
|     Suyeong-gu|             5|
|   Geochang-gun|            18|
|  Dongdaemun-gu|            17|
|     Dongnae-gu|            39|
|         Jin-gu|             4|
|     Yangsan-si|             3|
|    Changwon-si|             7|
|         Nam-gu|          4511|
|   Gyeongsan-si|            99|
|      Jongno-gu|            17|
+---------------+--------------+
only showing top 20 rows



### Joins

In [109]:
regions = spark.read.csv('archive/Region.csv', sep=',', header='true', inferSchema='true')

# Left join keeps every case. Cases whose (province, city) has no match in
# regions (e.g. 'from other city') get nulls for the region columns.
# The result goes to a new name rather than overwriting `cases`. Re-running the
# cell would otherwise join regions a second time and duplicate its columns.
cases_with_regions = cases.join(regions, ['province', 'city'], how='left')
cases_with_regions.limit(10).toPandas()

Unnamed: 0,province,city,case_id,confirmed,code,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,Seoul,Yongsan-gu,1000001,139,10210.0,37.532768,126.990021,15.0,13.0,1.0,0.68,16.87,6.5,435.0
1,Seoul,Gwanak-gu,1000002,119,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0
2,Seoul,Guro-gu,1000003,95,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
3,Seoul,Yangcheon-gu,1000004,43,10190.0,37.517189,126.866618,30.0,43.0,0.0,2.26,13.55,5.5,816.0
4,Seoul,Dobong-gu,1000005,43,10100.0,37.668952,127.047082,23.0,26.0,1.0,0.95,17.89,7.2,485.0
5,Seoul,Guro-gu,1000006,41,10070.0,37.495632,126.88765,26.0,34.0,3.0,1.0,16.21,5.7,741.0
6,Seoul,from other city,1000007,36,,,,,,,,,,
7,Seoul,Dongdaemun-gu,1000008,17,10110.0,37.574552,127.039721,21.0,31.0,4.0,1.06,17.26,6.7,832.0
8,Seoul,from other city,1000009,25,,,,,,,,,,
9,Seoul,Gwanak-gu,1000010,30,10050.0,37.47829,126.951502,22.0,33.0,1.0,0.89,15.12,4.9,909.0


### SQL with DataFrames

In [110]:
# Reload the raw case table so this section does not depend on earlier cells.
cases = spark.read.load('archive/case.csv', format='csv', sep=',', header='true', inferSchema='true')
# The CSV header has a leading space before case_id; normalise the name.
cases = cases.withColumnRenamed(' case_id', 'case_id')
cases = cases.select('province', 'confirmed', 'city')
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it.
cases.createOrReplaceTempView('cases_table')
# Provinces whose total confirmed count exceeds 100, with their row counts.
case_new_df = spark.sql('select province, count(city), sum(confirmed) from cases_table group by province having sum(confirmed) > 100')
case_new_df.toPandas()



Unnamed: 0,province,count(city),sum(confirmed)
0,Gyeongsangbuk-do,13,1324
1,Daegu,10,6680
2,Gyeongsangnam-do,12,132
3,Incheon,7,202
4,Gyeonggi-do,22,1000
5,Busan,10,156
6,Daejeon,10,131
7,Seoul,38,1280
8,Chungcheongnam-do,8,158


### Working with columns

In [117]:
# Adding columns with native Spark column expressions (no Python UDF):
# NewConfirmed is confirmed shifted by 10.
from pyspark.sql import functions as F
casesWithNewConfirmed = cases.withColumn(
    'NewConfirmed',
    F.col('confirmed') + 10,
)
casesWithNewConfirmed.show()

+--------+---------+---------------+------------+
|province|confirmed|           city|NewConfirmed|
+--------+---------+---------------+------------+
|   Seoul|      139|     Yongsan-gu|         149|
|   Seoul|      119|      Gwanak-gu|         129|
|   Seoul|       95|        Guro-gu|         105|
|   Seoul|       43|   Yangcheon-gu|          53|
|   Seoul|       43|      Dobong-gu|          53|
|   Seoul|       41|        Guro-gu|          51|
|   Seoul|       36|from other city|          46|
|   Seoul|       17|  Dongdaemun-gu|          27|
|   Seoul|       25|from other city|          35|
|   Seoul|       30|      Gwanak-gu|          40|
|   Seoul|       14|   Eunpyeong-gu|          24|
|   Seoul|       13|   Seongdong-gu|          23|
|   Seoul|       10|      Jongno-gu|          20|
|   Seoul|        7|     Gangnam-gu|          17|
|   Seoul|        7|        Jung-gu|          17|
|   Seoul|        5|   Seodaemun-gu|          15|
|   Seoul|        7|      Jongno-gu|          17|


### Window Functions

In [119]:
# Load the per-date, per-province time series (confirmed / released / deceased).
timeprovince = spark.read.csv(
    'archive/TimeProvince.csv',
    sep=',',
    header='true',
    inferSchema='true',
)
timeprovince.show()

+----------+----+-----------------+---------+--------+--------+
|      date|time|         province|confirmed|released|deceased|
+----------+----+-----------------+---------+--------+--------+
|2020-01-20|  16|            Seoul|        0|       0|       0|
|2020-01-20|  16|            Busan|        0|       0|       0|
|2020-01-20|  16|            Daegu|        0|       0|       0|
|2020-01-20|  16|          Incheon|        1|       0|       0|
|2020-01-20|  16|          Gwangju|        0|       0|       0|
|2020-01-20|  16|          Daejeon|        0|       0|       0|
|2020-01-20|  16|            Ulsan|        0|       0|       0|
|2020-01-20|  16|           Sejong|        0|       0|       0|
|2020-01-20|  16|      Gyeonggi-do|        0|       0|       0|
|2020-01-20|  16|       Gangwon-do|        0|       0|       0|
|2020-01-20|  16|Chungcheongbuk-do|        0|       0|       0|
|2020-01-20|  16|Chungcheongnam-do|        0|       0|       0|
|2020-01-20|  16|     Jeollabuk-do|     

In [122]:
from pyspark.sql.window import Window

# Rank cases within each province by confirmed count, highest first.
# Window.partitionBy is a classmethod, so there is no need to instantiate Window().
windowsSpec = Window.partitionBy('province').orderBy(F.desc('confirmed'))
# F.rank gives tied rows the same rank and leaves a gap afterwards
# (e.g. 8, 8, 10); use F.dense_rank for gap-free ranks.
cases.withColumn('rank', F.rank().over(windowsSpec)).show()

+-----------------+---------+---------------+----+
|         province|confirmed|           city|rank|
+-----------------+---------+---------------+----+
|            Busan|       39|     Dongnae-gu|   1|
|            Busan|       36|              -|   2|
|            Busan|       30|              -|   3|
|            Busan|       19|              -|   4|
|            Busan|       12|from other city|   5|
|            Busan|        6|    Haeundae-gu|   6|
|            Busan|        5|     Suyeong-gu|   7|
|            Busan|        4|         Jin-gu|   8|
|            Busan|        4|from other city|   8|
|            Busan|        1|from other city|  10|
|Chungcheongbuk-do|       13|              -|   1|
|Chungcheongbuk-do|       11|     Goesan-gun|   2|
|Chungcheongbuk-do|       11|              -|   2|
|Chungcheongbuk-do|        9|from other city|   4|
|Chungcheongbuk-do|        8|              -|   5|
|Chungcheongbuk-do|        6|from other city|   6|
|Chungcheongbuk-do|        2|fr