### Starting out spark session, creating the dataframe, and importing modules

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import pyspark.sql.types as st
from datetime import date, datetime, timedelta
from functools import reduce
from pyspark.sql.functions import col, mean
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

daily_country_confirmed_cases = "./data/Daily_Country_Wise_Confirmed_Cases.csv"

daily_df = spark.read.csv(daily_country_confirmed_cases,inferSchema=True,header=True)

22/07/11 08:23:07 WARN Utils: Your hostname, DESKTOP-EJLBN3A resolves to a loopback address: 127.0.1.1; using 172.23.191.21 instead (on interface eth0)
22/07/11 08:23:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/11 08:23:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Defining functions to check current date and a time delta in weeks function.

In [2]:
def date_check():
    today = date.today()
    return str(today)

def date_minus_weeks(x:int) -> str:
    """
    Returns a timedelta STRING
    """
    return str(date.today() - timedelta(weeks = x))

def column_add(a,b):
     return  a.__add__(b)

### Shows if a country has had at least one case with the most recent recorded date.

In [258]:
most_recent_df = daily_df.select('*').where(daily_df[-1] >= 1)

most_recent_df.toPandas()

Unnamed: 0,Country,2022-05-06,2022-05-12,2022-05-13,2022-05-15,2022-05-17,2022-05-18,2022-05-19,2022-05-20,2022-05-21,...,2022-06-22,2022-06-23,2022-06-24,2022-06-25,2022-06-26,2022-06-27,2022-06-28,2022-06-29,2022-06-30,2022-07-01
0,United States,0,0,0,0,0,1,0,1,0,...,14,17,28,0,0,43,62,45,44,64
1,Germany,0,0,0,0,0,0,1,1,2,...,54,74,52,0,0,145,65,35,106,85
2,Italy,0,0,0,0,0,0,1,2,0,...,0,0,42,0,0,0,32,0,0,33
3,Switzerland,0,0,0,0,0,0,0,0,1,...,5,6,3,0,0,13,4,9,2,8
4,Austria,0,0,0,0,0,0,0,0,0,...,0,0,8,0,0,0,0,0,0,17


### Shows the change over 1 week from the most recent data that have had at least 10 cases last week. Can infer which countries are on the rise with cases.

In [260]:
one_week_change_df = daily_df.select('Country', daily_df[date_minus_weeks(2)],daily_df[date_minus_weeks(1)]).where(daily_df[date_minus_weeks(1)] > 10)
one_week_change_df.toPandas()

Unnamed: 0,Country,2022-06-24,2022-07-01
0,United States,28,64
1,Germany,52,85
2,Italy,42,33
3,Austria,8,17


### Shows the weekly changes in cases starting from 7 weeks ago to last week.


In [261]:
# Shows the weekly changes in cases starting from 7 weeks ago to last week.
weekly_df = daily_df.select(
        'Country',
        daily_df[date_minus_weeks(7)],
        daily_df[date_minus_weeks(6)],
        daily_df[date_minus_weeks(5)],
        daily_df[date_minus_weeks(4)],
        daily_df[date_minus_weeks(3)],
        daily_df[date_minus_weeks(2)],
        daily_df[date_minus_weeks(1)])

weekly_df.toPandas()

Unnamed: 0,Country,2022-05-20,2022-05-27,2022-06-03,2022-06-10,2022-06-17,2022-06-24,2022-07-01
0,England,11,0,15,0,0,107,0
1,Portugal,0,16,5,0,35,20,0
2,Spain,23,21,19,16,1,0,0
3,United States,1,3,5,2,13,28,64
4,Germany,1,6,17,12,87,52,85
5,Belgium,2,0,3,0,10,0,0
6,Sweden,0,0,0,1,0,0,0
7,Italy,2,0,1,1,23,42,33
8,Canada,3,0,3,3,10,0,0
9,France,0,0,18,0,0,0,0


### Worldwide total, sum of all.

