<a href="https://colab.research.google.com/github/asusatijo/ds-2002-final-project/blob/main/Step_3_ETL_Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
    file_path_1 = "OzoneNational.csv"
    file_path_2 = "co2_annmean_gl.csv"

data_1 = pd.read_csv(file_path_1)
data_2 = pd.read_csv(file_path_2)
print(data_1.head())
print(data_2.head())

   Year      Mean  Number of Trend Sites  10th Percentile  90th Percentile  \
0  1980  0.094925                    134            0.070            0.116   
1  1981  0.092347                    134            0.071            0.115   
2  1982  0.090713                    134            0.069            0.115   
3  1983  0.098052                    134            0.071            0.122   
4  1984  0.089019                    134            0.068            0.112   

  Units  
0   ppm  
1   ppm  
2   ppm  
3   ppm  
4   ppm  
   year    mean   unc
0  1979  336.85  0.11
1  1980  338.91  0.07
2  1981  340.11  0.09
3  1982  340.86  0.03
4  1983  342.53  0.06


In [None]:
import pandas as pd
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# 1. Extract Step
def extract_data(file_path_1, file_path_2):
    """
    Extract data from two CSV files.
    :param file_path_1: Path to the first CSV file.
    :param file_path_2: Path to the second CSV file.
    :return: Two DataFrames containing the extracted data.
    """
    try:
        logging.info("Extracting data from %s and %s", file_path_1, file_path_2)
        data_1 = pd.read_csv(file_path_1)
        data_2 = pd.read_csv(file_path_2)
        logging.info("Data extraction successful.")
        return data_1, data_2
    except Exception as e:
        logging.error("Error during data extraction: %s", e)
        raise

# 2. Transform Step
def transform_data(data_1, data_2):
    """
    Transform and merge data from two sources.
    :param data_1: DataFrame from the first source.
    :param data_2: DataFrame from the second source.
    :return: Merged and transformed DataFrame.
    """
    try:
        logging.info("Transforming data...")
        # Drop missing values from both datasets
        data_1_na = data_1.dropna()
        data_2_na = data_2.dropna()

        # Drop 1979 year (index 0) from co2 data
        data_2_no1979 = data_2_na.drop(index=0)

        # Transformation: Making the data uniform through lowercase and renaming columns to avoid duplicates
        data_1_clean = data_1_na.rename(columns={"Year": "year", "Mean":"mean_of_ozone"})

        # Transformation: Renaming columns for consistency
        data_1_new = data_1_clean.rename(columns={"year": "year_of_ozone_and_co2", "Number of Trend Sites": "Number_of_Trend_Sites", "10th Percentile": "10th_Percentile", "90th Percentile":"90th_Percentile"})
        data_2_new = data_2_no1979.rename(columns={"year": "year_of_ozone_and_co2"})

        # Transformation: Merging datasets on a common column
        merged_data = pd.merge(data_1_new, data_2_new, on="year_of_ozone_and_co2", how="inner")

        # Transformation: Adding a calculated column
        merged_data["mean_of_ozone_and_co2"] = merged_data["mean_of_ozone"] + merged_data["mean"]

        # Transformation: Filtering rows based on a condition
        merged_data = merged_data[merged_data["mean_of_ozone_and_co2"] > 300]

        logging.info("Data transformation successful.")
        return merged_data
    except Exception as e:
        logging.error("Error during data transformation: %s", e)
        raise

# 3. Load Step - Save as SQL
def load_data_to_sql(data, sql_file_path):
    """
    Convert the DataFrame into SQL insert statements and save them to a .sql file.
    :param data: DataFrame containing the transformed data.
    :param sql_file_path: Path to the SQL file.
    """
    try:
        logging.info("Loading data to SQL file: %s", sql_file_path)

        # Open the file in write mode
        with open(sql_file_path, "w") as sql_file:
            # Write SQL commands for table creation
            sql_file.write("CREATE TABLE IF NOT EXISTS transformed_data (\n")
            for idx, col in enumerate(data.columns):
                if idx == len(data.columns) - 1:  # If it's the last column
                  sql_file.write(f"  `{col}` TEXT\n")  # Append value for the last column
                else:
                  sql_file.write(f"  `{col}` TEXT,\n")  # Add a comma for all other columns
            sql_file.write(");\n\n")

            # Write SQL insert commands
            for _, row in data.iterrows():
                sql_file.write("INSERT INTO transformed_data VALUES (")
                sql_file.write(", ".join([f"'{str(x)}'" for x in row.values]))  # Insert row values
                sql_file.write(");\n")

        logging.info("Data successfully written to SQL file.")
    except Exception as e:
        logging.error("Error during data loading to SQL file: %s", e)
        raise

# Main ETL Workflow
def run_etl(file_path_1, file_path_2, sql_file_path):
    """
    Create the ETL workflow for two data sources and save the result as an SQL file.
    :param file_path_1: Path to the first CSV file.
    :param file_path_2: Path to the second CSV file.
    :param sql_file_path: Path to the SQL file to save the result.
    """
    try:
        logging.info("Starting ETL workflow...")
        # Extract step
        data_1, data_2 = extract_data(file_path_1, file_path_2)

        # Transform step
        transformed_data = transform_data(data_1, data_2)

        # Load step: Save data to SQL file
        load_data_to_sql(transformed_data, sql_file_path)

        logging.info("ETL workflow completed successfully.")
    except Exception as e:
        logging.error("ETL workflow failed: %s", e)

# Execute ETL
if __name__ == "__main__":
    # File paths for the two data sources
    file_path_1 = "OzoneNational.csv"
    file_path_2 = "co2_annmean_gl.csv"
    sql_file_path = "ETL_transformed_data.sql"

    # Run the ETL workflow
    run_etl(file_path_1, file_path_2, sql_file_path)
