In [36]:
import pyspark
from pyspark.sql import SparkSession

In [37]:
spark = SparkSession.builder\
                    .appName("raw-processing")\
                    .getOrCreate()

In [38]:
spark

In [39]:
spark.sparkContext.setLogLevel("DEBUG")


In [40]:
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [41]:
covid19_scehema = T.StructType([
    T.StructField("Date_reported", T.DateType(), True),
    T.StructField("Country_code", T.StringType(), True),
    T.StructField("Country", T.StringType(), True),
    T.StructField("WHO_region", T.StringType(), True),
    T.StructField("New_cases", T.IntegerType(), True),
    T.StructField("Cumulative_cases", T.IntegerType(), True),
    T.StructField("New_deaths", T.IntegerType(),True),
    T.StructField("Cumulative_deaths", T.IntegerType(), True)
    
])

In [42]:
covid_19_df = spark.read.format("csv")\
                            .option("header",True)\
                            .option("schema", covid19_scehema)\
                            .option("mode","Failfast")\
                                .load("../Raw data/WHO-COVID-19-global-daily-data.csv")

In [43]:
covid_19_df.show(4)

+-------------+------------+----------+----------+---------+----------------+----------+-----------------+
|Date_reported|Country_code|   Country|WHO_region|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+-------------+------------+----------+----------+---------+----------------+----------+-----------------+
|   2020-01-04|          AI|  Anguilla|       AMR|     NULL|               0|      NULL|                0|
|   2020-01-04|          AZ|Azerbaijan|       EUR|     NULL|               0|      NULL|                0|
|   2020-01-04|          BD|Bangladesh|      SEAR|        0|               0|         0|                0|
|   2020-01-04|          BB|  Barbados|       AMR|     NULL|               0|      NULL|                0|
+-------------+------------+----------+----------+---------+----------------+----------+-----------------+
only showing top 4 rows



In [44]:
covid_19_df.printSchema()

root
 |-- Date_reported: string (nullable = true)
 |-- Country_code: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- WHO_region: string (nullable = true)
 |-- New_cases: string (nullable = true)
 |-- Cumulative_cases: string (nullable = true)
 |-- New_deaths: string (nullable = true)
 |-- Cumulative_deaths: string (nullable = true)



In [45]:
droped_duplicated_covid19_df = covid_19_df.dropDuplicates()

In [46]:
dropped_column_covid19_df = droped_duplicated_covid19_df.drop(F.col("Country_code"))

In [47]:
dropped_column_covid19_df.show(4)

+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|Date_reported|             Country|WHO_region|New_cases|Cumulative_cases|New_deaths|Cumulative_deaths|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|   2020-01-09|    Marshall Islands|       WPR|        0|               0|         0|                0|
|   2020-01-09|    Papua New Guinea|       WPR|        0|               0|         0|                0|
|   2020-01-14|                Fiji|       WPR|        0|               0|         0|                0|
|   2020-01-18|Northern Mariana ...|       WPR|        0|               0|         0|                0|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
only showing top 4 rows



In [48]:
renamed_column_covid19_df = dropped_column_covid19_df.withColumnRenamed("Date_reported","date_reported")\
                                                        .withColumnRenamed("Country","country")\
                                                            .withColumnRenamed("WHO_region", "who_region")\
                                                            .withColumnRenamed("New_cases","new_cases")\
                                                            .withColumnRenamed("Cumulative_cases","cumulative_cases")\
                                                            .withColumnRenamed("New_deaths","new_deaths")\
                                                            .withColumnRenamed("Cumulative_deaths","cumulative_deaths")

In [49]:
renamed_column_covid19_df.show(5, truncate=False)

+-------------+------------------------+----------+---------+----------------+----------+-----------------+
|date_reported|country                 |who_region|new_cases|cumulative_cases|new_deaths|cumulative_deaths|
+-------------+------------------------+----------+---------+----------------+----------+-----------------+
|2020-01-09   |Marshall Islands        |WPR       |0        |0               |0         |0                |
|2020-01-09   |Papua New Guinea        |WPR       |0        |0               |0         |0                |
|2020-01-14   |Fiji                    |WPR       |0        |0               |0         |0                |
|2020-01-18   |Northern Mariana Islands|WPR       |0        |0               |0         |0                |
|2020-01-23   |Brunei Darussalam       |WPR       |0        |0               |0         |0                |
+-------------+------------------------+----------+---------+----------------+----------+-----------------+
only showing top 5 rows



In [50]:
renamed_column_covid19_df.printSchema()

root
 |-- date_reported: string (nullable = true)
 |-- country: string (nullable = true)
 |-- who_region: string (nullable = true)
 |-- new_cases: string (nullable = true)
 |-- cumulative_cases: string (nullable = true)
 |-- new_deaths: string (nullable = true)
 |-- cumulative_deaths: string (nullable = true)



