# Data Warehouse Query

```sql
CREATE TABLE dim_date (
    date_id SERIAL PRIMARY KEY,
    date DATE NOT NULL,
    year INT NOT NULL,
    month VARCHAR(20) NOT NULL,
    day INT NOT NULL
);

CREATE TABLE dim_location (
    loc_id SERIAL PRIMARY KEY,
    province_state VARCHAR(100) NOT NULL,
    country_region VARCHAR(100) NOT NULL,
    latitude FLOAT,
    longitude FLOAT
);

CREATE TABLE fact_recovered_cases (
    fact_id SERIAL PRIMARY KEY,
    date_key INT NOT NULL,
    location_key INT NOT NULL,
    recovered_cases INT NOT NULL,
    FOREIGN KEY (date_key) REFERENCES dim_date (date_key),
    FOREIGN KEY (location_key) REFERENCES dim_location (location_key)
);
```

# Exploring and Processing Historical Data

In [3]:
import pandas as pd

In [4]:
data = pd.read_csv('https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv')
data

Unnamed: 0,Province/State,Country/Region,Lat,Long,1/22/20,1/23/20,1/24/20,1/25/20,1/26/20,1/27/20,...,2/28/23,3/1/23,3/2/23,3/3/23,3/4/23,3/5/23,3/6/23,3/7/23,3/8/23,3/9/23
0,,Afghanistan,33.939110,67.709953,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,,Albania,41.153300,20.168300,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,,Algeria,28.033900,1.659600,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,,Andorra,42.506300,1.521800,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,,Angola,-11.202700,17.873900,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
269,,West Bank and Gaza,31.952200,35.233200,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
270,,Winter Olympics 2022,39.904200,116.407400,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
271,,Yemen,15.552727,48.516388,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
272,,Zambia,-13.133897,27.849332,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [6]:
data['Country/Region'].value_counts()

Unnamed: 0_level_0,count
Country/Region,Unnamed: 1_level_1
China,34
United Kingdom,15
France,12
Australia,8
Netherlands,5
...,...
Guinea,1
Guinea-Bissau,1
Guyana,1
Haiti,1


There are some countries that have more than 1 province data

In [None]:
data['Province/State'].isna().sum()*100/len(data)

There are XX% of missing values in Province/State column, so we will fill to "Unknown"

In [8]:
data['Province/State'].fillna('Unknown',inplace=True)

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data['Province/State'].fillna('Unknown',inplace=True)


In [9]:
data.head()

Unnamed: 0,Province/State,Country/Region,Lat,Long,1/22/20,1/23/20,1/24/20,1/25/20,1/26/20,1/27/20,...,2/28/23,3/1/23,3/2/23,3/3/23,3/4/23,3/5/23,3/6/23,3/7/23,3/8/23,3/9/23
0,Unknown,Afghanistan,33.93911,67.709953,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,Unknown,Albania,41.1533,20.1683,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,Unknown,Algeria,28.0339,1.6596,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,Unknown,Andorra,42.5063,1.5218,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,Unknown,Angola,-11.2027,17.8739,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


**Create dim_date table**

In [10]:
# Extract dates from the column names
dates = data.columns[4:]

# Build dim_date table
dim_date = pd.DataFrame({
    "date_id": range(1, len(dates) + 1),
    "date": pd.to_datetime(dates),
})
dim_date['year'] = dim_date['date'].dt.year
dim_date['month'] = dim_date['date'].dt.month_name()
dim_date['day'] = dim_date['date'].dt.day

dim_date.head()


  "date": pd.to_datetime(dates),


Unnamed: 0,date_id,date,year,month,day
0,1,2020-01-22,2020,January,22
1,2,2020-01-23,2020,January,23
2,3,2020-01-24,2020,January,24
3,4,2020-01-25,2020,January,25
4,5,2020-01-26,2020,January,26


**Create dim_location table**

In [11]:
# Extract unique location information
locations = data[['Province/State', 'Country/Region', 'Lat', 'Long']]

