In [13]:
import pandas as pd
import sys
import os

# Add parent directory to path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from src.db_connection import DatabaseConnection

PATH = "/home/mohamed/Desktop/data-engineering-projects/Covid-19-Data-Pipeline/data"


def read_data(file_name) -> pd.DataFrame:
    full_path = PATH + file_name + ".csv"
    row_data = pd.read_csv(full_path, quotechar='"', low_memory=False)
    return row_data


def process_row_data(row_data: pd.DataFrame, file_date):
    row_data["ingested_at"] = file_date
    return row_data


def load_row_into_db(row_data: pd.DataFrame):
    db_connection = DatabaseConnection()

    row_data.columns = row_data.columns.str.lower()  # Fix columns first
    db_connection.load_dataframe_into_db(row_data, "bronze", "covid")


def get_date_from_file_name(file_name: str) -> str:
    return file_name.split(".")[0]

In [22]:
def load_data_to_bronze(BASE):
    to_load_file_name = get_date_from_file_name(BASE)
    row_data = read_data(to_load_file_name)
    row_data = process_row_data(row_data, to_load_file_name)
    load_row_into_db(row_data)

In [11]:
load_data_to_bronze("01-02-2021.csv")

NameError: name 'load_data_to_bronze' is not defined

In [20]:
import pandas as pd
import sys
import os
import re
from dateutil.parser import parse

# Add parent directory to path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
PATH = "/home/mohamed/Desktop/data-engineering-projects/Covid-19-Data-Pipeline/data/"


def read_data(file_name) -> pd.DataFrame:
    """
    Read CSV file and drop exception incase if reading the file failed
    """
    full_path = PATH + file_name + ".csv"
    print(f"FULL_PATHH: {full_path}")
    try:
        row_data = pd.read_csv(full_path, quotechar='"', low_memory=False)

        if row_data.empty:
            raise ValueError("The dataset is empty.")

        return row_data

    except FileNotFoundError:
        raise FileNotFoundError(f"The file {file_name} is not exist")
    except pd.errors.EmptyDataError:
        raise ValueError("the file is corupted or empty")
    except Exception as e:
        raise type(e)(f"Failed to read the dataframe{e}")


def normalize_dataframe_columns_to_lowercase(row_data: pd.DataFrame) -> pd.DataFrame:
    """
    Turn the columns name of the dataset to lowercase to avoid any mismatch with the database columns name
    """
    if row_data.shape[1] == 0:
        raise pd.errors.EmptyDataError("Dataframe is empty.")

    to_process = row_data.copy()
    to_process.columns = to_process.columns.str.strip().str.lower()
    return to_process


def process_row_data(row_data: pd.DataFrame, file_date: str) -> pd.DataFrame:
    """
    Process the row data that read from the source and add ingested_at date for incremental load and processing
    to avoid full scan / processing.
    """
    to_process = row_data.copy()

    to_process["ingested_at"] = file_date
    return to_process


def load_row_into_db(row_data: pd.DataFrame) -> int:
    """
    Load the read dataset from source as it as after add new column ( ingested_at ) for delta and incremental processing.
    Raise database exception incase of failure of storing the dataset.
    """

    try:
        with DatabaseConnection() as db_connection:
            total_inserted = db_connection.load_dataframe_into_db(
                row_data, "bronze", "covid"
            )

        return total_inserted

    except Exception as e:
        raise type(e)(
            f"An error happened during storing the dataset into the database {e}"
        )


def get_date_from_file_name(file_name: str) -> str:
    """
    Return the date of the file by processing the filename and extract the date from the filename.
    in date format of day-month-year
    """

    content = file_name.split(".")

    if len(content) != 2:
        raise ValueError("The file name not as expected DD-MM-YYYY")

    file_date = parse(content[0]).strftime("%d-%m-%Y")

    file_name = file_date + "." + content[1]

    pattern = r"^\d{2}-\d{2}-\d{4}\.csv$"

    if not re.match(pattern, file_name):
        raise ValueError("The file name format not as expected.")

    return file_date


def run_extraction(file_name) -> dict:
    """
    Organizing the extraction process by run all the functions in order
    """

    to_load_file_name = get_date_from_file_name(file_name)
    loaded_data = read_data(to_load_file_name)
    row_data = normalize_dataframe_columns_to_lowercase(loaded_data)
    row_data = process_row_data(row_data, to_load_file_name)
    total_inserted = load_row_into_db(row_data)

    # Extraction Information

    extraction_info = {
        "file_name": file_name,
        "rows_read": len(row_data),
        "loaded_data": total_inserted,
        "status": "success",
    }

    return extraction_info

In [29]:
date = get_date_from_file_name("01-03-2021.csv")
date

'03-01-2021'

In [30]:
loaded = read_data(date)

FULL_PATHH: /home/mohamed/Desktop/data-engineering-projects/Covid-19-Data-Pipeline/data/03-01-2021.csv
