In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pymongo

# Spark session shared by every cell below (reused if one already exists).
spark = SparkSession.builder.appName("demo").getOrCreate()

# NOTE(review): the MongoDB URI is hard-coded; consider reading it from an environment variable.
client = pymongo.MongoClient("mongodb://172.18.0.15:27017/neurolabDB")
# Database name
dataBase = client["pysparkDatabase"]
# Collection names
collection_case = dataBase["collection_case"]
collection_region = dataBase["collection_region"]
collection_timeProvince = dataBase["collection_timeProvince"]


def collection_to_df(collection):
    """Load every document of a MongoDB collection into a Spark DataFrame.

    The `_id` field is excluded by projection because it holds a bson
    ObjectId, which Spark cannot map to a SQL type during schema inference.

    Raises:
        ValueError: if the collection is empty (Spark cannot infer a schema
            from zero rows, so fail early with a clear message).
    """
    documents = list(collection.find({}, {"_id": 0}))
    if not documents:
        raise ValueError(f"MongoDB collection {collection.name!r} is empty")
    return spark.createDataFrame(documents)


# Read each collection into a DataFrame
caseDf = collection_to_df(collection_case)
regionDf = collection_to_df(collection_region)
timeProvinceDf = collection_to_df(collection_timeProvince)

# Show the DataFrames
caseDf.show()
regionDf.show()
timeProvinceDf.show()

# Register temp views so the later cells can query them with spark.sql
caseDf.createOrReplaceTempView("case")
regionDf.createOrReplaceTempView("region")
timeProvinceDf.createOrReplaceTempView("timeProvince")


a. Read the data, show it, and count the number of records.

In [None]:
# Show the case records, then count them.
# `case` is an SQL keyword, so the view name is backtick-quoted to keep the
# query valid even when reserved keywords are enforced.
spark.sql("SELECT * FROM `case`").show()
spark.sql("SELECT COUNT(*) AS count_records FROM `case`").show()

b. Describe the data with a describe function.

In [None]:
# Summary statistics for the case data, as computed by DataFrame.describe.
caseSummaryDf = caseDf.describe()
caseSummaryDf.show()

c. If there are any duplicate rows, drop them.

In [None]:
# Drop rows that are exact duplicates across every column.
# Deduplicating on `date` alone would keep a single row per date, discarding
# distinct rows that share a date (presumably one per province, given the
# `province` column). `subset`, when used, must be a list of names, not a str.
timeProvinceDedupDf = timeProvinceDf.dropDuplicates()
print(f"rows before: {timeProvinceDf.count()}, after dropping duplicates: {timeProvinceDedupDf.count()}")
timeProvinceDedupDf.show()

d. Use the `limit` function to show a limited number of
records.


In [None]:
# Preview only the first two rows of the region data.
regionPreviewDf = regionDf.limit(2)
regionPreviewDf.show()

e. If a column name is not suitable, rename the
column. [optional]

In [None]:
# DataFrames are immutable: withColumnRenamed returns a new DataFrame, so the
# result must be kept. (regionDf is lower-case; `RegionDf` is undefined.)
regionRenamedDf = regionDf.withColumnRenamed("nursing_home_count", "nursingHomeCount")
regionRenamedDf.printSchema()

f. Select the subset of the columns.

In [None]:
# Keep a subset of the time-province columns; select accepts column names
# directly, so the list can be unpacked as positional arguments.
columns_to_keep = ["date", "time", "province"]
df = timeProvinceDf.select(*columns_to_keep)

g. If there are any null values, fill them with a placeholder value or drop
those rows.

In [None]:
# Fill missing longitudes with 0, then drop rows whose `date` is null.
# The steps are chained so the fillna result is kept rather than discarded.
# NOTE(review): fillna(0) only affects `longitude` if that column is numeric
# (non-matching types are ignored), and dropna may fail if caseDf has no
# `date` column; confirm both against the collection's schema.
caseCleanDf = (
    caseDf
    .fillna(0, subset=["longitude"])
    .dropna(subset=["date"])
)
caseCleanDf.show()

h. Filter the data on different columns or variables and
analyze the results.

In [None]:
# Keep only group-infection cases. Comparison needs `==` (`=` inside a call is
# a SyntaxError). Casting to lower-case text matches booleans as well as the
# strings 'True'/'true'/'TRUE', since the stored type is not visible here.
groupCasesDf = caseDf.filter(lower(col("group").cast("string")) == "true")

# Largest group clusters first.
groupCasesDf.orderBy(col("confirmed").desc()).show()

i. Sort by the number of confirmed cases (the `confirmed` column
in the dataset). Check the descending sort as well.


In [None]:
# Ascending sort by confirmed cases (sort's default order).
caseDf.sort("confirmed").show()

# Descending sort: column names come first, keyword arguments after them.
caseDf.sort("confirmed", ascending=False).show()

j. If any column has the wrong data type, cast it from
integer to string or from string to integer.

In [None]:
# Demonstrate casting on an explicit DataFrame and a column the case data is
# known to have (`confirmed`), instead of whichever `df` a previous cell left
# behind. NOTE(review): `age` is not referenced anywhere else in the notebook.

# integer -> string
caseConfirmedStrDf = caseDf.withColumn("confirmed", col("confirmed").cast(StringType()))
caseConfirmedStrDf.printSchema()

# string -> integer
caseConfirmedIntDf = caseConfirmedStrDf.withColumn("confirmed", col("confirmed").cast(IntegerType()))
caseConfirmedIntDf.printSchema()

k. Group by the province and city columns and aggregate
with the sum of confirmed cases. For example:
`df.groupBy(["province", "city"]).agg(functions.sum("confirmed"))`


In [None]:
# Total confirmed cases per (province, city). `sum` here is
# pyspark.sql.functions.sum (from the wildcard import), not Python's builtin.
grouped_data = (
    caseDf
    .groupBy(["province", "city"])
    .agg(sum("confirmed").alias("total_confirmed"))
    .orderBy(col("total_confirmed").desc())
)
grouped_data.show()

l. Joins need a second dataset; you can use the region file.
Use different join methods, for example:
`cases.join(regions, ['province', 'city'], how='left')`
Analyze the joined data as you see fit.

In [None]:
# Left join keeps every case row, even when no region row matches on (province, city).
casesRegionLeftDf = caseDf.join(regionDf, ["province", "city"], how="left")
casesRegionLeftDf.show()

# Inner join keeps only case rows that have a matching region; comparing the
# counts shows how many cases lack region data.
casesRegionInnerDf = caseDf.join(regionDf, ["province", "city"], how="inner")
print(f"left join rows: {casesRegionLeftDf.count()}, inner join rows: {casesRegionInnerDf.count()}")

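5. You can also run SQL against DataFrames. Let us try to
run some SQL on the cases table.
For example (the older `registerTempTable` and `sqlContext.sql` APIs are deprecated;
`createOrReplaceTempView` and `spark.sql` are their replacements):
`cases.createOrReplaceTempView('cases_table')`
`newDF = spark.sql('select * from cases_table where confirmed > 100')`
`newDF.show()`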

Here is an example of querying a DataFrame with SQL; you can combine
operations such as GROUP BY, HAVING, and ORDER BY.

In [None]:
caseDf.createOrReplaceTempView("case")

# With GROUP BY, every selected column must be grouped or aggregated, so
# `SELECT *` is invalid. HAVING filters groups, so the per-row `group` test
# belongs in WHERE. `case` and `group` are SQL keywords, hence the backticks.
spark.sql("""
    SELECT province, SUM(confirmed) AS total_confirmed
    FROM `case`
    WHERE lower(CAST(`group` AS STRING)) = 'true'
    GROUP BY province
    HAVING SUM(confirmed) > 100
    ORDER BY total_confirmed DESC
""").show()

6. Create Spark UDFs
Create a function `casehighlow()`:
if the number of cases is less than 50 return "low", otherwise return "high".
Convert it into a UDF and specify the function's
return type.
Note: you can create as many UDFs as your analysis needs.

In [None]:
from pyspark.sql.functions import udf

def casehighlow(confirmed_cases):
    """Classify a confirmed-case count as "low" (< 50) or "high" (>= 50).

    Args:
        confirmed_cases: the number of confirmed cases, or None for a null value.

    Returns:
        "low" or "high", or None when the input is None. Spark passes SQL NULL
        to a Python UDF as None, and comparing None with 50 would raise TypeError.
    """
    if confirmed_cases is None:
        return None
    return "low" if confirmed_cases < 50 else "high"

# Wrap casehighlow as a Spark UDF with a declared StringType return type.
casehighlow_udf = udf(casehighlow, StringType())

# Apply it to label each case row by its confirmed count.
caseDf.withColumn("case_level", casehighlow_udf(col("confirmed"))).show()