In [264]:
agg1_df = daily_df.drop('Country')
agg1_df = daily_df.withColumn('total', reduce(column_add, (agg1_df[col] for col in agg1_df.columns)))
agg1_df = agg1_df.groupBy().sum()

agg1_df.toPandas()

Unnamed: 0,sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),sum(2022-05-23),...,sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30),sum(2022-07-01),sum(total)
0,1,1,1,4,3,21,16,46,16,73,...,436,329,3,168,367,366,147,816,207,6178


### Create lists to define regions

In [3]:
africa = ["Algeria", "Angola", "Benin", "Botswana", "Burkina Faso", "Burundi", "Cameroon", "Cape Verde", "Central African Republic","Chad", "Comoros", "Cote d'Ivoire", "Democratic Republic of the Congo", "Djibouti", "Egypt", "Equatorial Guinea", "Eritrea","Ethiopia", "Gabon", "Gambia", "Ghana", "Guinea", "Guinea-Bissau", "Kenya", "Lesotho", "Liberia", "Libya", "Madagascar", "Malawi","Mali", "Mauritania", "Mauritius", "Morocco", "Mozambique", "Namibia", "Niger", "Nigeria", "Republic of the Congo", "Reunion","Rwanda", "Saint Helena", "Sao Tome and Principe", "Senegal", "Seychelles", "Sierra Leone", "Somalia", "South Africa", "South Sudan", "Sudan", "Swaziland", "Tanzania", "Togo", "Tunisia", "Uganda", "Western Sahara", "Zambia", "Zimbabwe"]
asia = ["Afghanistan", "Armenia", "Azerbaijan", "Bahrain", "Bangladesh", "Bhutan", "Brunei", "Burma", "Cambodia", "China", "Cyprus","East Timor", "Georgia", "Hong Kong", "India", "Indonesia", "Iran", "Iraq", "Israel", "Japan", "Jordan", "Kazakhstan", "Kuwait","Kyrgyzstan", "Laos", "Lebanon", "Macau", "Malaysia", "Maldives", "Mongolia", "Nepal", "North Korea", "Oman", "Pakistan", "Philippines","Qatar", "Saudi Arabia", "Singapore", "South Korea", "Sri Lanka", "Syria", "Taiwan", "Tajikistan", "Thailand", "Turkey", "Turkmenistan","United Arab Emirates", "Uzbekistan", "Vietnam", "Yemen"]
caribbean = ["Anguilla", "Antigua and Barbuda", "Aruba", "The Bahamas", "Barbados", "Bermuda", "British Virgin Islands", "Cayman Islands","Cuba", "Dominica", "Dominican Republic", "Grenada", "Guadeloupe", "Haiti", "Jamaica", "Martinique", "Montserrat", "Netherlands Antilles","Puerto Rico", "Saint Kitts and Nevis", "Saint Lucia", "Saint Vincent and the Grenadines", "Trinidad and Tobago", "Turks and Caicos Islands", "U.S. Virgin Islands"]
central_america = ["Belize", "Costa Rica", "El Salvador", "Guatemala", "Honduras", "Nicaragua", "Panama"]
europe = ["Albania", "Andorra", "Austria", "Belarus", "Belgium", "Bosnia and Herzegovina", "Bulgaria", "Croatia", "Czech Republic", "Denmark", "England", "Estonia", "Finland", "France", "Germany", "Gibraltar", "Greece", "Holy See", "Hungary", "Iceland", "Ireland", "Italy", "Kosovo", "Latvia","Liechtenstein", "Lithuania", "Luxembourg", "Macedonia", "Malta", "Moldova", "Monaco", "Montenegro", "Netherlands", "Norway", "Poland", "Portugal", "Romania", "Russia", "San Marino", "Slovak Republic", "Slovenia", "Spain", "Serbia", "Serbia and Montenegro", "Sweden", "Switzerland", "Ukraine", "United Kingdom", "Northern Ireland", "Wales"]
north_america = ["Canada", "Greenland", "Mexico", "Saint Pierre and Miquelon", "United States"]
oceania = ["American Samoa", "Australia", "Christmas Island", "Cocos (Keeling) Islands", "Cook Islands", "Federated States of Micronesia", "Fiji", "French Polynesia", "Guam", "Kiribati", "Marshall Islands", "Nauru", "New Caledonia", "New Zealand", "Niue", "Northern Mariana Islands", "Palau", "Papua New Guinea", "Pitcairn Islands","Samoa", "Solomon Islands", "Tokelau", "Tonga", "Tuvalu", "Vanuatu", "Wallis and Futuna Islands"]
south_america = ["Argentina", "Bolivia", "Brazil", "Chile", "Colombia", "Ecuador", "Falkland Islands", "French Guiana", "Guyana", "Paraguay", "Peru", "Suriname", "Uruguay","Venezuela"]

