## Processing COVID cases
The input files are daily cumulative COVID Confirmed, Deaths, Recovered, and Active case count by country, province and administration area. This script load in the daily file, aggregate it to the country-month level, producing both cumulative and newly COVID Confirmed, Deaths, Recovered, and Active case count.
- Data comes from [COVID-19 case data from Johns Hopkins CSSE](https://github.com/CSSEGISandData/COVID-19).
- Data cleaning
    - There are some daily files that does not come with the consistent schema, e.g. missing `Admin2` and `Province_State`, and have `Country/Region` instead of `Country_Region`. Those are a small portion of the data and hard to identify because all daily file has the same name. We just skip those files in this analysis.
    - There are 487 (0.011%) records with abnormal dates, e.g. 2682-01-01, or `Country_Region` name as numbers. Those records are dropped.
    - The daily file could have records not on that date. For example, a 01-01-2021 file can have records for 2020 or Apr 2021, maybe because it is the last updated date.

In [30]:
# start a spark session
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql import DataFrame

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

spark = SparkSession.builder.appName("COVID-19 and Flight Volume Analysis").getOrCreate()

In [37]:

def load_covid_data(data_path: str, batch_date: str) -> DataFrame:
    """
    Load the covid data for a given batch date
    """
    schema = StructType([
        StructField("FIPS", StringType(), True),
        StructField("Admin2", StringType(), True),
        StructField("Province_State", StringType(), True),
        StructField("Country_Region", StringType(), True),
        StructField("Last_Update", TimestampType(), True),
        StructField("Lat", FloatType(), True),
        StructField("Long_", FloatType(), True),
        StructField("Confirmed", IntegerType(), True),
        StructField("Deaths", IntegerType(), True),
        StructField("Recovered", IntegerType(), True),
        StructField("Active", IntegerType(), True),
        StructField("Combined_Key", StringType(), True),
        StructField("Incident_Rate", FloatType(), True),
        StructField("Case_Fatality_Ratio", FloatType(), True),
    ]
    )
    
    df_covid = spark.read.csv(f"{data_path}/{batch_date}.csv", header=True, schema=schema, mode='DROPMALFORMED')
    return df_covid

# load all csv files from data/csse_covid_19_daily_reports
df_covid = load_covid_data('../data/csse_covid_19_daily_reports', '01-*-2021')
df_covid.show(5)


+----+------+--------------+--------------+-------------------+--------+--------+---------+------+---------+------+------------+-------------+-------------------+
|FIPS|Admin2|Province_State|Country_Region|        Last_Update|     Lat|   Long_|Confirmed|Deaths|Recovered|Active|Combined_Key|Incident_Rate|Case_Fatality_Ratio|
+----+------+--------------+--------------+-------------------+--------+--------+---------+------+---------+------+------------+-------------+-------------------+
|NULL|  NULL|          NULL|   Afghanistan|2021-02-01 05:22:53|33.93911|67.70995|    55023|  2400|    47679|  4944| Afghanistan|    141.34433|           4.361812|
|NULL|  NULL|          NULL|       Albania|2021-02-01 05:22:53| 41.1533| 20.1683|    78127|  1380|    47424| 29323|     Albania|     2714.817|          1.7663548|
|NULL|  NULL|          NULL|       Algeria|2021-02-01 05:22:53| 28.0339|  1.6596|   107339|  2891|    73344| 31104|     Algeria|    244.78094|           2.693336|
|NULL|  NULL|         

Warning message like below is because some daily data has a different header. Those fail will fail to load

```log
25/04/05 16:45:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Country/Region, Last Update, Confirmed, Deaths, , , , 
 Schema: Admin2, Province_State, Country_Region, Last_Update, Confirmed, Deaths, Recovered, Active
Expected: Admin2 but found: Country/Region
CSV file: file:///Users/yc/Documents/HU/CISC_525_Big_Data_Architecture/CISC525_final_project/data/csse_covid_19_daily_reports/02-26-2020.csv
```

In [29]:
# hide the warning
spark.sparkContext.setLogLevel("ERROR")
def process_daily_data(df: DataFrame, country: list[str] = None) -> DataFrame:
    """
    Process the daily data for given country.

    Args:
        df: DataFrame, the daily data
        country: list[str], the country to process

    Returns:
        DataFrame, the daily data for the given country
    """
    locations = ['Admin2', 'Province_State', 'Country_Region']
    stats = ['Confirmed', 'Deaths', 'Recovered', 'Active']

    if country is None:
        df_daily = df
    else:
        df_daily = df.filter(df['Country_Region'].isin(country))

    df_daily = df_daily.withColumn('date', F.date_format(F.col('Last_Update'), 'yyyy-MM-dd'))
    df_daily = df_daily[['date'] + locations + stats]

    # drop rows with missing date
    logger.info(f"{'='*10} date is missing or abnormal: {'='*10}")
    df_daily.select(
        F.count(F.when(F.isnull('date') | (F.year('date') >= 2024) | (F.year('date') < 2020), 1)).alias('date Missing Abnormal'),
        (F.count(F.when(F.isnull('date') | (F.year('date') >= 2024) | (F.year('date') < 2020), 1)) / F.count('*') * 100).alias('date Missing Abnormal %')
    ).show()
    df_daily.filter(
        F.isnull('date') | (F.year('date') >= 2024) | (F.year('date') < 2020)
    ).show(10)
    
    logger.info(f"Dropping rows with missing or abnormal date")
    df_daily = df_daily.na.drop(subset=['date']).filter((F.year('date') >= 2020) & (F.year('date') <= 2024))

    # aggregate by date and country
    df_daily = df_daily.groupBy('date', 'Country_Region').agg(
        *[F.sum(c).alias(c) for c in stats]
    ).sort(['Country_Region', 'date'])

    # get daily change from the previous day for Confirmed, Deaths, Recovered, Active
    for stat in stats:
        df_daily = df_daily.withColumn(f'{stat}_daily_new', 
                            F.col(stat) - F.lag(stat).over(Window.partitionBy().orderBy('date'))) \
                 .withColumnRenamed(stat, f'{stat}_cumulative')
        
    logger.info(f"{'='*10} Aggregate by date: {'='*10}")
    df_daily.show(5)

    # check missing
    logger.info(f"{'='*10} Count na values: {'='*10}")
    df_daily.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df_daily.columns]).show()
    return df_daily