# Add location key
dim_location = locations.reset_index(drop=True)
dim_location['loc_id'] = dim_location.index + 1

# Rename columns
dim_location = dim_location.rename(columns={
    "Province/State": "province_state",
    "Country/Region": "country_region",
    "Lat": "latitude",
    "Long": "longitude"
})

#Reorder the columns
dim_location = dim_location[['loc_id','province_state','country_region','latitude','longitude']]

dim_location.head()


Unnamed: 0,loc_id,province_state,country_region,latitude,longitude
0,1,Unknown,Afghanistan,33.93911,67.709953
1,2,Unknown,Albania,41.1533,20.1683
2,3,Unknown,Algeria,28.0339,1.6596
3,4,Unknown,Andorra,42.5063,1.5218
4,5,Unknown,Angola,-11.2027,17.8739


**Create fact_recovered_cases table**

In [12]:
# Melt the data to normalize date columns
fact_recovered_cases = data.melt(
    id_vars=['Province/State', 'Country/Region'],
    value_vars=dates,
    var_name='date',
    value_name='recovered_cases'
)

# Add date_key by merging with dim_date
fact_recovered_cases['date'] = pd.to_datetime(fact_recovered_cases['date'])
fact_recovered_cases = fact_recovered_cases.merge(
    dim_date[['date', 'date_id']], on='date', how='left'
)

# Add location_key by merging with dim_location
fact_recovered_cases = fact_recovered_cases.merge(
    dim_location[['province_state', 'country_region', 'loc_id']],
    left_on=['Province/State', 'Country/Region'],
    right_on=['province_state', 'country_region'],
    how='left'
)

# Select relevant columns
fact_recovered_cases = fact_recovered_cases[[
    'date_id', 'loc_id', 'recovered_cases'
]]

fact_recovered_cases.head()


  fact_recovered_cases['date'] = pd.to_datetime(fact_recovered_cases['date'])


Unnamed: 0,date_id,loc_id,recovered_cases
0,1,1,0
1,1,2,0
2,1,3,0
3,1,4,0
4,1,5,0


## Insert data into Data Warehouse

In [35]:
import psycopg2

db_params = {
    "dbname": "datawarehouse",
    "user": "postgres",
    "password": "Bursket21",
    "host": "database-2.crw62s0cicme.ap-southeast-2.rds.amazonaws.com",
    "port": 5432,          # Default PostgreSQL port
    "sslmode": "require"  # Enable SSL/TLS connection
}

'''
db_params = {
    "dbname": "<your database name>",
    "user": "<your master username>",
    "password": "<your master password>",
    "host": "<your endpoint>",
    "port": 5432,          # Default PostgreSQL port
    "sslmode": "require"  # Enable SSL/TLS connection
}
'''

# Establish the database connection
try:
    conn = psycopg2.connect(**db_params)
    print("Connection successful!")
except Exception as e:
    print(f"Error: {e}")
    exit()

Connection successful!


In [36]:
# Insert data into dim_date
cursor = conn.cursor()
for _, row in dim_date.iterrows():
    cursor.execute(
        "INSERT INTO dim_date (date, year, month, day) VALUES (%s, %s, %s, %s);",
        (str(row['date']), str(row['year']), str(row['month']), str(row['day']))
    )
# Commit changes and close connection
conn.commit()

In [37]:
# Insert data into dim_location
cursor = conn.cursor()
for _, row in dim_location.iterrows():
    cursor.execute(
        "INSERT INTO dim_location (province_state, country_region, latitude, longitude) VALUES (%s, %s, %s, %s);",
        (str(row['province_state']), str(row['country_region']), str(row['latitude']), str(row['longitude']))
    )
conn.commit()

In [49]:
# Insert data into fact_recovered_cases
cursor = conn.cursor()
for _, row in fact_recovered_cases.sample(400).iterrows(): # .sample(1000) to make it faster, so we only take 1000 samples
    cursor.execute(
        "INSERT INTO fact_recovered_cases (date_id, loc_id, recovered_cases) VALUES (%s, %s, %s);",
        (str(row['date_id']), str(row['loc_id']), str(row['recovered_cases']))
    )
