In [1]:
from pyspark.sql import SparkSession
from urllib.parse import quote_plus
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

### Creating MongoDB Atlas config and Spark session

In [2]:
username = quote_plus('pranjal_tripathi')
password = quote_plus('Pranjal_01')
cluster = 'spark-project.9yp16zi.mongodb.net'

uri = 'mongodb+srv://' + username + ':' + password + '@' + cluster + '/?retryWrites=true&w=majority'

client = MongoClient(uri, server_api=ServerApi('1'))

In [3]:
spark = SparkSession.builder \
    .appName("MongoDB Atlas to Spark") \
    .config("spark.mongodb.input.uri", uri) \
    .config("spark.jars.packages",  "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

:: loading settings :: url = jar:file:/home/pranjal/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/pranjal/.ivy2/cache
The jars for the packages stored in: /home/pranjal/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e8a32b3b-687b-42d5-a771-e41bedf89ffd;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 162ms :: artifacts dl 9ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifact

In [4]:
spark

### Reading data from MongoDB Atlas into Spark dataframe

In [5]:
case_df= spark.read.format("com.mongodb.spark.sql").option("collection", "case").option("database", "spark_project").load()

                                                                                

In [6]:
region_df= spark.read.format("com.mongodb.spark.sql").option("collection", "region").option("database", "spark_project").load()

                                                                                

In [7]:
time_province_df= spark.read.format("com.mongodb.spark.sql").option("collection", "time_province").option("database", "spark_project").load()

### Spark Analysis

In [8]:
case_df.describe()

DataFrame[summary: string,  case_id: string, city: string, confirmed: string, infection_case: string, latitude: string, longitude: string, province: string]

In [9]:
case_df.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- province: string (nullable = true)



In [10]:
case_df.limit(10).show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+
| case_id|                 _id|           city|confirmed|group|      infection_case| latitude| longitude|province|
+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+
|    null|{64567cdd4106c4df...|        Guro-gu|       95| true| Guro-gu Call Center|37.508163|126.884387|   Seoul|
|    null|{64567cdf4106c4df...|   Yangcheon-gu|       43| true|Yangcheon Table T...|37.546061|126.874209|   Seoul|
|    null|{64567ce04106c4df...|      Dobong-gu|       43| true|     Day Care Center|37.679422|127.044374|   Seoul|
|    null|{64567ce04106c4df...|        Guro-gu|       41| true|Manmin Central Ch...|37.481059|126.894343|   Seoul|
|    null|{64567ce04106c4df...|from other city|       36| true|SMR Newly Planted...|        -|         -|   Seoul|
|    null|{64567ce04106c4df...|  Dongdaemun-gu|       17| true|       Dongan Chu

                                                                                

In [11]:
case_df.count()

                                                                                

1041

In [12]:
case_df.na.drop().count()

                                                                                

522

In [13]:
case_df.withColumnRenamed('_id', 'id').limit(5).show()

+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+
| case_id|                  id|           city|confirmed|group|      infection_case| latitude| longitude|province|
+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+
|    null|{64567cdd4106c4df...|        Guro-gu|       95| true| Guro-gu Call Center|37.508163|126.884387|   Seoul|
|    null|{64567cdf4106c4df...|   Yangcheon-gu|       43| true|Yangcheon Table T...|37.546061|126.874209|   Seoul|
|    null|{64567ce04106c4df...|      Dobong-gu|       43| true|     Day Care Center|37.679422|127.044374|   Seoul|
|    null|{64567ce04106c4df...|        Guro-gu|       41| true|Manmin Central Ch...|37.481059|126.894343|   Seoul|
|    null|{64567ce04106c4df...|from other city|       36| true|SMR Newly Planted...|        -|         -|   Seoul|
+--------+--------------------+---------------+---------+-----+-----------------

                                                                                

In [14]:
case_df1= case_df.na.drop()
case_df1.count()

                                                                                

522

In [15]:
case_df1.filter((case_df1.confirmed < 50) & (case_df1.city=='Jongno-gu')).select(case_df1._id, case_df1.city, case_df1.confirmed, case_df1.infection_case).orderBy(case_df1.confirmed.asc()).show()

+--------------------+---------+---------+--------------------+
|                 _id|     city|confirmed|      infection_case|
+--------------------+---------+---------+--------------------+
|{64567d0d4106c4df...|Jongno-gu|        7|Korea Campus Crus...|
|{64567d8c4106c4df...|Jongno-gu|        7|Korea Campus Crus...|
|{64567db74106c4df...|Jongno-gu|        7|Korea Campus Crus...|
|{64567d0c4106c4df...|Jongno-gu|       10|Jongno Community ...|
|{64567d8b4106c4df...|Jongno-gu|       10|Jongno Community ...|
|{64567db64106c4df...|Jongno-gu|       10|Jongno Community ...|
+--------------------+---------+---------+--------------------+



In [16]:
case_df1.groupBy(case_df1.province).agg({'confirmed':'sum'}).show()

[Stage 15:>                                                         (0 + 1) / 1]

+-----------------+--------------+
|         province|sum(confirmed)|
+-----------------+--------------+
|           Sejong|           147|
|            Ulsan|           153|
|Chungcheongbuk-do|           180|
|       Gangwon-do|           186|
|          Gwangju|           129|
| Gyeongsangbuk-do|          3972|
|            Daegu|         20040|
| Gyeongsangnam-do|           396|
|          Incheon|           606|
|          Jeju-do|            57|
|      Gyeonggi-do|          3000|
|            Busan|           468|
|          Daejeon|           393|
|            Seoul|          3840|
|Chungcheongnam-do|           474|
|     Jeollabuk-do|            69|
|     Jeollanam-do|            75|
+-----------------+--------------+



                                                                                

In [17]:
region_df.describe()

DataFrame[summary: string, academy_ratio: string, city: string, code: string, elderly_alone_ratio: string, elderly_population_ratio: string, elementary_school_count: string, kindergarten_count: string, latitude: string, longitude: string, nursing_home_count: string, province: string, university_count: string]

In [18]:
region_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- academy_ratio: double (nullable = true)
 |-- city: string (nullable = true)
 |-- code: integer (nullable = true)
 |-- elderly_alone_ratio: double (nullable = true)
 |-- elderly_population_ratio: double (nullable = true)
 |-- elementary_school_count: integer (nullable = true)
 |-- kindergarten_count: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- nursing_home_count: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- university_count: integer (nullable = true)



In [19]:
region_df.limit(10).show(truncate= False)

                                                                                

+--------------------------+-------------+------------+-----+-------------------+------------------------+-----------------------+------------------+---------+----------+------------------+--------+----------------+
|_id                       |academy_ratio|city        |code |elderly_alone_ratio|elderly_population_ratio|elementary_school_count|kindergarten_count|latitude |longitude |nursing_home_count|province|university_count|
+--------------------------+-------------+------------+-----+-------------------+------------------------+-----------------------+------------------+---------+----------+------------------+--------+----------------+
|{64577fbf9adfe5637a7927a6}|4.18         |Gangnam-gu  |10010|4.3                |13.17                   |33                     |38                |37.518421|127.047222|3088              |Seoul   |0               |
|{64577fc19adfe5637a7927a7}|1.44         |Seoul       |10000|5.8                |15.38                   |607                    |830   

In [20]:
region_df.count()

                                                                                

732

In [21]:
region_df1= region_df.na.drop()
region_df1.count()

                                                                                

732

In [22]:
region_df1.groupBy(region_df1.province).agg({"elderly_population_ratio":"avg"}).show()

+-----------------+-----------------------------+
|         province|avg(elderly_population_ratio)|
+-----------------+-----------------------------+
|           Sejong|                         9.48|
|            Ulsan|           11.773333333333333|
|Chungcheongbuk-do|           23.192500000000006|
|       Gangwon-do|           22.890526315789472|
|          Gwangju|           14.814999999999998|
| Gyeongsangbuk-do|            27.55624999999999|
|            Daegu|           17.031111111111116|
| Gyeongsangnam-do|           24.749473684210535|
|          Incheon|            16.39909090909091|
|          Jeju-do|                         15.1|
|      Gyeonggi-do|                   14.4296875|
|            Busan|           19.357647058823527|
|          Daejeon|           14.376666666666669|
|            Seoul|            15.73807692307692|
|Chungcheongnam-do|           23.844374999999996|
|            Korea|                        15.67|
|     Jeollabuk-do|           27.470666666666673|


In [23]:
region_df1.select(region_df1._id, region_df1.city, region_df1.university_count).orderBy(region_df1.city.asc(), region_df1.university_count.desc()).show()

[Stage 28:>                                                         (0 + 1) / 1]

+--------------------+-----------+----------------+
|                 _id|       city|university_count|
+--------------------+-----------+----------------+
|{64577ff19adfe563...|  Andong-si|               3|
|{645780289adfe563...|  Andong-si|               3|
|{645780609adfe563...|  Andong-si|               3|
|{64577fd79adfe563...|   Ansan-si|               4|
|{6457800e9adfe563...|   Ansan-si|               4|
|{645780469adfe563...|   Ansan-si|               4|
|{64577fd79adfe563...| Anseong-si|               3|
|{6457800f9adfe563...| Anseong-si|               3|
|{645780469adfe563...| Anseong-si|               3|
|{64577fd79adfe563...|  Anyang-si|               4|
|{6457800f9adfe563...|  Anyang-si|               4|
|{645780469adfe563...|  Anyang-si|               4|
|{64577fe49adfe563...|    Asan-si|               3|
|{6457801c9adfe563...|    Asan-si|               3|
|{645780539adfe563...|    Asan-si|               3|
|{64577fe09adfe563...|  Boeun-gun|               0|
|{645780179a

                                                                                

In [24]:
time_province_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- deceased: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- released: integer (nullable = true)
 |-- time: integer (nullable = true)



In [25]:
time_province_df1= time_province_df.na.drop()
time_province_df1.count()

                                                                                

2771

In [26]:
time_province_df1.limit(5).show()

+--------------------+---------+----------+--------+--------+--------+----+
|                 _id|confirmed|      date|deceased|province|released|time|
+--------------------+---------+----------+--------+--------+--------+----+
|{6457acab7924e510...|        0|2020-01-20|       0|   Seoul|       0|  16|
|{6457acad7924e510...|        0|2020-01-20|       0|   Busan|       0|  16|
|{6457acad7924e510...|        0|2020-01-20|       0|   Daegu|       0|  16|
|{6457acad7924e510...|        1|2020-01-20|       0| Incheon|       0|  16|
|{6457acae7924e510...|        0|2020-01-20|       0| Gwangju|       0|  16|
+--------------------+---------+----------+--------+--------+--------+----+



In [35]:
time_province_df1.join(region_df1, on= '_id', how='left').where(time_province_df1.province== 'Seoul').show()

                                                                                

+--------------------+---------+----------+--------+--------+--------+----+-------------+----+----+-------------------+------------------------+-----------------------+------------------+--------+---------+------------------+--------+----------------+
|                 _id|confirmed|      date|deceased|province|released|time|academy_ratio|city|code|elderly_alone_ratio|elderly_population_ratio|elementary_school_count|kindergarten_count|latitude|longitude|nursing_home_count|province|university_count|
+--------------------+---------+----------+--------+--------+--------+----+-------------+----+----+-------------------+------------------------+-----------------------+------------------+--------+---------+------------------+--------+----------------+
|{6457acb97924e510...|        0|2020-01-23|       0|   Seoul|       0|  16|         null|null|null|               null|                    null|                   null|              null|    null|     null|              null|    null|          

In [36]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def casehighlow(case):
    if case < 50:
        return "low"
    else:
        return "high"


In [38]:
casehighlow_udf = udf(casehighlow, StringType())

In [39]:
casehighlow_df = case_df1.withColumn("result", casehighlow_udf(case_df1["confirmed"])).show()

[Stage 87:>                                                         (0 + 1) / 1]

+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+------+
| case_id|                 _id|           city|confirmed|group|      infection_case| latitude| longitude|province|result|
+--------+--------------------+---------------+---------+-----+--------------------+---------+----------+--------+------+
| 1000001|{64567d094106c4df...|     Yongsan-gu|      139| true|       Itaewon Clubs|37.538621|126.992652|   Seoul|  high|
| 1000001|{64567d094106c4df...|     Yongsan-gu|      139| true|       Itaewon Clubs|37.538621|126.992652|   Seoul|  high|
| 1000002|{64567d0a4106c4df...|      Gwanak-gu|      119| true|             Richway| 37.48208|126.901384|   Seoul|  high|
| 1000003|{64567d0a4106c4df...|        Guro-gu|       95| true| Guro-gu Call Center|37.508163|126.884387|   Seoul|  high|
| 1000004|{64567d0a4106c4df...|   Yangcheon-gu|       43| true|Yangcheon Table T...|37.546061|126.874209|   Seoul|   low|
| 1000005|{64567d0a4106c

                                                                                