#### Start of the Program
Import Spark Context, Spark Session and Spark Conf

In [73]:
from pyspark import SparkContext, SparkConf

In [74]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

##### Create a Spark session with name Spark session
Because we are running it in local mode the master is local otherwise we need to mention Yarn or some other

In [75]:
spark = SparkSession.builder.master("local[*]").appName("PySpark Assignment").getOrCreate()

##### Specify the path where the file is Saved
Since the hadoop cluster is running locally we need to mention the location where the namenode is running, which in this case is localhost:9000
So the final Path will be - 
hdfs://localhost:9000/folder/filename

In [76]:
hdfs_file_path = "hdfs://hdfs:8020/data/time_series_covid19_confirmed_global.csv"

##### Read the file from the location sepcified above.
From Spark session reading the file stored in HDFS location mentioning option "header" as "true" to read the schema as it is. 

In [77]:
fileDf = spark.read.option("header", "true").csv(hdfs_file_path)

##### Create a temp view
Creating a temp view from the dataframe which can be used in Spark.sql

In [62]:
fileDf.createOrReplaceTempView("number_of_cases")

##### Calculate the number of columns

In [63]:
numberOfColumns = len(fileDf.columns)

#### List to store the last 14 days column names

In [64]:
arrayOfLast14DaysColumn = []

#### Loop through to get the latest 14 days column

In [65]:
for i in range(1, 15):
    arrayOfLast14DaysColumn.append(numberOfColumns - i)

#### Get last 14 days data into a dataframe

In [66]:
last14DaysDF = fileDf.select(*(fileDf.columns[i] for i in arrayOfLast14DaysColumn))

#### Showing 5 rows out of the last 14 days dataframe

In [67]:
last14DaysDF.show(5)

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|8/31/20|8/30/20|8/29/20|8/28/20|8/27/20|8/26/20|8/25/20|8/24/20|8/23/20|8/22/20|8/21/20|8/20/20|8/19/20|8/18/20|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  38165|  38162|  38143|  38140|  38129|  38113|  38070|  38054|  37999|  37953|  37894|  37856|  37599|  37599|
|   9513|   9380|   9279|   9195|   9083|   8927|   8759|   8605|   8427|   8275|   8119|   7967|   7812|   7654|
|  44494|  44146|  43781|  43403|  43016|  42619|  42228|  41858|  41460|  41068|  40667|  40258|  39847|  39444|
|   1176|   1124|   1124|   1124|   1098|   1098|   1060|   1060|   1045|   1045|   1045|   1024|   1024|   1005|
|   2654|   2624|   2551|   2471|   2415|   2332|   2283|   2222|   2171|   2134|   2068|   2044|   2015|   1966|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------

#### Summing up the values of the last 14 days and saving it into a column named result

In [68]:
resultSumDf = last14DaysDF.withColumn('result', sum(last14DaysDF[col] for col in last14DaysDF.columns))

#### Show 5 rows out of the resultSumDf

In [69]:
resultSumDf.show(5)

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+
|8/31/20|8/30/20|8/29/20|8/28/20|8/27/20|8/26/20|8/25/20|8/24/20|8/23/20|8/22/20|8/21/20|8/20/20|8/19/20|8/18/20|  result|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+
|  38165|  38162|  38143|  38140|  38129|  38113|  38070|  38054|  37999|  37953|  37894|  37856|  37599|  37599|531876.0|
|   9513|   9380|   9279|   9195|   9083|   8927|   8759|   8605|   8427|   8275|   8119|   7967|   7812|   7654|120995.0|
|  44494|  44146|  43781|  43403|  43016|  42619|  42228|  41858|  41460|  41068|  40667|  40258|  39847|  39444|588289.0|
|   1176|   1124|   1124|   1124|   1098|   1098|   1060|   1060|   1045|   1045|   1045|   1024|   1024|   1005| 15052.0|
|   2654|   2624|   2551|   2471|   2415|   2332|   2283|   2222|   2171|   2134|   2068|   2044|   2015|   1966| 31950.0|
+-------+-------

#### Getting the country details from the main dataframe

In [70]:
countryDF = fileDf.select(col("Lat").alias("latitude"), col("Long").alias("longitude"))

