First thing is we need a couple of scripts for our pipeline:
Main script which will call all the other scripts and will provide us with just "start_offset_date" and "end_offset_date" for backdating along with the option to either comment out second run or third run. 

We will have an etl script and a main script that we will use to call the etl script defined as below
1. We will have an extraction function that will get data from our csv and add that data to pandas dataframe
2. Next we'll have a transformation function that will transform our location and price columns as stated above
3. Then we'll have loading function that will load to our transformed csv

Testing the exract function

In [6]:
import pandas as pd
import os
from datetime import datetime

def extract_data(input_csv):
    """
    Extracts data from a specified CSV file into a pandas DataFrame.

    Args:
        input_csv (str): The path to the raw sales data CSV file.

    Returns:
        pd.DataFrame: A DataFrame containing the extracted data, or None if the file is not found.
    """
    print(f"Extracting data from '{input_csv}'...")
    try:
        df = pd.read_csv(input_csv, sep = ";")
        return df
    except FileNotFoundError:
        print(f"Error: The file '{input_csv}' was not found.")
        return None

In [7]:
df = extract_data("Coffee Shop Sales2.csv")
df.head()

Extracting data from 'Coffee Shop Sales2.csv'...


Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
0,1.0,2025/01/01,07:06:11,2.0,5.0,Lower Manhattan,32.0,3.0,Coffee,Gourmet brewed coffee,Ethiopia Rg
1,2.0,2025/01/01,07:08:56,2.0,5.0,Lower Manhattan,57.0,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg
2,3.0,2025/01/01,07:14:04,2.0,5.0,Lower Manhattan,59.0,4.5,Drinking Chocolate,Hot chocolate,Dark chocolate Lg
3,4.0,2025/01/01,07:20:24,1.0,5.0,Lower Manhattan,22.0,2.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
4,5.0,2025/01/01,07:22:41,2.0,5.0,Lower Manhattan,57.0,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg


Transformations

Now that we have loaded the data we need to do a couple of transformations.

1. Convert transaction_id, store_id and product_id columns to strings
2. Properly format transaction_date and transaction_time to match their proper formats. Date and timestamp.
3. Map locations to my local locations as follows, "Hell's Kitchen": "East Campus", "Astoria": "West Campus", "Lower Manhattan": "Main Campus"
4. Convert price to rands and we'll assume a conversion rate of $1:R15 so that prices are more local

In [21]:
#mapping locations

location_map = {
        "Hell's Kitchen": "East Campus",
        "Astoria": "West Campus",
        "Lower Manhattan": "Main Campus"
    }
df['store_location'] = df['store_location'].replace(location_map)
unique_locations = df['store_location'].unique()
print(unique_locations)

['Main Campus' 'East Campus' 'West Campus' nan]


In [23]:
#converting all id columns to strings that are properly formatted

cols_to_convert = ['transaction_id', 'store_id', 'product_id']

#If we might have missing values (NaN) in these columns and don’t want errors

df[cols_to_convert] = (
    df[cols_to_convert]
    .apply(lambda col: col.dropna().astype(int).astype(str))
)

df.head()

Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
0,1,2025/01/01,07:06:11,2.0,5,Main Campus,32,3.0,Coffee,Gourmet brewed coffee,Ethiopia Rg
1,2,2025/01/01,07:08:56,2.0,5,Main Campus,57,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg
2,3,2025/01/01,07:14:04,2.0,5,Main Campus,59,4.5,Drinking Chocolate,Hot chocolate,Dark chocolate Lg
3,4,2025/01/01,07:20:24,1.0,5,Main Campus,22,2.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
4,5,2025/01/01,07:22:41,2.0,5,Main Campus,57,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg


Next we'll convert our price column to rands by multiplying it by 15

In [25]:
df['unit_price'] = (df['unit_price'] * 15).round(2)
df.head()

Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
0,1,2025/01/01,07:06:11,2.0,5,Main Campus,32,45.0,Coffee,Gourmet brewed coffee,Ethiopia Rg
1,2,2025/01/01,07:08:56,2.0,5,Main Campus,57,46.5,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg
2,3,2025/01/01,07:14:04,2.0,5,Main Campus,59,67.5,Drinking Chocolate,Hot chocolate,Dark chocolate Lg
3,4,2025/01/01,07:20:24,1.0,5,Main Campus,22,30.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
4,5,2025/01/01,07:22:41,2.0,5,Main Campus,57,46.5,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg


Let's format date and time.

In [26]:
# Format date as YYYY-MM-DD
df['transaction_date'] = pd.to_datetime(
    df['transaction_date'], errors='coerce'
).dt.strftime('%Y-%m-%d')

# Format time as HH:MM:SS
df['transaction_time'] = pd.to_datetime(
    df['transaction_time'], format='%H:%M:%S', errors='coerce'
).dt.strftime('%H:%M:%S')

df.head()

Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
0,1,2025-01-01,07:06:11,2.0,5,Main Campus,32,45.0,Coffee,Gourmet brewed coffee,Ethiopia Rg
1,2,2025-01-01,07:08:56,2.0,5,Main Campus,57,46.5,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg
2,3,2025-01-01,07:14:04,2.0,5,Main Campus,59,67.5,Drinking Chocolate,Hot chocolate,Dark chocolate Lg
3,4,2025-01-01,07:20:24,1.0,5,Main Campus,22,30.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
4,5,2025-01-01,07:22:41,2.0,5,Main Campus,57,46.5,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg


Now all we need to do is put all the above into a function that will allow us to have dates we can use to backdate our data 

In [41]:
def transform_data(df, start_offset_date, end_offset_date):
    """
    Transforms the raw DataFrame by filtering by date, mapping locations,
    and converting currency.

    Args:
        df (pd.DataFrame): The raw data DataFrame.
        start_offset_date (str): The start date for filtering (e.g., '2025-01-01').
        end_offset_date (str): The end date for filtering (e.g., '2025-03-31').

    Returns:
        pd.DataFrame: The transformed DataFrame, or an empty DataFrame if no data
                      is found in the specified date range.
    """
    print("Transforming data...")


    # Convert date and time to datetime/time
    df['transaction_date'] = pd.to_datetime(df['transaction_date'], errors='coerce')
    df['transaction_time'] = pd.to_datetime(df['transaction_time'], format='%H:%M:%S', errors='coerce').dt.strftime('%H:%M:%S')

    # Filter based on date range
    start_date = pd.to_datetime(start_offset_date)
    end_date = pd.to_datetime(end_offset_date)
    df_filtered = df[(df['transaction_date'] >= start_date) & (df['transaction_date'] <= end_date)].copy()

    if df_filtered.empty:
        print(f"No data found for the date range: {start_offset_date} to {end_offset_date}.")
        return df_filtered

    # Format date for output
    df_filtered['transaction_date'] = df_filtered['transaction_date'].dt.strftime('%Y-%m-%d')

    # Map store locations
    location_map = {
        "Hell's Kitchen": "East Campus",
        "Astoria": "West Campus",
        "Lower Manhattan": "Main Campus"
    }
    df_filtered['store_location'] = df_filtered['store_location'].replace(location_map)

    # Convert unit_price
    df_filtered['unit_price'] = (df_filtered['unit_price'] * 15).round(2)


    cols_to_convert = ['transaction_id', 'store_id', 'product_id']

    #If we might have missing values (NaN) in these columns and don’t want errors

    df_filtered[cols_to_convert] = (
        df_filtered[cols_to_convert]
        .apply(lambda col: col.dropna().astype(int).astype(str)))

    return df_filtered

