# Lab 4
# Extend ETL pipeline 

#### Imports

In [9]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import os 

#### Loading data

In [10]:
DATA_FILE = 'custom_data.csv' # Your existing data file
LAST_EXTRACTION_FILE = 'last_extraction.txt'
OUTPUT_FULL_TRANSFORMED_FILENAME = 'transformed_full.csv'
OUTPUT_INCREMENTAL_TRANSFORMED_FILENAME = 'transformed_incremental.csv'
print(f"Data file: {DATA_FILE}")

Data file: custom_data.csv


#### Full extraction

In [11]:

df_full_extraction = pd.DataFrame() # Initialize an empty DataFrame

try:
    if os.path.exists(DATA_FILE):
        df_full_extraction = pd.read_csv(DATA_FILE)
        print(f"Number of rows: {df_full_extraction.shape[0]}")
        print(f"Number of columns: {df_full_extraction.shape[1]}")
        print(df_full_extraction.head())
        print(df_full_extraction.info())

        print(f"\nExtracted {df_full_extraction.shape[0]} rows fully.")
    else:
        print(f"Error: The file '{DATA_FILE}' was not found")

except Exception as e:
    print(f"An error occurred during full extraction: {e}")

Number of rows: 338
Number of columns: 10
   record_id transaction_date transaction_timestamp customer_name  \
0          1       2025-04-01   2025-04-01T17:49:57        Costco   
1          2       2025-04-01   2025-04-01T10:36:18         Apple   
2          3       2025-04-01   2025-04-01T17:16:47        Target   
3          4       2025-04-01   2025-04-01T18:15:48        Costco   
4          5       2025-04-01   2025-04-01T02:37:35        Amazon   

  product_category   amount  quantity payment_method last_updated_timestamp  \
0      Electronics   929.94         1         PayPal    2025-04-01T18:49:57   
1        Groceries   130.17         4         PayPal    2025-04-01T12:59:18   
2       Home Goods  1333.76         8    Credit Card    2025-04-01T18:19:47   
3            Tools  1115.54         9         PayPal    2025-04-01T20:41:48   
4         Software   633.14         6    Credit Card    2025-04-01T03:35:35   

      status  
0    Pending  
1  Cancelled  
2    Pending  
3  Cance

### Incremental extraction

In [12]:
def get_last_extraction_timestamp(file_path):
    if os.path.exists(file_path):
        with open(file_path, 'r') as f:
            try:
                timestamp_str = f.read().strip()
                if timestamp_str:
                    return datetime.fromisoformat(timestamp_str)
                else:
                    return datetime.min 
            except ValueError:
                return datetime.min 
    else:
        return datetime.min 

last_extraction_time = get_last_extraction_timestamp(LAST_EXTRACTION_FILE)
print(f"Last extraction timestamp: {last_extraction_time}")

df_incremental_extraction = pd.DataFrame() 

try:
    if not df_full_extraction.empty: 
        df_full_extraction['last_updated_timestamp'] = pd.to_datetime(df_full_extraction['last_updated_timestamp'], errors='coerce')
        df_current_data_valid_timestamps = df_full_extraction.dropna(subset=['last_updated_timestamp'])

        df_incremental_extraction = df_current_data_valid_timestamps[
            df_current_data_valid_timestamps['last_updated_timestamp'] > last_extraction_time
        ].copy() 

        print(f"\nExtracted {df_incremental_extraction.shape[0]} rows incrementally since last check ({last_extraction_time}).")

        if not df_incremental_extraction.empty:
            print("\nIncremental data extracted (first 5 rows):")
            print(df_incremental_extraction.head())
        else:
            print("No new or updated records found.")
    else:
        print("Full extraction DataFrame is empty, cannot perform incremental extraction.")

except Exception as e:
    print(f"An error occurred during incremental extraction: {e}")

Last extraction timestamp: 2025-06-09 20:56:34.567175

Extracted 0 rows incrementally since last check (2025-06-09 20:56:34.567175).
No new or updated records found.


## Transformations

#### Full DAta

