# Infectious disease data loading and merging

## Dataset 1. Johns Hopkins University (JHU): Classic time series data

**Repository:**
https://github.com/CSSEGISandData/COVID-19

**Key files** (path: csse_covid_19_data/csse_covid_19_time_series/):

1. time_series_covid19_confirmed_global.csv: daily global confirmed cases (wide table; a new date column is appended for each day)

2. time_series_covid19_deaths_global.csv: daily global deaths (same structure as above)

3. time_series_covid19_recovered_global.csv: daily global recovered cases (the historical data is complete; updates may stop at a later stage, but the file is suitable for a project demonstration)

**Features:** Regularly updated, broken down by region (country/province), and stored in wide format (one column per date), which makes it **suitable for testing how Spark handles "dynamic columns"**.
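Because the set of date columns grows over time, it can help to detect them by their header pattern rather than hard-coding them. The following is a minimal sketch in pandas, assuming the headers follow the M/D/YY pattern seen in the JHU files; the sample rows and the helper name `split_columns` are made up for illustration.

```python
import re
import pandas as pd

# Tiny stand-in for the JHU wide table; the numbers are invented.
wide = pd.DataFrame({
    "Province/State": [None, "Hubei"],
    "Country/Region": ["Afghanistan", "China"],
    "Lat": [33.9, 30.9],
    "Long": [67.7, 112.2],
    "1/22/20": [0, 10],
    "1/23/20": [0, 12],
})

# Headers such as 1/22/20 or 12/31/21 (assumed M/D/YY pattern).
DATE_PATTERN = re.compile(r"^\d{1,2}/\d{1,2}/\d{2}$")

def split_columns(df):
    """Hypothetical helper: return (id_columns, date_columns)."""
    date_cols = [c for c in df.columns if DATE_PATTERN.match(c)]
    id_cols = [c for c in df.columns if c not in date_cols]
    return id_cols, date_cols

id_cols, date_cols = split_columns(wide)

# Reshape to one row per region and date, whatever the number of date columns.
long = wide.melt(id_vars=id_cols, value_vars=date_cols,
                 var_name="date", value_name="confirmed")
long["date"] = pd.to_datetime(long["date"], format="%m/%d/%y")
print(long)
```

Matching on the header pattern means the same code keeps working when new date columns appear, and it fails visibly (no date columns found) if the header format changes.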


In [62]:
# load the data
import requests

base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
sub_path = "csse_covid_19_data/csse_covid_19_time_series/"

files = [
    "time_series_covid19_confirmed_global.csv",
    "time_series_covid19_deaths_global.csv",
    "time_series_covid19_recovered_global.csv"
]

for f in files:
    url = base_url + sub_path + f
    r = requests.get(url, timeout=60)
    r.raise_for_status()  # stop early instead of saving an error page as CSV
    with open(f, 'wb') as fp:
        fp.write(r.content)


In [63]:
# Read the downloaded files and view basic data information
import pandas as pd

# set the file paths
path_confirmed = "time_series_covid19_confirmed_global.csv"
path_deaths = "time_series_covid19_deaths_global.csv"
path_recovered = "time_series_covid19_recovered_global.csv"

# read each csv
df_confirmed = pd.read_csv(path_confirmed)
df_deaths = pd.read_csv(path_deaths)
df_recovered = pd.read_csv(path_recovered)

# check the main info
print("1.Confirmed Cases")
print(df_confirmed.head())
print("\nMissing values:")
print(df_confirmed.isnull().sum())

print("\n 2.Deaths")
print(df_deaths.head())
print("\nMissing values:")
print(df_deaths.isnull().sum())

print("\n 3.Recovered")
print(df_recovered.head())
print("\nMissing values:")
print(df_recovered.isnull().sum())


1.Confirmed Cases
  Province/State Country/Region       Lat       Long  1/22/20  1/23/20  \
0            NaN    Afghanistan  33.93911  67.709953        0        0   
1            NaN        Albania  41.15330  20.168300        0        0   
2            NaN        Algeria  28.03390   1.659600        0        0   
3            NaN        Andorra  42.50630   1.521800        0        0   
4            NaN         Angola -11.20270  17.873900        0        0   

   1/24/20  1/25/20  1/26/20  1/27/20  ...  2/28/23  3/1/23  3/2/23  3/3/23  \
0        0        0        0        0  ...   209322  209340  209358  209362   
1        0        0        0        0  ...   334391  334408  334408  334427   
2        0        0        0        0  ...   271441  271448  271463  271469   
3        0        0        0        0  ...    47866   47875   47875   47875   
4        0        0        0        0  ...   105255  105277  105277  105277   

   3/4/23  3/5/23  3/6/23  3/7/23  3/8/23  3/9/23  
0  209369 

## Dataset 2. Our World in Data (OWID): Multi-dimensional comprehensive data

**Repository:**
https://github.com/owid/covid-19-data

**Key files:**
owid-covid-data.csv (under public/data/, as in the download URL used below)

**Content:** Daily data for countries around the world, including core indicators (confirmed cases, deaths, severe cases) and extended indicators (vaccination, such as total doses and full vaccination rate; testing; policies, such as lockdowns and mask mandates; economic impact; and so on).

**Features:** Long table format (each row = country + date) with a very large number of indicator columns, suitable for demonstrating "indicator differences" across "multi-source heterogeneous" data. **It can be used to test Spark's multi-field processing.**
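To make the "each row = country + date" layout concrete, here is a small pandas sketch. It assumes a few OWID column names (`location`, `date`, `total_cases`, `people_fully_vaccinated`); the rows themselves are invented. It checks that the country and date pair identifies each row, measures how sparse each indicator is, and pivots one indicator into a wide layout for comparison with the JHU format.

```python
import pandas as pd

# Invented rows using a handful of OWID column names.
owid_sample = pd.DataFrame({
    "location": ["France", "France", "Japan", "Japan"],
    "date": pd.to_datetime(["2021-03-01", "2021-03-02",
                            "2021-03-01", "2021-03-02"]),
    "total_cases": [100.0, 110.0, 50.0, 55.0],
    "people_fully_vaccinated": [10.0, None, 5.0, 6.0],
})

# In a long table, (location, date) should identify each row exactly once.
is_unique_key = not owid_sample.duplicated(subset=["location", "date"]).any()

# Share of missing values per indicator column.
missing_share = owid_sample.drop(columns=["location", "date"]).isna().mean()

# Pivot one indicator: dates as rows, countries as columns.
cases_wide = owid_sample.pivot(index="date", columns="location",
                               values="total_cases")

print(is_unique_key)
print(missing_share)
print(cases_wide)
```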

In [67]:
# load the OWID data
url = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
local_path = "/content/owid-covid-data.csv"
response = requests.get(url)
with open(local_path, 'wb') as f:
    f.write(response.content)
df_owid = spark.read.option("header", "true").csv(local_path)


In [73]:
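from pyspark.sql import functions as F

print("\n owid-covid-data")
df_owid = spark.read.option("header", "true").csv(local_path)
df_owid.show(5)
print(df_owid.head())
print("\nMissing values:")
# Count nulls per column on the OWID Spark DataFrame
# (Spark DataFrames have no pandas-style isnull().sum()).
df_owid.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_owid.columns]
).show(vertical=True)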


 owid-covid-data

## Dataset 3. The New York Times (NYT): US COVID-19 data repository

**Repository:**
https://github.com/nytimes/covid-19-data

**Key files:**

1. us.csv: national summary for the United States

2. us-states.csv: data for each US state

3. us-counties.csv: data for each US county

**Features:** The national file has three columns: date, cases, deaths.
NYT focuses on core case indicators (cumulative confirmed cases and deaths only). Because the indicators are so few, **the data can be used to simulate cleaning "simply structured data"**, for example deriving daily new cases: new cases on a given day = cumulative cases on that day - cumulative cases on the previous day.
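The new-cases formula above can be sketched in pandas with `Series.diff()`. The sample uses the first five rows of us.csv as printed further down in this notebook; treating the first day's new cases as equal to its cumulative count is an assumption of this sketch.

```python
import pandas as pd

# First five rows of us.csv, as shown in the notebook output.
nyt_sample = pd.DataFrame({
    "date": pd.to_datetime(["2020-01-21", "2020-01-22", "2020-01-23",
                            "2020-01-24", "2020-01-25"]),
    "cases": [1, 1, 1, 2, 3],
    "deaths": [0, 0, 0, 0, 0],
})

# diff() needs the rows in chronological order.
nyt_sample = nyt_sample.sort_values("date").reset_index(drop=True)

# new cases = cumulative today - cumulative yesterday.
# The first row has no previous day, so it falls back to its own count.
nyt_sample["new_cases"] = (
    nyt_sample["cases"].diff().fillna(nyt_sample["cases"]).astype(int)
)

print(nyt_sample)
```

For the state or county files, the difference would need to be taken within each region, for example with `groupby("state")["cases"].diff()`, so that one state's first day is not subtracted from another state's last day.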

In [74]:
#load the NYT data
import requests

nyt_url = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
local_nyt_path = "/content/nyt_us.csv"

response = requests.get(nyt_url)
with open(local_nyt_path, 'wb') as f:
    f.write(response.content)

# using PySpark
df_nyt = spark.read.option("header", "true").option("inferSchema", "true").csv(local_nyt_path)
df_nyt.show(5)
print(df_nyt.columns)



+----------+-----+------+
|      date|cases|deaths|
+----------+-----+------+
|2020-01-21|    1|     0|
|2020-01-22|    1|     0|
|2020-01-23|    1|     0|
|2020-01-24|    2|     0|
|2020-01-25|    3|     0|
+----------+-----+------+
only showing top 5 rows

['date', 'cases', 'deaths']


## Combining the datasets
With the preliminary inspection done, the three datasets can now be merged into one table with a common schema.
To avoid duplication, **only the confirmed-cases file from data source 1 (JHU) is used** in the merge, which keeps the structure of each source clearly distinguishable.

In [84]:
import pandas as pd
import numpy as np
# Clean and standardize JHU (time_series_covid19_confirmed_global.csv only)
jhu_fixed_cols = ["Province/State", "Country/Region", "Lat", "Long"]
date_cols = [col for col in df_confirmed.columns if col not in jhu_fixed_cols]

# Melt to long format
df_jhu_long = df_confirmed.melt(
    id_vars=jhu_fixed_cols,
    var_name="date",
    value_name="confirmed"
)

df_jhu_long["date"] = pd.to_datetime(df_jhu_long["date"], format="%m/%d/%y")
df_jhu_long.rename(columns={"Country/Region": "country", "Province/State": "state"}, inplace=True)
df_jhu_long["deaths"] = np.nan
df_jhu_long["recovered"] = np.nan
df_jhu_long["source"] = "JHU"
df_jhu_std = df_jhu_long[["date", "country", "state", "confirmed", "deaths", "recovered", "source"]]
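JHU reports some countries at province/state level, while OWID reports one row per country and date. If country-level totals are needed for comparison, the province rows can be summed. A minimal sketch, with invented numbers and a hypothetical frame name `jhu_long_sample`:

```python
import pandas as pd

# Invented rows in the standardized long layout (date, country, state, confirmed).
jhu_long_sample = pd.DataFrame({
    "date": pd.to_datetime(["2020-01-22"] * 3 + ["2020-01-23"] * 3),
    "country": ["Australia", "Australia", "Afghanistan"] * 2,
    "state": ["New South Wales", "Victoria", None] * 2,
    "confirmed": [0, 0, 0, 1, 2, 0],
})

# Sum over provinces so that each (country, date) pair has a single row.
jhu_country = (
    jhu_long_sample
    .groupby(["country", "date"], as_index=False)["confirmed"]
    .sum()
)

print(jhu_country)
```

Whether overseas territories listed as separate rows should be folded into the country total is a judgment call that depends on how the other sources count them.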

In [85]:
# Clean and standardize OWID - long table
owid_url = "https://covid.ourworldindata.org/data/owid-covid-data.csv"
df_owid = pd.read_csv(owid_url)
df_owid["date"] = pd.to_datetime(df_owid["date"])
df_owid_std = df_owid[["date", "location", "total_cases", "total_deaths"]].copy()
df_owid_std["country"] = df_owid_std["location"]
df_owid_std["state"] = np.nan
df_owid_std["confirmed"] = df_owid_std["total_cases"]
df_owid_std["deaths"] = df_owid_std["total_deaths"]
df_owid_std["recovered"] = np.nan
df_owid_std["source"] = "OWID"
df_owid_std = df_owid_std[["date", "country", "state", "confirmed", "deaths", "recovered", "source"]]


In [87]:
# Clean and standardize NYT
nyt_url = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
df_nyt = pd.read_csv(nyt_url)
df_nyt["date"] = pd.to_datetime(df_nyt["date"])
df_nyt_std = df_nyt.copy()
df_nyt_std["country"] = "United States"
df_nyt_std["state"] = np.nan
df_nyt_std["confirmed"] = df_nyt_std["cases"]
df_nyt_std["recovered"] = np.nan
df_nyt_std["source"] = "NYT"
df_nyt_std = df_nyt_std[["date", "country", "state", "confirmed", "deaths", "recovered", "source"]]

In [88]:
# combine the datasets
df_all = pd.concat([df_jhu_std, df_owid_std, df_nyt_std], ignore_index=True)
df_all.sort_values(by=["country", "state", "date"], inplace=True)

In [21]:
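# Note: .show() is a Spark DataFrame method. The pandas df_all built above would
# need df_all.head() instead; the preview below assumes df_all was loaded as a
# Spark DataFrame in this session.
df_all.show(5)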

+----------+-----------+-----+---------+------+---------+------+
|      date|    country|state|confirmed|deaths|recovered|source|
+----------+-----------+-----+---------+------+---------+------+
|2020-01-22|Afghanistan| NULL|        0|  NULL|     NULL|   JHU|
|2020-01-23|Afghanistan| NULL|        0|  NULL|     NULL|   JHU|
|2020-01-24|Afghanistan| NULL|        0|  NULL|     NULL|   JHU|
|2020-01-25|Afghanistan| NULL|        0|  NULL|     NULL|   JHU|
|2020-01-26|Afghanistan| NULL|        0|  NULL|     NULL|   JHU|
+----------+-----------+-----+---------+------+---------+------+
only showing top 5 rows
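Once the sources share one schema, the `source` column makes it possible to put them side by side. The sketch below pivots on `source` and computes the gap between two sources for the same country and date. The numbers are invented and the frame name `combined_sample` is hypothetical.

```python
import pandas as pd

# Invented rows in the combined schema.
combined_sample = pd.DataFrame({
    "date": pd.to_datetime(["2020-03-01", "2020-03-01",
                            "2020-03-02", "2020-03-02"]),
    "country": ["United States"] * 4,
    "confirmed": [30.0, 32.0, 53.0, 55.0],
    "source": ["OWID", "NYT", "OWID", "NYT"],
})

# One row per (country, date), one column per source.
side_by_side = combined_sample.pivot_table(
    index=["country", "date"],
    columns="source",
    values="confirmed",
    aggfunc="sum",
)

# Difference between the two sources for the same country and day.
side_by_side["diff_owid_minus_nyt"] = side_by_side["OWID"] - side_by_side["NYT"]

print(side_by_side)
```

Country names must match across sources for rows to line up. JHU labels the United States as "US", so its rows would need a name mapping before they could appear in the same comparison.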



In [91]:
# save CSV
df_all.to_csv("covid_cleaned_combined.csv", index=False)

# download in Colab
from google.colab import files
files.download("covid_cleaned_combined.csv")