In [53]:
df_new = transform_data(df, "2025-03-01", "2025-03-05")
df_new.head()

Transforming data...


Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
33673,33730,2025-03-01,07:01:20,2.0,5,Main Campus,22,30.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
33674,33731,2025-03-01,07:02:34,2.0,5,Main Campus,30,45.0,Coffee,Gourmet brewed coffee,Columbian Medium Roast Lg
33675,33732,2025-03-01,07:02:56,1.0,5,Main Campus,22,30.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm
33676,33733,2025-03-01,07:04:24,1.0,5,Main Campus,37,45.0,Coffee,Barista Espresso,Espresso shot
33677,33734,2025-03-01,07:06:11,2.0,5,Main Campus,32,45.0,Coffee,Gourmet brewed coffee,Ethiopia Rg


Now we will work on loading our data. First, we will load the data into a csv on our local pc. Once we are able to load our data into a csv, we need to create logic that will make sure that we do not duplicate transactions and thus if we load transactions that have been loaded already, they will get replaced otherwise the transactions will be appended. Once our logic is successful, we will attempt automating data into our csv so we upload data daily automated. The last step in building our first ETL is to get our data to flow into our OLTP (MySQL) and Bigquery tables but later on that :) 

In [55]:
def load_data(df_transformed, output_csv, start_offset_date, end_offset_date):
    if df_transformed.empty:
        print("⚠ No data to load.")
        return

    # Ensure correct types
    df_transformed['transaction_date'] = pd.to_datetime(df_transformed['transaction_date'])
    df_transformed['transaction_id'] = df_transformed['transaction_id'].astype(str)

    if os.path.exists(output_csv):
        print("📂 Existing data found. Removing old data in the same date range...")
        df_existing = pd.read_csv(output_csv)
        df_existing['transaction_date'] = pd.to_datetime(df_existing['transaction_date'])
        df_existing['transaction_id'] = df_existing['transaction_id'].astype(str)

        # Remove rows from existing file that fall within the same date range
        mask = ~(
            (df_existing['transaction_date'] >= pd.to_datetime(start_offset_date)) &
            (df_existing['transaction_date'] <= pd.to_datetime(end_offset_date))
        )
        df_existing = df_existing[mask]

        # Append new transformed data
        df_final = pd.concat([df_existing, df_transformed], ignore_index=True)
    else:
        print("🆕 No existing data. Creating a new file...")
        df_final = df_transformed.copy()

    # Keep only unique transactions
    df_final = df_final.drop_duplicates(subset=['transaction_id'], keep='last')
    df_final = df_final.sort_values(by=['transaction_date', 'transaction_time'])

    df_final.to_csv(output_csv, index=False)
    print(f"✅ Load complete! Total unique transactions: {df_final['transaction_id'].nunique()}")

In [59]:
load_data(df_new, "successful_upload.csv", "2025-01-01", "2025-04-05")

📂 Existing data found. Removing old data in the same date range...
✅ Load complete! Total unique transactions: 3343


I used cd dags
rm -rf .ipynb_checkpoints

to make the dags folder accessible

In [1]:
pip install mysql-connector-python

Collecting mysql-connector-python
  Using cached mysql_connector_python-9.4.0-py2.py3-none-any.whl.metadata (7.3 kB)
Using cached mysql_connector_python-9.4.0-py2.py3-none-any.whl (406 kB)
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.4.0
Note: you may need to restart the kernel to use updated packages.


'host': 'localhost',
        'user': 'root',
        'password': '@Whatsnew2711',
        'database': 'coffee_shop_sales_oltp' for MySQL

PostgreSQL password: 'Whatsnew2711'

In [2]:
pip install pymysql

Collecting pymysql
  Using cached PyMySQL-1.1.1-py3-none-any.whl.metadata (4.4 kB)
Using cached PyMySQL-1.1.1-py3-none-any.whl (44 kB)
Installing collected packages: pymysql
Successfully installed pymysql-1.1.1
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip show SQLAlchemy

Name: SQLAlchemy
Version: 1.4.54
Summary: Database Abstraction Library
Home-page: https://www.sqlalchemy.org
Author: Mike Bayer
Author-email: mike_mp@zzzcomputing.com
License: MIT
Location: /Users/mac/anaconda3/lib/python3.10/site-packages
Requires: greenlet
Required-by: aext-project-filebrowser-server, alembic, apache-airflow-core, ipython-sql, SQLAlchemy-JSONField, SQLAlchemy-Utils
Note: you may need to restart the kernel to use updated packages.


In [11]:
pip install SQLAlchemy

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install psycopg2