## Create Region dataframes and total daily totals

## Asia

In [4]:
asia_df = daily_df.filter(daily_df['Country'].isin(asia))

asia_df = asia_df.agg(*[sf.sum(asia_df[c_name]) for c_name in asia_df.columns])

asia_df = asia_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

asia_df = asia_df.na.fill(value='asia_totals')

asia_df.toPandas()

22/07/11 08:24:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,asia_totals,0,0,0,0,0,0,0,0,1,...,6,3,0,4,0,0,13,4,5,1


### Africa

In [5]:
africa_df = daily_df.where(daily_df['Country'].isin(africa))

africa_df = africa_df.agg(*[sf.sum(africa_df[c_name])for c_name in africa_df.columns])

africa_df = africa_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

africa_df = africa_df.na.fill(value='africa_totals')

africa_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,africa_totals,0,0,0,0,0,0,0,0,0,...,0,1,0,0,0,0,0,1,0,0


### Caribbean

In [6]:
caribbean_df = daily_df.where(daily_df['Country'].isin(caribbean))

caribbean_df = caribbean_df.agg(*[sf.sum(caribbean_df[c_name])for c_name in caribbean_df.columns])

caribbean_df = caribbean_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

caribbean_df = caribbean_df.na.fill(value='caribbean_totals')

caribbean_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,caribbean_totals,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,0


### Central America

In [7]:
central_america_df = daily_df.where(daily_df['Country'].isin(central_america))

central_america_df = central_america_df.agg(*[sf.sum(central_america_df[c_name])for c_name in central_america_df.columns])

central_america_df = central_america_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

central_america_df = central_america_df.na.fill(value='caribbean_totals')

central_america_df = central_america_df.na.fill(value=0)

central_america_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,caribbean_totals,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


### Europe

In [8]:
europe_df = daily_df.where(daily_df['Country'].isin(europe))

europe_df = europe_df.agg(*[sf.sum(europe_df[c_name])for c_name in europe_df.columns])

europe_df = europe_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

europe_df = europe_df.na.fill(value='europe_totals')

europe_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,europe_totals,1,1,1,4,3,20,14,40,15,...,202,146,383,288,1,165,289,273,79,106


### North America

In [9]:
north_america_df = daily_df.where(daily_df['Country'].isin(north_america))

north_america_df = north_america_df.agg(*[sf.sum(north_america_df[c_name])for c_name in north_america_df.columns])

north_america_df = north_america_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

north_america_df = north_america_df.na.fill(value='north_america_totals')

north_america_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,north_america_totals,0,0,0,0,0,1,2,4,0,...,62,14,42,28,0,0,65,82,46,0


### Oceania

In [10]:
oceania_df = daily_df.where(daily_df['Country'].isin(oceania))

oceania_df = oceania_df.agg(*[sf.sum(oceania_df[c_name])for c_name in oceania_df.columns])

oceania_df = oceania_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

oceania_df = oceania_df.na.fill(value='oceania_totals')

oceania_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,oceania_totals,0,0,0,0,0,0,0,2,0,...,0,0,0,0,0,0,0,0,0,1


### South America

In [11]:
south_america_df = daily_df.where(daily_df['Country'].isin(south_america))