In [13]:
def transform_sales_data(df):
    df_transformed = df.copy()
    initial_rows = len(df_transformed)
    df_transformed.drop_duplicates(subset=['record_id'], keep='first', inplace=True)
    if len(df_transformed) < initial_rows:
        print(f"    - Removed {initial_rows - len(df_transformed)} duplicate records based on 'record_id'.")
    else:
        print("    - No duplicate records found based on 'record_id'.")

    for col in ['customer_name', 'product_category', 'payment_method', 'status']:
        if col in df_transformed.columns and df_transformed[col].dtype == 'object':
            df_transformed[col] = df_transformed[col].str.strip()
    print("    - Stripped whitespace from string columns.")

    if 'status' in df_transformed.columns:
        df_transformed['status'] = df_transformed['status'].str.title()
        print("    - Standardized 'status' column casing.")

    print("    - Handling missing values in critical columns...")
    original_rows_after_duplicates = len(df_transformed)
    df_transformed.dropna(subset=['amount', 'quantity', 'transaction_date', 'transaction_timestamp'], inplace=True)
    if len(df_transformed) < original_rows_after_duplicates:
        print(f"    - Dropped {original_rows_after_duplicates - len(df_transformed)} rows due to invalid data (e.g., missing amount, quantity, or timestamps).")
    else:
        print("    - No rows dropped due to missing critical data after type conversion.")


    print("  - Applying structural transformations (data type conversions)...")
    df_transformed['transaction_date'] = pd.to_datetime(df_transformed['transaction_date'], errors='coerce')
    df_transformed['transaction_timestamp'] = pd.to_datetime(df_transformed['transaction_timestamp'], errors='coerce')
    df_transformed['last_updated_timestamp'] = pd.to_datetime(df_transformed['last_updated_timestamp'], errors='coerce')
    df_transformed['amount'] = pd.to_numeric(df_transformed['amount'], errors='coerce')
    df_transformed['quantity'] = pd.to_numeric(df_transformed['quantity'], errors='coerce')


    print("  - Applying enrichment transformations...")
    print("    - Calculating 'total_price'...")
    df_transformed['total_price'] = df_transformed['quantity'] * df_transformed['amount']


    print("  - Applying structural transformations (column selection and renaming)...")
    selected_columns = [
        'record_id',
        'transaction_date',
        'transaction_timestamp',
        'customer_name',
        'product_category',
        'quantity',
        'amount',
        'total_price',
        'payment_method',
        'status'
    ]
    df_transformed = df_transformed[[col for col in selected_columns if col in df_transformed.columns]]

    df_transformed.rename(columns={
        'record_id': 'transaction_id',
        'customer_name': 'customer',
        'product_category': 'category',
        'payment_method': 'payment_type'
    }, inplace=True)


    return df_transformed

if not df_full_extraction.empty:
    df_transformed_full = transform_sales_data(df_full_extraction)

    print(f"\nTransformed {len(df_transformed_full)} records from full extraction.")
    print("\nFirst 5 rows of transformed full data:")
    print(df_transformed_full.head())
    print("\nData types of transformed full data:")
    print(df_transformed_full.info())

    try:
        df_transformed_full.to_csv(OUTPUT_FULL_TRANSFORMED_FILENAME, index=False)
        print(f"\nTransformed full data successfully saved to '{OUTPUT_FULL_TRANSFORMED_FILENAME}'")
    except Exception as e:
        print(f"Error saving transformed full data to CSV: {e}")
else:
    print("Full extraction DataFrame is empty. Skipping full data transformation.")

    - No duplicate records found based on 'record_id'.
    - Stripped whitespace from string columns.
    - Standardized 'status' column casing.
    - Handling missing values in critical columns...
    - No rows dropped due to missing critical data after type conversion.
  - Applying structural transformations (data type conversions)...
  - Applying enrichment transformations...
    - Calculating 'total_price'...
  - Applying structural transformations (column selection and renaming)...

Transformed 338 records from full extraction.

First 5 rows of transformed full data:
   transaction_id transaction_date transaction_timestamp customer  \
0               1       2025-04-01   2025-04-01 17:49:57   Costco   
1               2       2025-04-01   2025-04-01 10:36:18    Apple   
2               3       2025-04-01   2025-04-01 17:16:47   Target   
3               4       2025-04-01   2025-04-01 18:15:48   Costco   
4               5       2025-04-01   2025-04-01 02:37:35   Amazon   

      

#### Incremental Data

In [14]:
if not df_incremental_extraction.empty:
    df_transformed_incremental = transform_sales_data(df_incremental_extraction)

    print(f"\nTransformed {len(df_transformed_incremental)} records from incremental extraction.")
    print("\nFirst 5 rows of transformed incremental data:")
    print(df_transformed_incremental.head())
    print("\nData types of transformed incremental data:")
    print(df_transformed_incremental.info())

    try:
        df_transformed_incremental.to_csv(OUTPUT_INCREMENTAL_TRANSFORMED_FILENAME, index=False)
        print(f"\nTransformed incremental data successfully saved to '{OUTPUT_INCREMENTAL_TRANSFORMED_FILENAME}'")
    except Exception as e:
        print(f"Error saving transformed incremental data to CSV: {e}")
else:
    print("Incremental extraction DataFrame is empty. Skipping incremental data transformation.")

Incremental extraction DataFrame is empty. Skipping incremental data transformation.


#### Structuring data
- Standardizing data formats and types.

In [5]:
def structure_data(df):
    df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime('%Y-%m-%d')
    df['quantity'] = df['quantity'].astype(int)
    df.rename(columns={'unit_price': 'price_per_unit'}, inplace=True)
    return df

### Update Last Extraction

In [15]:
print(f"\n--- Saving New Timestamp ---")
current_extraction_time = datetime.now()
try:
    with open(LAST_EXTRACTION_FILE, 'w') as f:
        f.write(current_extraction_time.isoformat())
    print(f"New extraction timestamp saved: {current_extraction_time.isoformat()}")
except Exception as e:
    print(f"Error saving new timestamp to '{LAST_EXTRACTION_FILE}': {e}")


--- Saving New Timestamp ---
New extraction timestamp saved: 2025-06-14T10:30:50.574049