Collecting psycopg2
  Using cached psycopg2-2.9.10.tar.gz (385 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: psycopg2
[33m  DEPRECATION: Building 'psycopg2' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'psycopg2'. Discussion can be found at https://github.com/pypa/pip/issues/6334[0m[33m
[0m  Building wheel for psycopg2 (setup.py) ... [?25ldone
[?25h  Created wheel for psycopg2: filename=psycopg2-2.9.10-cp311-cp311-macosx_10_15_x86_64.whl size=134232 sha256=727426fb4ff5c10e4c69c805c85c4fbeece2cbacee648f3796112d9be1c7a31b
  Stored in directory: /Users/mac/Library/Caches/pip/wheels/d9/83/60/e9660320860aef3c38a67dea6ff9538e4cad76502cb39ed

In [8]:
import pandas as pd
import mysql.connector
import psycopg2
import psycopg2.extras
import logging
import sys

# Set up logging for better visibility
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# =========================================================
# 1️⃣ Database Connection Configurations
# =========================================================
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = "root"
MYSQL_PASSWORD = "@Whatsnew2711"
MYSQL_DB = "coffee_shop_sales_oltp"

PG_HOST = "localhost"
PG_PORT = 5432
PG_USER = "postgres"
PG_PASSWORD = "Whatsnew2711"
PG_DB = "Coffee_Machine_Project_One"

def get_postgres_connection():
    """Establishes and returns a PostgreSQL database connection."""
    try:
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            user=PG_USER,
            password=PG_PASSWORD,
            dbname=PG_DB
        )
        return conn
    except psycopg2.Error as e:
        logging.error(f"❌ Failed to connect to PostgreSQL: {e}")
        return None

# =========================================================
# 2️⃣ ETL Functions
# =========================================================

def extract_data_from_mysql(date_to_process):
    """
    Extracts daily transaction data from the MySQL OLTP database
    for a specific date using a direct connection.
    """
    logging.info(f"📥 Extracting data from MySQL for {date_to_process}...")
    conn = None
    cursor = None
    try:
        # Pass parameters directly as keyword arguments to avoid parsing issues.
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB
        )
        cursor = conn.cursor()
        
        query = f"""
        SELECT
            transaction_id,
            transaction_date,
            transaction_time,
            transaction_qty,
            store_id,
            store_location,
            product_id,
            unit_price,
            product_category,
            product_type,
            product_detail
        FROM sales_data
        WHERE transaction_date = '{date_to_process}';
        """
        df_raw = pd.read_sql(query, conn)
        logging.info(f"✅ Extracted {len(df_raw)} records.")
        return df_raw
    except mysql.connector.Error as err:
        logging.error(f"❌ MySQL Error: {err}")
        # Rollback not needed for a read operation, but included for consistency
        if 'conn' in locals() and conn.is_connected():
            conn.rollback()
        return None
    finally:
        if 'cursor' in locals() and cursor is not None:
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            logging.info("MySQL connection closed.")

def transform_data(df_raw):
    """
    Transforms the raw data into a star schema format.
    Returns the fact table and all dimension tables.
    """
    logging.info("🔄 Transforming data for OLAP...")
    df_raw["total_amount"] = df_raw["transaction_qty"] * df_raw["unit_price"]

    # Dim Date
    dim_date = df_raw[["transaction_date"]].drop_duplicates().reset_index(drop=True)
    dim_date["date_key"] = dim_date["transaction_date"].dt.strftime('%Y%m%d').astype(int)
    dim_date["full_date"] = dim_date["transaction_date"]
    dim_date["day_of_week"] = dim_date["transaction_date"].dt.day_name()
    dim_date["month_name"] = dim_date["transaction_date"].dt.month_name()
    dim_date["quarter"] = dim_date["transaction_date"].dt.quarter
    dim_date["year"] = dim_date["transaction_date"].dt.year
    dim_date = dim_date[["date_key", "full_date", "day_of_week", "month_name", "quarter", "year"]]

    # Dim Time
    dim_time = df_raw[["transaction_time"]].drop_duplicates().reset_index(drop=True)
    dim_time["time_key"] = dim_time["transaction_time"].dt.hour * 10000 + \
                           dim_time["transaction_time"].dt.minute * 100 + \
                           dim_time["transaction_time"].dt.second
    dim_time["full_time"] = dim_time["transaction_time"]
    dim_time["hour"] = dim_time["transaction_time"].dt.hour
    dim_time["minute"] = dim_time["transaction_time"].dt.minute
    dim_time = dim_time[["time_key", "full_time", "hour", "minute"]]

    # Dim Store
    dim_store = df_raw[["store_id", "store_location"]].drop_duplicates().reset_index(drop=True)
    dim_store["store_key"] = dim_store.index + 1

    # Dim Product
    dim_product = df_raw[["product_id", "product_category", "product_type", "product_detail", "unit_price"]].drop_duplicates().reset_index(drop=True)
    dim_product["product_key"] = dim_product.index + 1

    # Fact Table
    df_fact = df_raw.copy()
    df_fact = df_fact.merge(dim_date[["transaction_date","date_key"]], on="transaction_date")
    df_fact = df_fact.merge(dim_time[["transaction_time","time_key"]], on="transaction_time")
    df_fact = df_fact.merge(dim_store[["store_id","store_key"]], on="store_id")
    df_fact = df_fact.merge(dim_product[["product_id","product_key"]], on="product_id")

    df_fact = df_fact[[
        "transaction_id",
        "date_key",
        "time_key",
        "store_key",
        "product_key",
        "transaction_qty",
        "total_amount"
    ]]

    return dim_date, dim_time, dim_store, dim_product, df_fact

def load_data_to_postgres(dim_date, dim_time, dim_store, dim_product, df_fact):
    """
    Loads transformed data into the PostgreSQL OLAP database.
    """
    logging.info("📤 Loading data to PostgreSQL...")
    conn = None
    try:
        conn = get_postgres_connection()
        if not conn:
            return
            
        def insert_if_not_exists(df, table_name, unique_col):
            if df.empty:
                return
            cursor = conn.cursor()
            for _, row in df.iterrows():
                cols = list(row.index)
                vals = [row[c] for c in cols]
                placeholders = ','.join(['%s'] * len(cols))
                sql = f"""
                INSERT INTO {table_name} ({','.join(cols)})
                VALUES ({placeholders})
                ON CONFLICT ({unique_col}) DO NOTHING;
                """
                cursor.execute(sql, vals)
            conn.commit()
            cursor.close()
        
        insert_if_not_exists(dim_date, 'dim_date', 'date_key')
        logging.info("✅ dim_date loaded.")
        insert_if_not_exists(dim_time, 'dim_time', 'time_key')
        logging.info("✅ dim_time loaded.")
        insert_if_not_exists(dim_store, 'dim_store', 'store_key')
        logging.info("✅ dim_store loaded.")
        insert_if_not_exists(dim_product, 'dim_product', 'product_key')
        logging.info("✅ dim_product loaded.")
        insert_if_not_exists(df_fact, 'fact_sales', 'transaction_id')
        logging.info("✅ fact_sales loaded.")
        logging.info("✅ Data loaded to PostgreSQL successfully!")
        
    except Exception as e:
        logging.error(f"❌ Failed to load data to PostgreSQL: {e}")
    finally:
        if conn:
            conn.close()

In [10]:
extract_data_from_mysql("2025-01-01")

2025-08-20 06:17:15,680 - INFO - 📥 Extracting data from MySQL for 2025-01-01...
  df_raw = pd.read_sql(query, conn)
2025-08-20 06:17:16,000 - INFO - ✅ Extracted 549 records.
2025-08-20 06:17:16,002 - INFO - MySQL connection closed.


Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail
0,1_20250101,2025-01-01,0 days 07:06:11,2.0,5,Main Campus,32,45.00,Coffee,Gourmet brewed coffee,Ethiopia Rg
1,10_20250101,2025-01-01,0 days 07:39:34,2.0,5,Main Campus,58,52.50,Drinking Chocolate,Hot chocolate,Dark chocolate Rg
2,100_20250101,2025-01-01,0 days 10:49:51,2.0,5,Main Campus,61,71.25,Drinking Chocolate,Hot chocolate,Sustainably Grown Organic Lg
3,101_20250101,2025-01-01,0 days 10:51:17,1.0,5,Main Campus,47,45.00,Tea,Brewed Green tea,Serenity Green Tea Lg
4,102_20250101,2025-01-01,0 days 10:52:44,2.0,8,East Campus,23,37.50,Coffee,Drip coffee,Our Old Time Diner Blend Rg
...,...,...,...,...,...,...,...,...,...,...,...
544,95_20250101,2025-01-01,0 days 10:30:07,1.0,5,Main Campus,70,48.75,Bakery,Scone,Cranberry Scone
545,96_20250101,2025-01-01,0 days 10:35:47,1.0,8,East Campus,60,56.25,Drinking Chocolate,Hot chocolate,Sustainably Grown Organic Rg
546,97_20250101,2025-01-01,0 days 10:41:53,1.0,5,Main Campus,59,67.50,Drinking Chocolate,Hot chocolate,Dark chocolate Lg
547,98_20250101,2025-01-01,0 days 10:47:53,2.0,5,Main Campus,36,56.25,Coffee,Premium brewed coffee,Jamaican Coffee River Lg


In [12]:
transform_data(df)

2025-08-20 06:18:23,293 - INFO - 🔄 Transforming data for OLAP...


AttributeError: Can only use .dt accessor with datetimelike values

In [19]:
pkill -f "airflow"

SyntaxError: invalid syntax (3526988528.py, line 1)

In [70]:
import pandas as pd
import mysql.connector
import psycopg2
import logging

# Set up logging for visibility
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# =========================================================
# 1️⃣ Database Connection Configurations
# =========================================================
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = "root"
MYSQL_PASSWORD = "@Whatsnew2711"
MYSQL_DB = "coffee_shop_sales_oltp"

PG_HOST = "localhost"
PG_PORT = 5432
PG_USER = "postgres"
PG_PASSWORD = "Whatsnew2711"
PG_DB = "Coffee_Machine_Project_One"

def get_postgres_connection():
    """Establishes and returns a PostgreSQL database connection."""
    try:
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            user=PG_USER,
            password=PG_PASSWORD,
            dbname=PG_DB
        )
        return conn
    except psycopg2.Error as e:
        logging.error(f"❌ Failed to connect to PostgreSQL: {e}")
        return None

# =========================================================
# 2️⃣ ETL Functions
# =========================================================

def extract_data_from_mysql(date_to_process):
    """
    Extracts daily transaction data from MySQL OLTP database
    for a specific date.
    """
    logging.info(f"📥 Extracting data from MySQL for {date_to_process}...")
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB
        )
        cursor = conn.cursor()
        query = f"""
        SELECT
            transaction_id,
            transaction_date,
            transaction_time,
            transaction_qty,
            store_id,
            store_location,
            product_id,
            unit_price,
            product_category,
            product_type,
            product_detail
        FROM sales_data
        WHERE transaction_date = '{date_to_process}';
        """
        df_raw = pd.read_sql(query, conn)
        logging.info(f"✅ Extracted {len(df_raw)} records.")
        return df_raw
    except mysql.connector.Error as err:
        logging.error(f"❌ MySQL Error: {err}")
        if 'conn' in locals() and conn.is_connected():
            conn.rollback()
        return None
    finally:
        if 'cursor' in locals() and cursor is not None:
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            logging.info("MySQL connection closed.")

# =============================
# Transform Data (Postgres-ready)
# =============================

def transform_data(df_raw):
    """
    Transforms raw MySQL data into star schema DataFrames that match PostgreSQL tables.
    Returns: dim_date, dim_time, dim_store, dim_product, df_fact
    """
    import pandas as pd
    import logging
    logging.info("🔄 Transforming data for OLAP...")

    # ----------------------------
    # Convert transaction_date to datetime
    # ----------------------------
    if "transaction_date" in df_raw.columns:
        df_raw["transaction_date"] = pd.to_datetime(df_raw["transaction_date"], errors="coerce")

    # ----------------------------
    # Convert transaction_time from timedelta to time
    # ----------------------------
    if "transaction_time" in df_raw.columns:
        if pd.api.types.is_timedelta64_dtype(df_raw["transaction_time"]):
            df_raw["transaction_time"] = df_raw["transaction_time"].apply(
                lambda x: (pd.Timestamp("00:00:00") + x).time() if pd.notnull(x) else None
            )
        else:
            df_raw["transaction_time"] = pd.to_datetime(
                df_raw["transaction_time"].astype(str), errors="coerce"
            ).dt.time

    # Drop rows where conversion failed
    df_raw = df_raw.dropna(subset=["transaction_date", "transaction_time"])

    # ----------------------------
    # Calculate total_amount
    # ----------------------------
    df_raw["total_amount"] = df_raw["transaction_qty"] * df_raw["unit_price"]

    # ----------------------------
    # Dim Date
    # ----------------------------
    dim_date = df_raw[["transaction_date"]].drop_duplicates().reset_index(drop=True)
    dim_date["day"] = dim_date["transaction_date"].dt.day
    dim_date["month"] = dim_date["transaction_date"].dt.month
    dim_date["year"] = dim_date["transaction_date"].dt.year
    dim_date["weekday"] = dim_date["transaction_date"].dt.day_name()

    # ----------------------------
    # Dim Time
    # ----------------------------
    dim_time = df_raw[["transaction_time"]].drop_duplicates().reset_index(drop=True)
    dim_time["hour"] = dim_time["transaction_time"].apply(lambda x: x.hour)
    dim_time["minute"] = dim_time["transaction_time"].apply(lambda x: x.minute)
    dim_time["second"] = dim_time["transaction_time"].apply(lambda x: x.second)

    # ----------------------------
    # Dim Store
    # ----------------------------
    dim_store = df_raw[["store_id", "store_location"]].drop_duplicates().reset_index(drop=True)

    # ----------------------------
    # Dim Product
    # ----------------------------
    dim_product = df_raw[["product_id", "product_category", "product_type", "product_detail", "unit_price"]].drop_duplicates().reset_index(drop=True)

    # ----------------------------
    # Fact Table
    # ----------------------------
    df_fact = df_raw.copy()
    df_fact = df_fact.merge(dim_date, on="transaction_date")
    df_fact = df_fact.merge(dim_time, on="transaction_time")
    df_fact = df_fact.merge(dim_store, on=["store_id", "store_location"])
    df_fact = df_fact.merge(dim_product, on=["product_id", "product_category", "product_type", "product_detail", "unit_price"])

    df_fact = df_fact[[
        "transaction_id",
        "transaction_date",  # maps to dim_date.date_id in load
        "transaction_time",  # maps to dim_time.time_id
        "store_id",          # maps to dim_store.store_id
        "product_id",        # maps to dim_product.product_id
        "transaction_qty",
        "total_amount"
    ]]

    logging.info(f"✅ Transformation completed. Records: {len(df_fact)}")
    return dim_date, dim_time, dim_store, dim_product, df_fact


def load_data_to_postgres(dim_date, dim_time, dim_store, dim_product, df_fact):
    """
    Loads transformed data into PostgreSQL OLAP database
    while ensuring no duplicates.
    """
    logging.info("📤 Loading data to PostgreSQL...")
    conn = None
    try:
        conn = get_postgres_connection()
        if not conn:
            return
        
        cur = conn.cursor()

        # -----------------------------
        # Dim Date
        # -----------------------------
        date_id_map = {}
        for _, row in dim_date.iterrows():
            cur.execute("""
                INSERT INTO dim_date (transaction_date, day, month, year, weekday)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (transaction_date) DO UPDATE SET
                    day = EXCLUDED.day,
                    month = EXCLUDED.month,
                    year = EXCLUDED.year,
                    weekday = EXCLUDED.weekday
                RETURNING date_id;
            """, (
                row["transaction_date"].date(),
                int(row["day"]),
                int(row["month"]),
                int(row["year"]),
                row["weekday"]
            ))
            date_id_map[row["transaction_date"]] = cur.fetchone()[0]

        # -----------------------------
        # Dim Time
        # -----------------------------
        time_id_map = {}
        for _, row in dim_time.iterrows():
            cur.execute("""
                INSERT INTO dim_time (transaction_time, hour, minute, second)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (transaction_time) DO UPDATE SET
                    hour = EXCLUDED.hour,
                    minute = EXCLUDED.minute,
                    second = EXCLUDED.second
                RETURNING time_id;
            """, (
                row["transaction_time"],
                int(row["hour"]),
                int(row["minute"]),
                int(row["second"])
            ))
            time_id_map[row["transaction_time"]] = cur.fetchone()[0]

        # -----------------------------
        # Dim Store
        # -----------------------------
        store_id_map = {}
        for _, row in dim_store.iterrows():
            cur.execute("""
                INSERT INTO dim_store (store_location)
                VALUES (%s)
                ON CONFLICT (store_id) DO NOTHING
                RETURNING store_id;
            """, (row["store_location"],))
            result = cur.fetchone()
            store_id_map[row["store_id"]] = result[0] if result else row["store_id"]

        # -----------------------------
        # Dim Product
        # -----------------------------
        product_id_map = {}
        for _, row in dim_product.iterrows():
            cur.execute("""
                INSERT INTO dim_product (product_category, product_type, product_detail, unit_price)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (product_id) DO NOTHING
                RETURNING product_id;
            """, (
                row["product_category"],
                row["product_type"],
                row["product_detail"],
                float(row["unit_price"])
            ))
            result = cur.fetchone()
            product_id_map[row["product_id"]] = result[0] if result else row["product_id"]

        conn.commit()

        # -----------------------------
        # Fact Table
        # -----------------------------
        for _, row in df_fact.iterrows():
            cur.execute("""
                INSERT INTO fact_sales (date_id, time_id, store_id, product_id, transaction_qty, total_amount)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (transaction_id) DO NOTHING;
            """, (
                date_id_map[row["transaction_date"]],
                time_id_map[row["transaction_time"]],
                store_id_map[row["store_id"]],
                product_id_map[row["product_id"]],
                int(row["transaction_qty"]),
                float(row["total_amount"])
            ))

        conn.commit()
        cur.close()
        logging.info("✅ Data loaded to PostgreSQL successfully!")

    except Exception as e:
        logging.error(f"❌ Failed to load data to PostgreSQL: {e}")
    finally:
        if conn:
            conn.close()


In [76]:
# 1️⃣ Extract data for a specific date
df_raw = extract_data_from_mysql("2025-01-02")
print("Extracted data sample:")
print(df_raw.head())

# 2️⃣ Transform the raw data into star schema
dim_date, dim_time, dim_store, dim_product, df_fact = transform_data(df_raw)
print("\nDimension & Fact tables samples:")
print("dim_date:")
print(dim_date.head())
print("dim_time:")
print(dim_time.head())
print("dim_store:")
print(dim_store.head())
print("dim_product:")
print(dim_product.head())
print("fact_sales:")
print(df_fact.head())

# 3️⃣ Load transformed data into PostgreSQL
load_data_to_postgres(dim_date, dim_time, dim_store, dim_product, df_fact)
print("\n✅ ETL process completed for 2025-01-02")


2025-08-20 08:31:24,458 - INFO - 📥 Extracting data from MySQL for 2025-01-02...
  df_raw = pd.read_sql(query, conn)
2025-08-20 08:31:24,772 - INFO - ✅ Extracted 566 records.
2025-08-20 08:31:24,776 - INFO - MySQL connection closed.
2025-08-20 08:31:24,791 - INFO - 🔄 Transforming data for OLAP...
2025-08-20 08:31:24,879 - INFO - ✅ Transformation completed. Records: 566
2025-08-20 08:31:24,888 - INFO - 📤 Loading data to PostgreSQL...


Extracted data sample:
  transaction_id transaction_date transaction_time  transaction_qty store_id  \
0  1000_20250102       2025-01-02  0 days 17:19:03              1.0        8   
1  1001_20250102       2025-01-02  0 days 17:19:40              1.0        5   
2  1002_20250102       2025-01-02  0 days 17:19:40              1.0        5   
3  1003_20250102       2025-01-02  0 days 17:20:14              1.0        8   
4  1004_20250102       2025-01-02  0 days 17:21:01              1.0        3   

  store_location product_id  unit_price product_category  \
0    East Campus         54        37.5              Tea   
1    Main Campus         27        52.5           Coffee   
2    Main Campus         74        52.5           Bakery   
3    East Campus         46        37.5              Tea   
4    West Campus         28        30.0           Coffee   

            product_type             product_detail  
0        Brewed Chai tea    Morning Sunrise Chai Rg  
1  Organic brewed coffee   

2025-08-20 08:31:25,290 - INFO - ✅ Data loaded to PostgreSQL successfully!



✅ ETL process completed for 2025-01-02


In [73]:
import pandas as pd
import mysql.connector
import psycopg2
import logging

# Set up logging for visibility
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# =========================================================
# 1️⃣ Database Connection Configurations
# =========================================================
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = "root"
MYSQL_PASSWORD = "@Whatsnew2711"
MYSQL_DB = "coffee_shop_sales_oltp"

PG_HOST = "localhost"
PG_PORT = 5432
PG_USER = "postgres"
PG_PASSWORD = "Whatsnew2711"
PG_DB = "Coffee_Machine_Project_One"

def get_postgres_connection():
    """Establishes and returns a PostgreSQL database connection."""
    try:
        conn = psycopg2.connect(
            host=PG_HOST,
            port=PG_PORT,
            user=PG_USER,
            password=PG_PASSWORD,
            dbname=PG_DB
        )
        return conn
    except psycopg2.Error as e:
        logging.error(f"❌ Failed to connect to PostgreSQL: {e}")
        return None

# =========================================================
# 2️⃣ ETL Functions
# =========================================================

def extract_data_from_mysql(date_to_process):
    """
    Extracts daily transaction data from MySQL OLTP database
    for a specific date.
    """
    logging.info(f"📥 Extracting data from MySQL for {date_to_process}...")
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(
            host=MYSQL_HOST,
            port=MYSQL_PORT,
            user=MYSQL_USER,
            password=MYSQL_PASSWORD,
            database=MYSQL_DB
        )
        cursor = conn.cursor()
        query = f"""
        SELECT
            transaction_id,
            transaction_date,
            transaction_time,
            transaction_qty,
            store_id,
            store_location,
            product_id,
            unit_price,
            product_category,
            product_type,
            product_detail
        FROM sales_data
        WHERE transaction_date = '{date_to_process}';
        """
        df_raw = pd.read_sql(query, conn)
        logging.info(f"✅ Extracted {len(df_raw)} records.")
        return df_raw
    except mysql.connector.Error as err:
        logging.error(f"❌ MySQL Error: {err}")
        if 'conn' in locals() and conn.is_connected():
            conn.rollback()
        return None
    finally:
        if 'cursor' in locals() and cursor is not None:
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            logging.info("MySQL connection closed.")

# =============================
# Transform Data (Postgres-ready)
# =============================

def transform_data(df_raw):
    """
    Transforms raw MySQL data into star schema DataFrames that match PostgreSQL tables.
    Returns: dim_date, dim_time, dim_store, dim_product, df_fact
    """
    import pandas as pd
    import logging
    logging.info("🔄 Transforming data for OLAP...")

    # ----------------------------
    # Convert transaction_date to datetime
    # ----------------------------
    if "transaction_date" in df_raw.columns:
        df_raw["transaction_date"] = pd.to_datetime(df_raw["transaction_date"], errors="coerce")

    # ----------------------------
    # Convert transaction_time from timedelta to time
    # ----------------------------
    if "transaction_time" in df_raw.columns:
        if pd.api.types.is_timedelta64_dtype(df_raw["transaction_time"]):
            df_raw["transaction_time"] = df_raw["transaction_time"].apply(
                lambda x: (pd.Timestamp("00:00:00") + x).time() if pd.notnull(x) else None
            )
        else:
            df_raw["transaction_time"] = pd.to_datetime(
                df_raw["transaction_time"].astype(str), errors="coerce"
            ).dt.time

    # Drop rows where conversion failed
    df_raw = df_raw.dropna(subset=["transaction_date", "transaction_time"])

    # ----------------------------
    # Calculate total_amount
    # ----------------------------
    df_raw["total_amount"] = df_raw["transaction_qty"] * df_raw["unit_price"]

    # ----------------------------
    # Dim Date
    # ----------------------------
    dim_date = df_raw[["transaction_date"]].drop_duplicates().reset_index(drop=True)
    dim_date["day"] = dim_date["transaction_date"].dt.day
    dim_date["month"] = dim_date["transaction_date"].dt.month
    dim_date["year"] = dim_date["transaction_date"].dt.year
    dim_date["weekday"] = dim_date["transaction_date"].dt.day_name()

    # ----------------------------
    # Dim Time
    # ----------------------------
    dim_time = df_raw[["transaction_time"]].drop_duplicates().reset_index(drop=True)
    dim_time["hour"] = dim_time["transaction_time"].apply(lambda x: x.hour)
    dim_time["minute"] = dim_time["transaction_time"].apply(lambda x: x.minute)
    dim_time["second"] = dim_time["transaction_time"].apply(lambda x: x.second)

    # ----------------------------
    # Dim Store
    # ----------------------------
    dim_store = df_raw[["store_id", "store_location"]].drop_duplicates().reset_index(drop=True)

    # ----------------------------
    # Dim Product
    # ----------------------------
    dim_product = df_raw[["product_id", "product_category", "product_type", "product_detail", "unit_price"]].drop_duplicates().reset_index(drop=True)

    # ----------------------------
    # Fact Table
    # ----------------------------
    df_fact = df_raw.copy()
    df_fact = df_fact.merge(dim_date, on="transaction_date")
    df_fact = df_fact.merge(dim_time, on="transaction_time")
    df_fact = df_fact.merge(dim_store, on=["store_id", "store_location"])
    df_fact = df_fact.merge(dim_product, on=["product_id", "product_category", "product_type", "product_detail", "unit_price"])

    df_fact = df_fact[[
        "transaction_id",
        "transaction_date",  # maps to dim_date.date_id in load
        "transaction_time",  # maps to dim_time.time_id
        "store_id",          # maps to dim_store.store_id
        "product_id",        # maps to dim_product.product_id
        "transaction_qty",
        "total_amount"
    ]]

    logging.info(f"✅ Transformation completed. Records: {len(df_fact)}")
    return dim_date, dim_time, dim_store, dim_product, df_fact

# =============================
# Load Data (Corrected)
# =============================

def load_data_to_postgres(dim_date, dim_time, dim_store, dim_product, df_fact):
    """
    Loads transformed data into PostgreSQL OLAP database
    by checking for existence before inserting to avoid duplicates.
    """
    logging.info("📤 Loading data to PostgreSQL...")
    conn = None
    try:
        conn = get_postgres_connection()
        if not conn:
            return
        
        cur = conn.cursor()

        # -----------------------------
        # Dim Date (Check then Insert)
        # -----------------------------
        date_id_map = {}
        for _, row in dim_date.iterrows():
            cur.execute("SELECT date_id FROM dim_date WHERE transaction_date = %s;", (row["transaction_date"].date(),))
            result = cur.fetchone()

            if result:
                date_id_map[row["transaction_date"]] = result[0]
            else:
                cur.execute("""
                    INSERT INTO dim_date (transaction_date, day, month, year, weekday)
                    VALUES (%s, %s, %s, %s, %s)
                    RETURNING date_id;
                """, (
                    row["transaction_date"].date(),
                    int(row["day"]),
                    int(row["month"]),
                    int(row["year"]),
                    row["weekday"]
                ))
                date_id_map[row["transaction_date"]] = cur.fetchone()[0]

        # -----------------------------
        # Dim Time (Check then Insert)
        # -----------------------------
        time_id_map = {}
        for _, row in dim_time.iterrows():
            cur.execute("SELECT time_id FROM dim_time WHERE transaction_time = %s;", (row["transaction_time"],))
            result = cur.fetchone()

            if result:
                time_id_map[row["transaction_time"]] = result[0]
            else:
                cur.execute("""
                    INSERT INTO dim_time (transaction_time, hour, minute, second)
                    VALUES (%s, %s, %s, %s)
                    RETURNING time_id;
                """, (
                    row["transaction_time"],
                    int(row["hour"]),
                    int(row["minute"]),
                    int(row["second"])
                ))
                time_id_map[row["transaction_time"]] = cur.fetchone()[0]

        # -----------------------------
        # Dim Store (Check then Insert)
        # -----------------------------
        store_id_map = {}
        for _, row in dim_store.iterrows():
            cur.execute("SELECT store_id FROM dim_store WHERE store_location = %s;", (row["store_location"],))
            result = cur.fetchone()

            if result:
                store_id_map[row["store_id"]] = result[0]
            else:
                cur.execute("""
                    INSERT INTO dim_store (store_location)
                    VALUES (%s)
                    RETURNING store_id;
                """, (row["store_location"],))
                store_id_map[row["store_id"]] = cur.fetchone()[0]

        # -----------------------------
        # Dim Product (Check then Insert)
        # -----------------------------
        product_id_map = {}
        for _, row in dim_product.iterrows():
            cur.execute("""
                SELECT product_id FROM dim_product
                WHERE product_category = %s AND product_type = %s AND product_detail = %s;
            """, (row["product_category"], row["product_type"], row["product_detail"]))
            result = cur.fetchone()

            if result:
                product_id_map[row["product_id"]] = result[0]
            else:
                cur.execute("""
                    INSERT INTO dim_product (product_category, product_type, product_detail, unit_price)
                    VALUES (%s, %s, %s, %s)
                    RETURNING product_id;
                """, (
                    row["product_category"],
                    row["product_type"],
                    row["product_detail"],
                    float(row["unit_price"])
                ))
                product_id_map[row["product_id"]] = cur.fetchone()[0]
        
        conn.commit()

        # -----------------------------
        # Fact Table (Check then Insert)
        # -----------------------------
        for _, row in df_fact.iterrows():
            cur.execute("SELECT transaction_id FROM fact_sales WHERE transaction_id = %s;", (row["transaction_id"],))
            result = cur.fetchone()
            
            if not result:
                cur.execute("""
                    INSERT INTO fact_sales (transaction_id, date_id, time_id, store_id, product_id, transaction_qty, total_amount)
                    VALUES (%s, %s, %s, %s, %s, %s, %s);
                """, (
                    row["transaction_id"],
                    date_id_map[row["transaction_date"]],
                    time_id_map[row["transaction_time"]],
                    store_id_map[row["store_id"]],
                    product_id_map[row["product_id"]],
                    int(row["transaction_qty"]),
                    float(row["total_amount"])
                ))
        
        conn.commit()
        cur.close()
        logging.info("✅ Data loaded to PostgreSQL successfully!")

    except Exception as e:
        logging.error(f"❌ Failed to load data to PostgreSQL: {e}")
        if conn:
            conn.rollback()
    finally:
        if conn:
            conn.close()

In [87]:
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import logging
import sys
import mysql.connector

# Import the ETL functions from your main script
from etl_olap_postgresql import (
    extract_data_from_mysql,
    transform_data,
    load_data_to_postgres,
    get_postgres_connection
)

# =========================================================================
# Logging Configuration
# =========================================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# =========================================================================
# Database Configurations for the Scheduler
# =========================================================================
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = '@Whatsnew2711'
MYSQL_DB = 'coffee_shop_sales_oltp'

PG_HOST = 'localhost'
PG_PORT = 5432
PG_USER = 'postgres'
PG_PASSWORD = 'Whatsnew2711'
PG_DB = 'Coffee_Machine_Project_One'

try:
    mysql_conn_str = f"mysql+mysqlconnector://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}"
    pg_conn_str = f"postgresql+psycopg2://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}"

    mysql_engine = create_engine(mysql_conn_str)
    pg_engine = create_engine(pg_conn_str)
except Exception as e:
    logging.error(f"❌ Failed to create database engines for scheduler: {e}")
    sys.exit(1)

# =========================================================================
# Scheduler Functions
# =========================================================================

def get_next_date_to_process():
    """
    Determine the next date to process by finding the last date
    in the PostgreSQL OLAP fact table.
    """
    logging.info("🗓️ Checking for the last processed date in PostgreSQL...")
    try:
        query = "SELECT MAX(transaction_date) FROM fact_sales;"
        with pg_engine.connect() as conn:
            last_date_df = pd.read_sql_query(text(query), conn)
            last_date = last_date_df.iloc[0, 0]

        if last_date is None or pd.isna(last_date):
            logging.info("⭐ OLAP database is empty. Finding earliest date from MySQL...")
            query = "SELECT MIN(transaction_date) FROM sales_data;"
            with mysql_engine.connect() as conn:
                earliest_date_df = pd.read_sql_query(text(query), conn)
                earliest_date = earliest_date_df.iloc[0, 0]

            if earliest_date is None or pd.isna(earliest_date):
                logging.warning("⚠️ No data found in the source MySQL database.")
                return None
            return pd.to_datetime(earliest_date)
        else:
            return pd.to_datetime(last_date) + timedelta(days=1)

    except Exception as e:
        logging.error(f"❌ Error in get_next_date_to_process: {e}", exc_info=True)
        return None


def run_daily_incremental():
    """
    Orchestrates the full ETL process from MySQL to PostgreSQL for a single day.
    """
    logging.info("🚀 Starting daily incremental ETL process...")

    next_date = get_next_date_to_process()
    if next_date is None:
        logging.error("❌ Failed to determine the next date to process. Exiting.")
        return

    # ✅ Use SQLAlchemy connection for MySQL query
    query = "SELECT MAX(transaction_date) FROM sales_data;"
    with mysql_engine.connect() as conn:
        last_raw_date_df = pd.read_sql_query(text(query), conn)
        last_raw_date = last_raw_date_df.iloc[0, 0]

    if pd.isna(last_raw_date) or next_date.date() > pd.to_datetime(last_raw_date).date():
        logging.info("✅ No new data to process. All available data is loaded.")
        return

    date_to_process = next_date.strftime('%Y-%m-%d')
    logging.info(f"➡️ Processing daily incremental ETL for: {date_to_process}")

    df_raw = extract_data_from_mysql(date_to_process)
    if df_raw is None or df_raw.empty:
        logging.warning(f"⚠️ No data found for {date_to_process}. Skipping load.")
        return

    try:
        dim_date, dim_time, dim_store, dim_product, df_fact = transform_data(df_raw)
        load_data_to_postgres(dim_date, dim_time, dim_store, dim_product, df_fact)
        logging.info(f"✅ Daily incremental ETL for {date_to_process} completed successfully.")
    except Exception as e:
        logging.error(f"❌ An error occurred during the ETL process for {date_to_process}: {e}", exc_info=True)


if __name__ == "__main__":
    run_daily_incremental()


2025-08-20 09:26:54,457 - INFO - 🚀 Starting daily incremental ETL process...
2025-08-20 09:26:54,460 - INFO - 🗓️ Checking for the last processed date in PostgreSQL...
  last_date_df = pd.read_sql_query(text(query), conn)
2025-08-20 09:26:54,574 - ERROR - ❌ Error in get_next_date_to_process: Query must be a string unless using sqlalchemy.
Traceback (most recent call last):
  File "/var/folders/jh/nl0hs4bx26l4cbrp239k3n9c0000gn/T/ipykernel_1405/246310345.py", line 62, in get_next_date_to_process
    last_date_df = pd.read_sql_query(text(query), conn)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mac/anaconda3/envs/airflow_env/lib/python3.11/site-packages/pandas/io/sql.py", line 528, in read_sql_query
    return pandas_sql.read_query(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/mac/anaconda3/envs/airflow_env/lib/python3.11/site-packages/pandas/io/sql.py", line 2728, in read_query
    cursor = self.execute(sql, params)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  

In [82]:
pip install mysql-connector-python psycopg2-binary sqlalchemy

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-macosx_12_0_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-macosx_12_0_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [9]:
import pandas as pd
import os
from datetime import datetime

def extract_data(input_csv):
    """
    Extracts data from a specified CSV file into a pandas DataFrame.
    """
    print(f"📥 Extracting data from '{input_csv}'...")
    try:
        df = pd.read_csv(input_csv, sep=";")
        return df
    except FileNotFoundError:
        print(f"❌ Error: The file '{input_csv}' was not found.")
        return None

def transform_data(df, start_offset_date, end_offset_date):
    """
    Transforms the raw DataFrame by:
    - Filtering by date range
    - Formatting dates & times
    - Mapping store locations
    - Converting currency
    - Regenerating unique transaction IDs
    """
    print("🔄 Transforming data...")

    # Convert date and time
    df['transaction_date'] = pd.to_datetime(df['transaction_date'], errors='coerce')
    df['transaction_time'] = pd.to_datetime(
        df['transaction_time'], format='%H:%M:%S', errors='coerce'
    ).dt.strftime('%H:%M:%S')

    # Filter by date range
    start_date = pd.to_datetime(start_offset_date)
    end_date = pd.to_datetime(end_offset_date)
    df_filtered = df[
        (df['transaction_date'] >= start_date) & (df['transaction_date'] <= end_date)
    ].copy()

    if df_filtered.empty:
        print(f"⚠ No data found for the range {start_offset_date} to {end_offset_date}.")
        return df_filtered

    # Format date for output
    df_filtered['transaction_date'] = df_filtered['transaction_date'].dt.strftime('%Y-%m-%d')

    # Map store locations
    location_map = {
        "Hell's Kitchen": "East Campus",
        "Astoria": "West Campus",
        "Lower Manhattan": "Main Campus"
    }
    df_filtered['store_location'] = df_filtered['store_location'].replace(location_map)

    # Convert unit_price
    df_filtered['unit_price'] = (df_filtered['unit_price'] * 15).round(2)

    # Convert key columns to string safely
    cols_to_convert = ['transaction_id', 'store_id', 'product_id']
    df_filtered[cols_to_convert] = df_filtered[cols_to_convert].apply(
        lambda col: col.dropna().astype(int).astype(str)
    )

    # Force unique transaction IDs by appending the date
    df_filtered['transaction_id'] = (
        df_filtered['transaction_id'].astype(str) + "_" + 
        pd.to_datetime(df_filtered['transaction_date']).dt.strftime('%Y%m%d')
    )

    return df_filtered

def load_data(df_transformed, output_csv, start_offset_date, end_offset_date):
    """
    Loads transformed data into the CSV, replacing rows in the given date range
    and ensuring all transaction IDs are unique across the file.
    """
    if df_transformed.empty:
        print("⚠ No data to load.")
        return

    # Ensure proper types
    df_transformed['transaction_date'] = pd.to_datetime(df_transformed['transaction_date'])
    df_transformed['transaction_id'] = df_transformed['transaction_id'].astype(str)

    if os.path.exists(output_csv):
        print("📂 Existing data found. Removing old data in the same date range...")
        df_existing = pd.read_csv(output_csv)
        df_existing['transaction_date'] = pd.to_datetime(df_existing['transaction_date'])
        df_existing['transaction_id'] = df_existing['transaction_id'].astype(str)

        # Remove rows from existing file that fall within the same date range
        mask = ~(
            (df_existing['transaction_date'] >= pd.to_datetime(start_offset_date)) &
            (df_existing['transaction_date'] <= pd.to_datetime(end_offset_date))
        )
        df_existing = df_existing[mask]

        # Append new transformed data
        df_final = pd.concat([df_existing, df_transformed], ignore_index=True)
    else:
        print("🆕 No existing data. Creating a new file...")
        df_final = df_transformed.copy()

    # Keep only unique transaction IDs (final safeguard)
    df_final = df_final.drop_duplicates(subset=['transaction_id'], keep='last')
    df_final = df_final.sort_values(by=['transaction_date', 'transaction_time'])

    df_final.to_csv(output_csv, index=False)
    print(f"✅ Load complete! Total unique transactions: {df_final['transaction_id'].nunique()}")

if __name__ == "__main__":
    input_file = "Coffee Shop Sales2.csv"  # your raw CSV file
    output_file = "processed_sales_data.csv"  # final output

    start_date = input("Enter start date (YYYY-MM-DD): ").strip()
    end_date = input("Enter end date (YYYY-MM-DD): ").strip()

    df_raw = extract_data(input_file)
    if df_raw is not None:
        df_transformed = transform_data(df_raw, start_date, end_date)
        load_data(df_transformed, output_file, start_date, end_date)


Enter start date (YYYY-MM-DD):  2025-01-01
Enter end date (YYYY-MM-DD):  2025-05-05


📥 Extracting data from 'Coffee Shop Sales2.csv'...
🔄 Transforming data...
🆕 No existing data. Creating a new file...
✅ Load complete! Total unique transactions: 85474