In [51]:
type_casted_covid19_df = renamed_column_covid19_df.withColumn("date_reported", F.col("date_reported").cast(T.DateType()))\
                                                  .withColumn("new_cases", F.col("new_cases").cast(T.IntegerType()))\
                                                   .withColumn("cumulative_cases", F.col("cumulative_cases").cast(T.IntegerType()))\
                                                    .withColumn("new_deaths", F.col("new_deaths").cast(T.IntegerType()))\
                                                    .withColumn("cumulative_deaths", F.col("cumulative_deaths").cast(T.IntegerType()))

In [52]:
type_casted_covid19_df.printSchema()

root
 |-- date_reported: date (nullable = true)
 |-- country: string (nullable = true)
 |-- who_region: string (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- cumulative_cases: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- cumulative_deaths: integer (nullable = true)



In [53]:
trimmed_covid19_df = type_casted_covid19_df.withColumn("country", F.trim(F.col("country")))\
                                            .withColumn("who_region", F.trim(F.col("who_region")))\
                                            .withColumn("new_cases", F.abs(F.col("new_cases")))\
                                            .withColumn("cumulative_cases", F.abs(F.col("cumulative_cases")))\
                                            .withColumn("new_deaths", F.abs(F.col("new_deaths")))\
                                            .withColumn("cumulative_deaths", F.abs(F.col("cumulative_deaths")))

In [54]:
trimmed_covid19_df.show(4)

+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|date_reported|             country|who_region|new_cases|cumulative_cases|new_deaths|cumulative_deaths|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|   2020-01-09|    Marshall Islands|       WPR|        0|               0|         0|                0|
|   2020-01-09|    Papua New Guinea|       WPR|        0|               0|         0|                0|
|   2020-01-14|                Fiji|       WPR|        0|               0|         0|                0|
|   2020-01-18|Northern Mariana ...|       WPR|        0|               0|         0|                0|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
only showing top 4 rows



In [55]:
lowercase_covid19_df = trimmed_covid19_df.withColumn("country", F.lower(F.col("country")))\
                                         .withColumn("who_region", F.lower(F.col("who_region")))

In [56]:
lowercase_covid19_df.show(4)

+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|date_reported|             country|who_region|new_cases|cumulative_cases|new_deaths|cumulative_deaths|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
|   2020-01-09|    marshall islands|       wpr|        0|               0|         0|                0|
|   2020-01-09|    papua new guinea|       wpr|        0|               0|         0|                0|
|   2020-01-14|                fiji|       wpr|        0|               0|         0|                0|
|   2020-01-18|northern mariana ...|       wpr|        0|               0|         0|                0|
+-------------+--------------------+----------+---------+----------------+----------+-----------------+
only showing top 4 rows



In [57]:
fill_values = {"new_cases":0, "cumulative_cases":0,"new_deaths":0, "cumulative_deaths":0}

In [58]:
filledna_covid19_df = lowercase_covid19_df.fillna(fill_values)

In [59]:
filledna_covid19_df.show(1)

+-------------+----------------+----------+---------+----------------+----------+-----------------+
|date_reported|         country|who_region|new_cases|cumulative_cases|new_deaths|cumulative_deaths|
+-------------+----------------+----------+---------+----------------+----------+-----------------+
|   2020-01-09|marshall islands|       wpr|        0|               0|         0|                0|
+-------------+----------------+----------+---------+----------------+----------+-----------------+
only showing top 1 row



In [60]:
# filledna_covid19_df.write.format("csv")\
#                           .mode("overwrite")\
#                           .save("../Processed data/covid19_data")

In [61]:
filledna_covid19_df.rdd.getNumPartitions()


8

In [62]:
import psycopg2

In [68]:
connection = psycopg2.connect(
    host= "localhost",
    database="covid19",
    user = "postgres",
    password = "1234",
    port = 5432
)

In [69]:
cursor = connection.cursor()

In [70]:
cursor.execute("delete from processed_coviD19")
connection.commit()

In [71]:
cursor.execute("""
               CREATE TABLE IF NOT EXISTS processed_coviD19(
                   id serial primary key ,
                   date_reported date,
                   country varchar(255),
                   who_region varchar(255),
                   new_cases int,
                   cumulative_cases int,
                   new_deaths int,
                   cumulative_deaths int                   
                   );
                   """)

In [72]:
connection.commit()

In [73]:
insert_query = """INSERT into processed_covid19(date_reported, country, who_region, new_cases, cumulative_cases, new_deaths, cumulative_deaths)
                VALUES (%s, %s, %s, %s, %s, %s, %s);"""

In [74]:
for row in filledna_covid19_df.collect():
    cursor.execute(insert_query, (
        row['date_reported'],
        row['country'],
        row['who_region'],
        row['new_cases'],
        row['cumulative_cases'],
        row['new_deaths'],
        row['cumulative_deaths']
    ))

In [75]:
connection.commit()

In [76]:
cursor.close()

In [77]:
connection.close()