In [6]:
# Import required libraries
import pandas as pd
import sqlite3
import os

# Install necessary packages
!pip install pandas

# CSV data sources:
# https://www.kaggle.com/datasets/nelgiriyewithana/countries-of-the-world-2023 - download the world-data-2023.csv
# https://www.kaggle.com/datasets/myrios/cost-of-living-index-by-country-by-number-2024 - download the Cost_of_Living_Index_by_Country_2024.csv
# Upload them to your Google Colab environment



## Data Cleaning

In [None]:
file1_path = 'DATA/Cost_of_Living_Index_by_Country_2024.csv'
file2_path = 'DATA/world-data-2023.csv'

df1 = pd.read_csv(file1_path)
df2 = pd.read_csv(file2_path)

df1_head = df1.head()
df2_head = df2.head()

df1_columns = df1.columns
df2_columns = df2.columns

df1_head, df2_head, df1_columns, df2_columns

merged_df = pd.merge(df1, df2, on='Country', how='inner')

merged_df_shape = merged_df.shape
merged_df_head = merged_df.head()

merged_df_shape, merged_df_head

output_path = 'merged_dataset.csv'
merged_df.to_csv(output_path, index=False)


'merged_dataset.csv'

In [7]:
# Check working directory to see if the files were uploaded correctly
print("Current working directory:", os.getcwd())
print("Files in the current directory:", os.listdir())

Current working directory: /home/hanyan/DS-2002-Final-Project
Files in the current directory: ['DS_2002_Final_Project_ETL_setup.ipynb', '.git', '.venv', 'README.md']


In [8]:
def fetch_data(file_path):
    """
    Fetch data from a CSV file
    """
    if not os.path.exists(file_path):
        print(f"Error: The file {file_path} does not exist.")
        return None
    try:
        df = pd.read_csv(file_path)
        print(f"Successfully read CSV file from {file_path}")
        return df
    except Exception as e:
        print(f"An unexpected error occurred while reading the file: {e}")
        return None

def convert_format(df, output_format):
    """
    Convert DataFrame to specified output format
    """
    if output_format == 'json':
        return df.to_json(orient='records')
    elif output_format == 'csv':
        return df.to_csv(index=False)
    elif output_format == 'sql':
        return df
    else:
        raise ValueError("Invalid output format. Choose 'json', 'csv', or 'sql'.")

def modify_columns(df, columns_to_keep=None, columns_to_add=None):
    """
    Modify DataFrame columns based on user input
    """
    if columns_to_keep:
        df = df[columns_to_keep]
    if columns_to_add:
        df = df.assign(**columns_to_add)
    return df

def store_data(data, output_format, filename=None, table_name=None):
    """
    Store data in specified format (file or SQL database)
    """
    if output_format in ['json', 'csv']:
        with open(filename, 'w') as f:
            f.write(data)
        print(f"Data stored in {filename}")
    elif output_format == 'sql':
        with sqlite3.connect('output.db') as conn:
            data.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"Data stored in SQLite database 'output.db', table '{table_name}'")

def generate_summary(df, stage):
    """
    Generate and print summary of the data
    """
    print(f"\n{stage} Summary:")
    print(f"Number of records: {len(df)}")
    print(f"Number of columns: {len(df.columns)}")
    print(f"Columns: {', '.join(df.columns)}")

def clean_birth_rate_data(df):
    """
    Clean and transform birth rate data
    """
    columns_to_keep = ['Country', 'Birth Rate']
    df = df[columns_to_keep]
    df = df.dropna()
    df['Birth Rate'] = pd.to_numeric(df['Birth Rate'].astype(str).str.replace('[^\d.]', '', regex=True), errors='coerce')
    return df

def clean_cost_of_living_data(df):
    """
    Clean and transform cost of living data
    """
    columns_to_keep = ['Country', 'Cost of Living Index']
    df = df[columns_to_keep]
    df = df.dropna()
    df['Cost of Living Index'] = pd.to_numeric(df['Cost of Living Index'], errors='coerce')
    return df

  df['Birth Rate'] = pd.to_numeric(df['Birth Rate'].astype(str).str.replace('[^\d.]', '', regex=True), errors='coerce')


In [9]:
def get_user_input():
    """
    Get user input for data source and output format
    """
    output_format = input("Enter output format (json/csv/sql): ").lower()
    return output_format

def get_column_modifications():
    """
    Get user input for column modifications
    """
    columns_to_keep = input("Enter columns to keep (comma-separated, leave blank for all): ").split(',') if input("Do you want to keep specific columns? (y/n): ").lower() == 'y' else None
    columns_to_add = {col: input(f"Enter value for new column '{col}': ") for col in input("Enter new columns to add (comma-separated, leave blank for none): ").split(',') if col}
    return columns_to_keep, columns_to_add

In [10]:
def main():
    """
    Main function to run the ETL pipeline
    """
    try:
        # Fetch data
        birth_rate_df = fetch_data("/content/world-data-2023.csv")
        cost_of_living_df = fetch_data("/content/Cost_of_Living_Index_by_Country_2024.csv")

        if birth_rate_df is None or cost_of_living_df is None:
            raise ValueError("Failed to fetch data")

        # Generate pre-processing summary
        generate_summary(birth_rate_df, "Birth Rate Pre-processing")
        generate_summary(cost_of_living_df, "Cost of Living Pre-processing")

        # Clean and transform data
        cleaned_birth_rate_df = clean_birth_rate_data(birth_rate_df)
        cleaned_cost_of_living_df = clean_cost_of_living_data(cost_of_living_df)

        # Merge datasets
        merged_df = pd.merge(cleaned_birth_rate_df, cleaned_cost_of_living_df, on='Country', how='inner')

        # Get user input
        output_format = get_user_input()

        # Modify columns
        columns_to_keep, columns_to_add = get_column_modifications()
        merged_df = modify_columns(merged_df, columns_to_keep, columns_to_add)

        # Convert format
        converted_data = convert_format(merged_df, output_format)

        # Store data
        if output_format in ['json', 'csv']:
            filename = f"/content/{input('Enter output filename: ')}"
            store_data(converted_data, output_format, filename)
        elif output_format == 'sql':
            table_name = input("Enter SQL table name: ")
            store_data(merged_df, output_format, table_name=table_name)

        # Generate post-processing summary
        generate_summary(merged_df, "Post-processing")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()

Error: The file /content/world-data-2023.csv does not exist.
Error: The file /content/Cost_of_Living_Index_by_Country_2024.csv does not exist.
An error occurred: Failed to fetch data