df_daily = process_daily_data(df_covid, None)

                                                                                

+---------------------+-----------------------+
|date Missing Abnormal|date Missing Abnormal %|
+---------------------+-----------------------+
|                  487|   0.011442357304171221|
+---------------------+-----------------------+



INFO:__main__:Dropping rows with missing or abnormal date                       


+----------+--------------+-------------------+--------------+---------+------+---------+------+
|      date|        Admin2|     Province_State|Country_Region|Confirmed|Deaths|Recovered|Active|
+----------+--------------+-------------------+--------------+---------+------+---------+------+
|2727-01-01|Mainland China|2020-02-29T12:13:10|         66337|     NULL|  NULL|     NULL|  NULL|
|2682-01-01|Mainland China|2020-02-28T00:43:01|         65914|     NULL|  NULL|     NULL|  NULL|
|2641-01-01|Mainland China|2020-02-27T12:13:14|         65596|     NULL|  NULL|     NULL|  NULL|
|2615-01-01|Mainland China|2020-02-26T14:13:10|         65187|     NULL|  NULL|     NULL|  NULL|
|2563-01-01|Mainland China|2020-02-25T15:23:04|         64786|     NULL|  NULL|     NULL|  NULL|
|2495-01-01|Mainland China|2020-02-24T11:13:09|         64287|     NULL|  NULL|     NULL|  NULL|
|2346-01-01|Mainland China|2020-02-23T11:33:03|         64084|     NULL|  NULL|     NULL|  NULL|
|2346-01-01|Mainland China|202



+----------+--------------+--------------------+-----------------+--------------------+-----------------+-------------------+----------------+-------------------+----------------+
|      date|Country_Region|Confirmed_cumulative|Deaths_cumulative|Recovered_cumulative|Active_cumulative|Confirmed_daily_new|Deaths_daily_new|Recovered_daily_new|Active_daily_new|
+----------+--------------+--------------------+-----------------+--------------------+-----------------+-------------------+----------------+-------------------+----------------+
|2020-02-23|         China|                 570|                0|                 570|                0|               NULL|            NULL|               NULL|            NULL|
|2020-03-08|         China|               13170|              156|               13014|                0|              12600|             156|              12444|               0|
|2020-03-12|         China|                 186|                2|                 184|             

                                                                                

+----+--------------+--------------------+-----------------+--------------------+-----------------+-------------------+----------------+-------------------+----------------+
|date|Country_Region|Confirmed_cumulative|Deaths_cumulative|Recovered_cumulative|Active_cumulative|Confirmed_daily_new|Deaths_daily_new|Recovered_daily_new|Active_daily_new|
+----+--------------+--------------------+-----------------+--------------------+-----------------+-------------------+----------------+-------------------+----------------+
|   0|             0|                   0|                0|              113621|           113621|                  1|               1|             116520|          116520|
+----+--------------+--------------------+-----------------+--------------------+-----------------+-------------------+----------------+-------------------+----------------+



In [22]:
# year month from Last_Update
def process_monthly_data(df: DataFrame) -> DataFrame:
    """
    Process the monthly data.

    Args:
        df: DataFrame, the daily data

    Returns:
        DataFrame, the monthly data
    """
    locations = ['Admin2', 'Province_State', 'Country_Region']
    stats = ['Confirmed', 'Deaths', 'Recovered', 'Active']
    df_monthly = df.withColumn('year_month', F.date_format(F.col('date'), 'yyyy-MM'))
    df_monthly = df_monthly.groupBy(['year_month', 'Country_Region']).agg(
        *[F.max(c + '_cumulative').alias(c + '_cumulative') for c in stats],
        *[F.sum(c + '_daily_new').alias(c + '_monthly_new') for c in stats]
    ).sort(['Country_Region', 'year_month'])
    return df_monthly

df_monthly = process_monthly_data(df_daily)
df_monthly.show(5)


                                                                                

+----------+--------------+--------------------+-----------------+--------------------+-----------------+---------------------+------------------+---------------------+------------------+
|year_month|Country_Region|Confirmed_cumulative|Deaths_cumulative|Recovered_cumulative|Active_cumulative|Confirmed_monthly_new|Deaths_monthly_new|Recovered_monthly_new|Active_monthly_new|
+----------+--------------+--------------------+-----------------+--------------------+-----------------+---------------------+------------------+---------------------+------------------+
|   2020-03|   Afghanistan|                 166|                4|                   5|              157|                -8312|               -21|                -8672|               381|
|   2020-04|   Afghanistan|                1827|               60|                 252|             1515|              -139574|             -4997|               -24743|           -109834|
|   2020-05|   Afghanistan|               14529|            