In [2]:
import pandas as pd
import mysql.connector
from mysql.connector import Error
from dotenv import load_dotenv
import os

# Load .env file
load_dotenv()

DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")

# File paths for the CSV files
daily_file = "../data/daily_aggregation.csv"
weekly_file = "../data/weekly_aggregation.csv"
monthly_file = "../data/monthly_aggregation.csv"
final_dataset_file = "../data/final_dataset.csv"

# Function to connect to MySQL
def connect_to_mysql():
    try:
        connection = mysql.connector.connect(
            host=DB_HOST,
            user=DB_USER,
            password=DB_PASSWORD,
            database=DB_NAME,
        )
        if connection.is_connected():
            print(f"Connected to MySQL database: {DB_NAME}")
            return connection
    except Error as e:
        print(f"Connection error: {e}")
        return None

# Function to load a DataFrame into a MySQL table
def load_dataframe_to_mysql_table(connection, df, table_name):
    try:
        cursor = connection.cursor()
        # Create table if not exists
        columns = ", ".join([f"{col} VARCHAR(255)" for col in df.columns])
        create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})"
        cursor.execute(create_table_query)

        # Insert data into the table
        for _, row in df.iterrows():
            placeholders = ", ".join(["%s"] * len(row))
            insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
            cursor.execute(insert_query, tuple(row))
        connection.commit()
        print(f"Data loaded into {table_name} table successfully.")
    except Error as e:
        print(f"Error loading data into {table_name}: {e}")

# Function to format dates and load data into e_commerce_data table
def update_e_commerce_data_table(connection, final_dataset_file):
    final_dataset_df = pd.read_csv(final_dataset_file)
    
    # Convert InvoiceDate to correct format (YYYY-MM-DD)
    final_dataset_df['InvoiceDate'] = pd.to_datetime(
        final_dataset_df['InvoiceDate'], format='%m/%d/%Y'
    ).dt.strftime('%Y-%m-%d')
    
    try:
        cursor = connection.cursor()
        # Clear the existing table
        cursor.execute("TRUNCATE TABLE e_commerce_data")

        # Insert the cleaned data
        for _, row in final_dataset_df.iterrows():
            insert_query = """
            INSERT INTO e_commerce_data (InvoiceNo, Quantity, InvoiceDate, Price, CustomerID, Country)
            VALUES (%s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, tuple(row))
        connection.commit()
        print("e_commerce_data table updated with the cleaned dataset.")
    except Error as e:
        print(f"Error updating e_commerce_data table: {e}")

# Main script
if __name__ == "__main__":
    connection = connect_to_mysql()
    if connection:
        # Load daily, weekly, and monthly data into MySQL
        daily_df = pd.read_csv(daily_file)
        weekly_df = pd.read_csv(weekly_file)
        monthly_df = pd.read_csv(monthly_file)

        load_dataframe_to_mysql_table(connection, daily_df, "daily_aggregation")
        load_dataframe_to_mysql_table(connection, weekly_df, "weekly_aggregation")
        load_dataframe_to_mysql_table(connection, monthly_df, "monthly_aggregation")

        # Update e_commerce_data table with the final dataset
        update_e_commerce_data_table(connection, final_dataset_file)

        # Close connection
        if connection.is_connected():
            connection.close()
            print("MySQL connection closed.")

Connected to MySQL database: ecommerce_db
Data loaded into daily_aggregation table successfully.
Data loaded into weekly_aggregation table successfully.
Data loaded into monthly_aggregation table successfully.
e_commerce_data table updated with the cleaned dataset.
MySQL connection closed.