south_america_df = south_america_df.agg(*[sf.sum(south_america_df[c_name])for c_name in south_america_df.columns])

south_america_df = south_america_df.withColumn('sum(Country)', col('sum(Country)').cast(StringType()))

south_america_df = south_america_df.na.fill(value='south_america_totals')

south_america_df.toPandas()

Unnamed: 0,sum(Country),sum(2022-05-06),sum(2022-05-12),sum(2022-05-13),sum(2022-05-15),sum(2022-05-17),sum(2022-05-18),sum(2022-05-19),sum(2022-05-20),sum(2022-05-21),...,sum(2022-06-21),sum(2022-06-22),sum(2022-06-23),sum(2022-06-24),sum(2022-06-25),sum(2022-06-26),sum(2022-06-27),sum(2022-06-28),sum(2022-06-29),sum(2022-06-30)
0,south_america_totals,0,0,0,0,0,0,0,0,0,...,2,2,10,3,2,2,0,4,0,0


### All of the regions dataframe

In [14]:

region_df_list = ['south_america_df', 'oceania_df', 'north_america_df', 'asia_df', 'africa_df', 'caribbean_df', 'europe_df']

region_df = south_america_df.union(oceania_df)

region_df = region_df.union(north_america_df)

region_df = region_df.union(asia_df)

region_df = region_df.union(africa_df)

region_df = region_df.union(caribbean_df)

region_df = region_df.union(europe_df)

region_df = region_df.withColumnRenamed('sum(Country)', 'region')

# region_df.toPandas().to_csv("./regional_cases.csv", index=False, header=True)

### Daily stats followed by the total of each region

In [16]:
dropped_df = region_df.drop('region')
agg2_df = region_df.withColumn('total', reduce(column_add, (dropped_df[col] for col in dropped_df.columns)))
agg2_df.toPandas()
agg2_df.toPandas().to_csv("./regional_cases.csv", index=False, header=True)

### Total for each recorded date and total of totals for the world

In [289]:
world_total_df = agg2_df.groupBy().sum()
world_total_df.toPandas()

Unnamed: 0,sum(sum(2022-05-06)),sum(sum(2022-05-12)),sum(sum(2022-05-13)),sum(sum(2022-05-15)),sum(sum(2022-05-17)),sum(sum(2022-05-18)),sum(sum(2022-05-19)),sum(sum(2022-05-20)),sum(sum(2022-05-21)),sum(sum(2022-05-23)),...,sum(sum(2022-06-23)),sum(sum(2022-06-24)),sum(sum(2022-06-25)),sum(sum(2022-06-26)),sum(sum(2022-06-27)),sum(sum(2022-06-28)),sum(sum(2022-06-29)),sum(sum(2022-06-30)),sum(sum(2022-07-01)),sum(total)
0,1,1,1,4,3,21,16,46,16,72,...,436,321,3,167,367,366,147,809,207,6144


### Averages of worldwide cases

In [246]:
agg3_df = agg2_df.groupBy().mean()
agg3_pd_df = agg3_df.toPandas()
agg3_pd_df.round()

Unnamed: 0,avg(sum(2022-05-06)),avg(sum(2022-05-12)),avg(sum(2022-05-13)),avg(sum(2022-05-15)),avg(sum(2022-05-17)),avg(sum(2022-05-18)),avg(sum(2022-05-19)),avg(sum(2022-05-20)),avg(sum(2022-05-21)),avg(sum(2022-05-23)),...,avg(sum(2022-06-23)),avg(sum(2022-06-24)),avg(sum(2022-06-25)),avg(sum(2022-06-26)),avg(sum(2022-06-27)),avg(sum(2022-06-28)),avg(sum(2022-06-29)),avg(sum(2022-06-30)),avg(sum(2022-07-01)),avg(total)
0,0.0,0.0,0.0,1.0,0.0,3.0,2.0,7.0,2.0,10.0,...,62.0,46.0,0.0,24.0,52.0,52.0,21.0,116.0,30.0,878.0
