# Data pipelines - Covid-19 Daily Reports Exercise

### Objective: go through Extract, Transform, Load steps using Python, Pandas and SQLite.

**Contents: Simple ETL pipeline**
- [Step 1 - Extract](#extract)
- [Step 2 - Transform](#transform)
- [Step 3 - Load](#load)

<a id="extract"></a>

### Step 1 - Extract

In [1]:
import requests
import pandas as pd
import sqlite3

OWNER = 'CSSEGISandData'
REPO = 'COVID-19'
PATH = 'csse_covid_19_data/csse_covid_19_daily_reports'
URL = f'https://api.github.com/repos/{OWNER}/{REPO}/contents/{PATH}'

# Send GET request to the GitHub API
response = requests.get(URL)

# Error handling: (check if the request was successful)
if response.status_code == 200:
    # Get the JSON response
    json_data = response.json()

    # Extract the 'download_url' for each file
    download_urls = [file['download_url'] for file in json_data if file['name'].endswith('.csv')]
else:
    print("Error occurred while retrieving data from the GitHub API.")

I'll collect all the downloaded dataframes in a list to make the initial EDA more efficient.

In [2]:
dataframes = []

for i,download_url in enumerate(download_urls):
    # Read the CSV data into a DataFrame
    df = pd.read_csv(download_url)

    # Save the dataframe objects to make some exploratory analysis quicker
    dataframes.append(df)

<a id="transform"></a>

### Step 2 - Transform

In [13]:
for df in dataframes[:5]:
    df.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4011 entries, 0 to 4010
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Province_State  3833 non-null   object        
 1   Country_Region  4011 non-null   object        
 2   Last_Update     4011 non-null   datetime64[ns]
 3   Latitude        3922 non-null   float64       
 4   Longitude       3922 non-null   float64       
 5   Confirmed       4011 non-null   int64         
 6   Deaths          4011 non-null   int64         
dtypes: datetime64[ns](1), float64(2), int64(2), object(2)
memory usage: 219.5+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4016 entries, 0 to 4015
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Province_State  3837 non-null   object        
 1   Country_Region  4016 non-null   object        
 2   Last_Update     4016 non

In [4]:
sizes = pd.DataFrame([df.shape[1] for df in dataframes])
print('Column counts:')
sizes.value_counts()

Column counts:


14    871
12     68
6      39
8      21
dtype: int64

In [5]:
header_lengths = [14, 12, 8, 6]

for df in dataframes:
    if df.shape[1] in header_lengths:
        print(df.shape[1], sorted(df.columns.to_list()))
        header_lengths.remove(df.shape[1])
        if header_lengths == []:
            break
    else:
        continue

14 ['Active', 'Admin2', 'Case_Fatality_Ratio', 'Combined_Key', 'Confirmed', 'Country_Region', 'Deaths', 'FIPS', 'Incident_Rate', 'Last_Update', 'Lat', 'Long_', 'Province_State', 'Recovered']
6 ['Confirmed', 'Country/Region', 'Deaths', 'Last Update', 'Province/State', 'Recovered']
8 ['Confirmed', 'Country/Region', 'Deaths', 'Last Update', 'Latitude', 'Longitude', 'Province/State', 'Recovered']
12 ['Active', 'Admin2', 'Combined_Key', 'Confirmed', 'Country_Region', 'Deaths', 'FIPS', 'Last_Update', 'Lat', 'Long_', 'Province_State', 'Recovered']


**After examining column headers**:

4 levels of Tables (new headers from one level to the next will be highlighted)

- Level 1 (6 columns): 'Confirmed', 'Country/Region', 'Deaths', 'Last Update', 'Province/State', 'Recovered'
- Level 2 (8 columns): 'Confirmed', 'Country/Region', 'Deaths', 'Last Update', **'Latitude'**, **'Longitude'**, 'Province/State', 'Recovered'
- Level 3 (12 columns): **'Active'**, **'Admin2'**, **'Combined_Key'**, 'Confirmed', 'Country_Region', 'Deaths', **'FIPS'**, 'Last_Update', 'Lat', 'Long_', 'Province_State', 'Recovered'
- Level 4 (14 columns): 'Active', 'Admin2', **'Case_Fatality_Ratio'**, 'Combined_Key', 'Confirmed', 'Country_Region', 'Deaths', 'FIPS', **'Incident_Rate'**, 'Last_Update', 'Lat', 'Long_', 'Province_State', 'Recovered'

**Notes**:

- FIPS and Admin2 are US specific columns so can drop these to have a more consistent table structure worldwide. Combined key is also a column that is redundant given it is a concatenation of Country/Region and Province/State

In [6]:
# Dictionary to collect unique column headers and their count
header_count_dict = {}

# Iterate over the DataFrames
for df in dataframes:
    # Get the column headers of the current DataFrame
    column_headers = df.columns.tolist()

    # Update the header count dictionary
    for header in column_headers:
        if header in header_count_dict:
            header_count_dict[header] += 1
        else:
            header_count_dict[header] = 1

# Print the unique column headers and their count
print("Unique Column Headers and Count:")
for header, count in header_count_dict.items():
    print(f"{header}: {count} DataFrames")

Unique Column Headers and Count:
FIPS: 939 DataFrames
Admin2: 939 DataFrames
Province_State: 939 DataFrames
Country_Region: 939 DataFrames
Last_Update: 939 DataFrames
Lat: 939 DataFrames
Long_: 939 DataFrames
Confirmed: 999 DataFrames
Deaths: 999 DataFrames
Recovered: 999 DataFrames
Active: 939 DataFrames
Combined_Key: 939 DataFrames
Incident_Rate: 707 DataFrames
Case_Fatality_Ratio: 707 DataFrames
Province/State: 60 DataFrames
Country/Region: 60 DataFrames
Last Update: 60 DataFrames
Latitude: 21 DataFrames
Longitude: 21 DataFrames
Incidence_Rate: 164 DataFrames
Case-Fatality_Ratio: 164 DataFrames


This helps determine which way to rename column headers

In [7]:
cols_to_remove = []

for df in dataframes:
    null_percentages = df.isnull().sum() / len(df) * 100
    for column, percentage in null_percentages.items():
        if percentage>80:
            cols_to_remove.append(column)

pd.DataFrame(cols_to_remove).value_counts()

Active       637
Recovered    637
dtype: int64

Should also drop these columns (more than half the dataframes are missing the majority of these values)

### Cleaning Plan:

- Rename columns so every dataframe has exactly the same column headers (this is important to ensure things are dropped correctly

|      Old Name       |      New Name       |
|---------------------|---------------------|
| Country/Region      | Country_Region      |
| Province/State      | Province_State      |
| Lat                 | Latitude            |
| Long_               | Longitude           |
| Case-Fatality_Ratio | Case_Fatality_Ratio |
| Incidence_Rate      | Incident_Rate       |

- Standardize the columns of each dataframe to hold the following data:
    1) Country_Region
    2) Province_State
    3) Latitude
    4) Longitude
    5) Confirmed
    6) Deaths
    7) Last_Update 