conn.commit()

# Orchestration using Airflow



In [43]:
recent = data[['Province/State','Country/Region','Lat','Long']]
recent[data.columns[-1]] = data.iloc[:,-1]
recent.head()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  recent[data.columns[-1]] = data.iloc[:,-1]


Unnamed: 0,Province/State,Country/Region,Lat,Long,3/9/23
0,Unknown,Afghanistan,33.93911,67.709953,0
1,Unknown,Albania,41.1533,20.1683,0
2,Unknown,Algeria,28.0339,1.6596,0
3,Unknown,Andorra,42.5063,1.5218,0
4,Unknown,Angola,-11.2027,17.8739,0


In [45]:
dates = recent.columns[4:]

# Build dim_date table
dim_date1 = pd.DataFrame({
    "date_id": range(1, len(dates) + 1),
    "date": pd.to_datetime(dates),
})
dim_date1['year'] = dim_date1['date'].dt.year
dim_date1['month'] = dim_date1['date'].dt.month_name()
dim_date1['day'] = dim_date1['date'].dt.day

dim_date1.head()

Unnamed: 0,date_id,date,year,month,day
0,1,2023-03-09,2023,March,9


In [46]:
# Extract unique location information
locations1 = recent[['Province/State', 'Country/Region', 'Lat', 'Long']]

# Add location key
dim_location1 = locations1.reset_index(drop=True)
dim_location1['loc_id'] = dim_location1.index + 1

# Rename columns
dim_location1 = dim_location1.rename(columns={
    "Province/State": "province_state",
    "Country/Region": "country_region",
    "Lat": "latitude",
    "Long": "longitude"
})

#Reorder the columns
dim_location1 = dim_location1[['loc_id','province_state','country_region','latitude','longitude']]

dim_location1.head()


Unnamed: 0,loc_id,province_state,country_region,latitude,longitude
0,1,Unknown,Afghanistan,33.93911,67.709953
1,2,Unknown,Albania,41.1533,20.1683
2,3,Unknown,Algeria,28.0339,1.6596
3,4,Unknown,Andorra,42.5063,1.5218
4,5,Unknown,Angola,-11.2027,17.8739


In [47]:
# Melt the data to normalize date columns
fact_recovered_cases1 = recent.melt(
    id_vars=['Province/State', 'Country/Region'],
    value_vars=dates,
    var_name='date',
    value_name='recovered_cases'
)

# Add date_key by merging with dim_date
fact_recovered_cases1['date'] = pd.to_datetime(fact_recovered_cases1['date'])
fact_recovered_cases1 = fact_recovered_cases1.merge(
    dim_date1[['date', 'date_id']], on='date', how='left'
)

# Add location_key by merging with dim_location
fact_recovered_cases1 = fact_recovered_cases1.merge(
    dim_location1[['province_state', 'country_region', 'loc_id']],
    left_on=['Province/State', 'Country/Region'],
    right_on=['province_state', 'country_region'],
    how='left'
)

# Select relevant columns
fact_recovered_cases1 = fact_recovered_cases1[[
    'date_id', 'loc_id', 'recovered_cases'
]]

fact_recovered_cases1.head()


Unnamed: 0,date_id,loc_id,recovered_cases
0,1,1,0
1,1,2,0
2,1,3,0
3,1,4,0
4,1,5,0


## Airflow Dags