In [71]:
countryDF.show(5)

+--------+---------+
|latitude|longitude|
+--------+---------+
|33.93911|67.709953|
| 41.1533|  20.1683|
| 28.0339|   1.6596|
| 42.5063|   1.5218|
|-11.2027|  17.8739|
+--------+---------+
only showing top 5 rows



#### Add a row index to resultSumDf and countryDF, so that we can Join the dataframes

In [35]:
resultSumDfRowIndex=resultSumDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

In [36]:
countryDFRowIndex = countryDF.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

#### Joining the dataframes to get the final result

In [37]:
countryDFWithSum = resultSumDfRowIndex.join(countryDFRowIndex, on=["row_index"]).drop("row_index")

#### Show 5 rows from the countryDFWithSum

In [38]:
countryDFWithSum.show(5)

+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+---------+
|8/31/20|8/30/20|8/29/20|8/28/20|8/27/20|8/26/20|8/25/20|8/24/20|8/23/20|8/22/20|8/21/20|8/20/20|8/19/20|8/18/20|  result|latitude|longitude|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+---------+
|  38165|  38162|  38143|  38140|  38129|  38113|  38070|  38054|  37999|  37953|  37894|  37856|  37599|  37599|531876.0|33.93911|67.709953|
|   9513|   9380|   9279|   9195|   9083|   8927|   8759|   8605|   8427|   8275|   8119|   7967|   7812|   7654|120995.0| 41.1533|  20.1683|
|  44494|  44146|  43781|  43403|  43016|  42619|  42228|  41858|  41460|  41068|  40667|  40258|  39847|  39444|588289.0| 28.0339|   1.6596|
|   1176|   1124|   1124|   1124|   1098|   1098|   1060|   1060|   1045|   1045|   1045|   1024|   1024|   1005| 15052.0| 42.5063|   1.5218|
|   26

In [39]:
countryDFWithSum.createOrReplaceTempView("countryDFWithSum")

#### Group by the country name to sum up all the result for a particular country

In [40]:
numberOfCasesPerCountry = spark.sql("select latitude, longitude, sum(result) as total_numner_of_cases from countryDFWithSum group by latitude, longitude")

#### The directory path where we need to store the result

In [56]:
resultLocation = "hdfs://hdfs:8020/data/result"

#### Store the final DF to the result folder as a csv file here coalesce is used to have just one single file instead of partitioned file

In [51]:
numberOfCasesPerCountry.coalesce(1).write.mode('overwrite').option("header", "true").csv(resultLocation)

#### Write the dataframe into the table

In [35]:
numberOfCasesPerCountry.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/stephan_spark") \
    .option("dbtable", "number_of_cases") \
    .option("user", "postgres") \
    .option("password", "root") \
    .option("driver", "org.postgresql.Driver") \
.save()

#### Read the data from the table to verify

In [36]:
database_query_result = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/stephan_spark") \
    .option("dbtable", "number_of_cases") \
    .option("user", "postgres") \
    .option("password", "root") \
    .option("driver", "org.postgresql.Driver") \
.load()

#### Show the result from the table

In [37]:
database_query_result.show()

+---------+----------+---------------------+---------+
| latitude| longitude|total_numner_of_cases|row_index|
+---------+----------+---------------------+---------+
| 33.93911| 67.709953|             531310.0|        1|
|  41.1533|   20.1683|             118981.0|        2|
|  28.0339|    1.6596|             582820.0|        3|
|  42.5063|    1.5218|              14881.0|        4|
| -11.2027|   17.8739|              31231.0|        5|
|  17.0608|  -61.7964|               1314.0|        6|
| -38.4161|  -63.6167|            4909293.0|        7|
|  40.0691|   45.0382|             598732.0|        8|
| -35.4735|  149.0124|               1582.0|        9|
| -33.8688|  151.2093|              55958.0|       10|
| -12.4634|  130.8456|                462.0|       11|
| -27.4698|  153.0251|              15480.0|       12|
| -34.9285|  138.6007|               6476.0|       13|
| -42.8821|  147.3272|               3220.0|       14|
| -37.8136|  144.9631|             256415.0|       15|
| -31.9505