- Drop the following columns from any dataframe (if they exist):

| Column to drop      | Justification                   |
| --------------------| --------------------------------|
| Admin2              | Too USA specific                |
| FIPS                | Too USA specific                |
| Combined_key        | Redundant data                  |
| Active              | Too much missing data           |
| Recovered           | Too much missing data           |
| Incident_Rate       | Missing from too many dataframes|
| Case_Fatality_Ratio | Missing from too many dataframes|
    
- Add Null columns to any tables that do not already have one of the 7 columns above (this should be a small minority of dataframes)
    
- Standardize formatting (change Last Update to datetime)

In [8]:
rename_mapping = {
    'Country/Region': 'Country_Region',
    'Province/State': 'Province_State',
    'Lat': 'Latitude',
    'Long_': 'Longitude',
    'Last Update': 'Last_Update',
    'Case-Fatality_Ratio':'Case_Fatality_Ratio',
    'Incidence_Rate': 'Incident_Rate'
}

columns_to_drop = ['Admin2','FIPS','Combined_Key','Active','Recovered','Incident_Rate','Case_Fatality_Ratio']
columns_to_add = ['Country_Region','Province_State','Latitude','Longitude','Confirmed','Deaths','Last_Update']

In [9]:
def convert_to_datetime(value):
    """
    Convert the input value to datetime format by trying multiple formats.

    Args:
        value (str): The input value to be converted.

    Returns:
        datetime.datetime or pd.NaT: The converted datetime value or NaT if the conversion fails.
    """
    formats = ['%Y-%m-%d %H:%M:%S', '%m/%d/%Y %H:%M']

    for fmt in formats:
        try:
            return pd.to_datetime(value, format=fmt)
        except ValueError:
            pass

    return pd.NaT


def clean_dataframe(df, rename_mapping, columns_to_drop, columns_to_add):
    """
    Performs cleaning operations on a pandas DataFrame.

    Args:
        df (pandas.DataFrame): The DataFrame to be cleaned.
        rename_mapping (dict): A dictionary mapping old column names to new column names.
        columns_to_drop (list): A list of column names to be dropped from the DataFrame (if exists).
        columns_to_add (list): A list of column names to be added to the DataFrame (if not already exists).

    Returns:
        pandas.DataFrame: The cleaned DataFrame.
    """
    # Renaming columns
    df.rename(columns=rename_mapping, inplace=True)

    # Dropping columns
    for column in columns_to_drop:
        if column in df.columns:
            df.drop(columns=column, inplace=True)

    # Adding columns with null values if not present
    for column in columns_to_add:
        if column not in df.columns:
            df[column] = pd.Series(dtype=float)

    # Converting 'Last_Update' column to datetime format
    df['Last_Update'] = df['Last_Update'].apply(convert_to_datetime)
    #df['Last_Update'] = pd.to_datetime(df['Last_Update'], format='%Y-%m-%d %H:%M:%S')

    return df

    

In [10]:
clean_dataframe(dataframes[2], rename_mapping, columns_to_drop, columns_to_add).info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4016 entries, 0 to 4015
Data columns (total 7 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Province_State  3837 non-null   object        
 1   Country_Region  4016 non-null   object        
 2   Last_Update     4016 non-null   datetime64[ns]
 3   Latitude        3925 non-null   float64       
 4   Longitude       3925 non-null   float64       
 5   Confirmed       4016 non-null   int64         
 6   Deaths          4016 non-null   int64         
dtypes: datetime64[ns](1), float64(2), int64(2), object(2)
memory usage: 219.8+ KB


In [11]:
cleaned_dataframes = []

for df in dataframes:
    cleaned_dataframes.append(clean_dataframe(df, rename_mapping, columns_to_drop, columns_to_add))

<a id="load"></a>

### Step 3 - Load

In [14]:
# Database file path
db_file = 'covid_data.db'

# Connect to the SQLite database
conn = sqlite3.connect(db_file)

# Iterate over the clean DataFrames
for i, df in enumerate(cleaned_dataframes):
    table_name = f'table_{i}'

    # Load the DataFrame into the SQL database using .to_sql()
    df.to_sql(table_name, conn, if_exists='replace')

# Close the database connection
conn.close()