```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import psycopg2
import requests
from io import StringIO

# PostgreSQL connection parameters
DB_CONFIG = {
    "dbname": "your_database_name",
    "user": "your_username",
    "password": "your_password",
    "host": "amazon_rds_endpoint",
    "port": 5432
}

CSV_URL = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv"

# Function to create tables in PostgreSQL

# Function to extract, transform, and save data
def process_data():
    # Extract Data

    data = pd.read_csv(CSV_URL)
    recent = data[['Province/State', 'Country/Region', 'Lat', 'Long']]
    recent[data.columns[-1]] = data.iloc[:, -1]

    # dim_date Table
    dates = recent.columns[4:]
    dim_date = pd.DataFrame({
        "date_id": range(1, len(dates) + 1),
        "date": pd.to_datetime(dates),
    })
    dim_date["year"] = dim_date["date"].dt.year
    dim_date["month"] = dim_date["date"].dt.month_name()
    dim_date["day"] = dim_date["date"].dt.day
    dim_date.to_csv("/tmp/dim_date.csv", index=False)

    # dim_location Table
    dim_location = recent[['Province/State', 'Country/Region', 'Lat', 'Long']].drop_duplicates().reset_index(drop=True)
    dim_location['loc_id'] = dim_location.index + 1
    dim_location = dim_location.rename(columns={
        "Province/State": "province_state",
        "Country/Region": "country_region",
        "Lat": "latitude",
        "Long": "longitude"
    })
    dim_location = dim_location[['loc_id', 'province_state', 'country_region', 'latitude', 'longitude']]
    dim_location.to_csv("/tmp/dim_location.csv", index=False)

    # fact_recovered_cases Table
    fact_recovered_cases = recent.melt(
        id_vars=['Province/State', 'Country/Region'],
        value_vars=dates,
        var_name='date',
        value_name='recovered_cases'
    )
    fact_recovered_cases['date'] = pd.to_datetime(fact_recovered_cases['date'])
    fact_recovered_cases = fact_recovered_cases.merge(
        dim_date[['date', 'date_id']], on='date', how='left'
    ).merge(
        dim_location[['province_state', 'country_region', 'loc_id']],
        left_on=['Province/State', 'Country/Region'],
        right_on=['province_state', 'country_region'],
        how='left'
    )
    fact_recovered_cases = fact_recovered_cases[['date_id', 'loc_id', 'recovered_cases']]
    fact_recovered_cases.to_csv("/tmp/fact_recovered_cases.csv", index=False)


# Function to load data into PostgreSQL
def load_data():
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()

    # Load dim_date
    dim_date = pd.read_csv("/tmp/dim_date.csv")
    for _, row in dim_date.iterrows():
        cursor.execute("""
            INSERT INTO dim_date (date_id, date, year, month, day)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (date_id) DO NOTHING;
        """, (row.date_id, row.date, row.year, row.month, row.day))

    # Load dim_location
    dim_location = pd.read_csv("/tmp/dim_location.csv")
    for _, row in dim_location.iterrows():
        cursor.execute("""
            INSERT INTO dim_location (loc_id, province_state, country_region, latitude, longitude)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (loc_id) DO NOTHING;
        """, (row.loc_id, row.province_state, row.country_region, row.latitude, row.longitude))

    # Load fact_recovered_cases
    fact_recovered_cases = pd.read_csv("/tmp/fact_recovered_cases.csv")
    for _, row in fact_recovered_cases.iterrows():
        cursor.execute("""
            INSERT INTO fact_recovered_cases (date_id, loc_id, recovered_cases)
            VALUES (%s, %s, %s);
        """, (row.date_id, row.loc_id, row.recovered_cases))

    conn.commit()
    cursor.close()
    conn.close()
    print("Data loaded successfully.")

# Define DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 6, 11),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('covid_recovered_pipeline',
         default_args=default_args,
         schedule_interval='59 23 * * *',  # Run at 23:59 daily
         catchup=False) as dag:

    create_task = PythonOperator(
        task_id='create_tables',
        python_callable=create_tables
    )

    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data
    )

    process_task >> load_task
    ```


# Visualization

Connect the database to Looker studio and then use SQL query to add data. Below the query:

**Top 10 highest recovered countries in 2021**

```sql
SELECT
    l.country_region,
    SUM(f.recovered_cases) AS total_recovered_cases
FROM
    fact_recovered_cases f
JOIN
    dim_date d ON f.date_id = d.date_id
JOIN
    dim_location l ON f.loc_id = l.loc_id
WHERE
    d.year = 2021
GROUP BY
    l.country_region
ORDER BY
    total_recovered_cases DESC
LIMIT 10;
```