In [2]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

# Set random seed for reproducibility
np.random.seed(42)
random.seed(42)

# Generate 100 records (more than the required 50)
num_records = 100

# Product catalog
products = {
    'A101': {'name': 'Wireless Headphones', 'category': 'Electronics', 'base_price': 89.99},
    'B202': {'name': 'Bluetooth Speaker', 'category': 'Electronics', 'base_price': 49.99},
    'C303': {'name': 'Smart Watch', 'category': 'Electronics', 'base_price': 199.99},
    'D404': {'name': 'Phone Case', 'category': 'Accessories', 'base_price': 19.99},
    'E505': {'name': 'USB Cable', 'category': 'Accessories', 'base_price': 9.99},
    'F606': {'name': 'Screen Protector', 'category': 'Accessories', 'base_price': 12.99}
}

# Customer regions
regions = ['North', 'South', 'East', 'West']

# Generate synthetic sales data
data = []
start_date = datetime.now() - timedelta(days=60)

for i in range(num_records):
    # Choose a random product
    product_id = random.choice(list(products.keys()))
    product = products[product_id]
    
    # Generate transaction details
    transaction_id = 10000 + i
    customer_id = random.randint(1000, 1999)
    quantity = random.randint(1, 3)
    
    # Add some price variability
    price_variation = random.uniform(0.9, 1.1)  # ±10% variation
    unit_price = round(product['base_price'] * price_variation, 2)
    amount = round(unit_price * quantity, 2)
    
    # Generate timestamp (spread over last 60 days)
    days_ago = random.randint(0, 60)
    transaction_date = start_date + timedelta(days=days_ago)
    
    # Last updated is either transaction date or later (for returns/updates)
    last_updated = transaction_date + timedelta(hours=random.randint(0, 48))
    
    # Add some payment methods
    payment_method = random.choice(['Credit Card', 'Debit Card', 'PayPal', 'Cash'])
    
    # Add region
    region = random.choice(regions)
    
    data.append({
        'transaction_id': transaction_id,
        'customer_id': customer_id,
        'product_id': product_id,
        'product_name': product['name'],
        'category': product['category'],
        'quantity': quantity,
        'unit_price': unit_price,
        'amount': amount,
        'transaction_date': transaction_date.strftime('%Y-%m-%d %H:%M:%S'),
        'last_updated': last_updated.strftime('%Y-%m-%d %H:%M:%S'),
        'payment_method': payment_method,
        'region': region
    })

# Create DataFrame
df = pd.DataFrame(data)

# Save to CSV
df.to_csv('custom_data.csv', index=False)

print(f"Generated {len(df)} records of sales data in 'sales_transactions.csv'")
print("Sample data:")
print(df.head())

Generated 100 records of sales data in 'sales_transactions.csv'
Sample data:
   transaction_id  customer_id product_id       product_name     category  \
0           10000         1114       F606   Screen Protector  Accessories   
1           10001         1758       F606   Screen Protector  Accessories   
2           10002         1238       B202  Bluetooth Speaker  Electronics   
3           10003         1603       D404         Phone Case  Accessories   
4           10004         1284       C303        Smart Watch  Electronics   

   quantity  unit_price  amount     transaction_date         last_updated  \
0         1       13.62   13.62  2025-05-01 18:12:35  2025-05-02 08:12:35   
1         3       11.92   35.76  2025-05-13 18:12:35  2025-05-13 20:12:35   
2         3       51.01  153.03  2025-05-21 18:12:35  2025-05-22 06:12:35   
3         2       21.23   42.46  2025-04-16 18:12:35  2025-04-18 18:12:35   
4         1      188.60  188.60  2025-06-03 18:12:35  2025-06-04 15:12:35  

In [3]:
# Section 1: Full Extraction

print("\n--- Section 1: Full Extraction ---")

# Load the entire dataset
full_df = pd.read_csv('custom_data.csv')

# Display basic stats
print(f"Total rows: {full_df.shape[0]}")
print(f"Total columns: {full_df.shape[1]}")
print("Sample data (first 5 rows):")
print(full_df.head())

# Print a message
print(f"Extracted {full_df.shape[0]} rows fully.")


# Section 2: Incremental Extraction

print("\n--- Section 2: Incremental Extraction ---")

last_extraction_file = 'last_extraction.txt'

def get_last_extraction_timestamp():
    try:
        with open(last_extraction_file, 'r') as f:
            timestamp_str = f.read().strip()
            if timestamp_str:
                # Handle ISO format timestamps
                return datetime.fromisoformat(timestamp_str)
    except FileNotFoundError:
        pass
    return datetime.min # Return a very old date if file doesn't exist or is empty
def save_last_extraction_timestamp(timestamp):
    with open(last_extraction_file, 'w') as f:
        f.write(timestamp.strftime('%Y-%m-%d %H:%M:%S'))

last_extracted_time = get_last_extraction_timestamp()
print(f"Last extraction timestamp: {last_extracted_time}")

# Simulate new data generation for incremental extraction
# For demonstration, let's add a few new records or update existing ones

# Read existing data (if any changes were made directly to custom_data.csv outside the notebook)
existing_df = pd.read_csv('custom_data.csv')

# Generate a few new records for incremental extraction simulation
new_records_data = []
current_time = datetime.now() + timedelta(hours=1) # Simulate current time is later

# Add some new records (e.g., 3 new transactions)
for i in range(3):
    product_id = random.choice(list(products.keys()))
    product = products[product_id]
    
    new_records_data.append({
        'transaction_id': existing_df['transaction_id'].max() + 1 + i,
        'customer_id': random.randint(2000, 2999),
        'product_id': product_id,
        'product_name': product['name'],
        'category': product['category'],
        'quantity': random.randint(1, 3),
        'unit_price': round(product['base_price'] * random.uniform(0.9, 1.1), 2),
        'amount': round(product['base_price'] * random.uniform(0.9, 1.1) * random.randint(1, 3), 2),
        'transaction_date': current_time.strftime('%Y-%m-%d %H:%M:%S'),
        'last_updated': current_time.strftime('%Y-%m-%d %H:%M:%S'),
        'payment_method': random.choice(['Credit Card', 'Debit Card']),
        'region': random.choice(regions)
    })

new_df = pd.DataFrame(new_records_data)

# Append new data to the existing data and save to custom_data.csv
# This simulates new data arriving in the source system
updated_df = pd.concat([existing_df, new_df], ignore_index=True)
updated_df.to_csv('custom_data.csv', index=False)

print(f"Simulated adding {len(new_df)} new records to custom_data.csv")

# Extract only new or updated records since last_extracted_time
# Reload the dataset to get the latest changes including simulated new data
data_for_incremental = pd.read_csv('custom_data.csv')
data_for_incremental['last_updated'] = pd.to_datetime(data_for_incremental['last_updated'])

incremental_df = data_for_incremental[data_for_incremental['last_updated'] > last_extracted_time]

print(f"Extracted {incremental_df.shape[0]} rows incrementally since last check.")
print("Sample of incrementally extracted data:")
print(incremental_df.head())

# Section 3: Save New Timestamp

print("\n--- Section 3: Save New Timestamp ---")

# After successful incremental extraction, update the last_extraction.txt
# Use the current time as the new last extraction timestamp
new_last_extraction_time = datetime.now()
save_last_extraction_timestamp(new_last_extraction_time)

print(f"Updated last_extraction.txt with new timestamp: {new_last_extraction_time}")

print("\nETL process complete!")


--- Section 1: Full Extraction ---
Total rows: 100
Total columns: 12
Sample data (first 5 rows):
   transaction_id  customer_id product_id       product_name     category  \
0           10000         1114       F606   Screen Protector  Accessories   
1           10001         1758       F606   Screen Protector  Accessories   
2           10002         1238       B202  Bluetooth Speaker  Electronics   
3           10003         1603       D404         Phone Case  Accessories   
4           10004         1284       C303        Smart Watch  Electronics   

   quantity  unit_price  amount     transaction_date         last_updated  \
0         1       13.62   13.62  2025-05-01 18:12:35  2025-05-02 08:12:35   
1         3       11.92   35.76  2025-05-13 18:12:35  2025-05-13 20:12:35   
2         3       51.01  153.03  2025-05-21 18:12:35  2025-05-22 06:12:35   
3         2       21.23   42.46  2025-04-16 18:12:35  2025-04-18 18:12:35   
4         1      188.60  188.60  2025-06-03 18:12:35  

In [4]:
# Section 4: Transform Full Data

print("\n--- Section 4: Transform Full Data ---")

# Make a copy to avoid SettingWithCopyWarning
transformed_full_df = full_df.copy()

# Transformation 1: Handle missing values (Example: fill numerical NaNs with mean, categorical with 'Unknown')
# Numerical columns to fill with mean
numerical_cols = ['quantity', 'unit_price', 'amount']
for col in numerical_cols:
    if transformed_full_df[col].isnull().any():
        transformed_full_df[col].fillna(transformed_full_df[col].mean(), inplace=True)

# Categorical columns to fill with 'Unknown'
categorical_cols = ['product_name', 'category', 'payment_method', 'region']
for col in categorical_cols:
    if transformed_full_df[col].isnull().any():
        transformed_full_df[col].fillna('Unknown', inplace=True)

print("Transformation 1: Handled missing values.")

# Transformation 2: Enrichment - Add 'total_price' column
transformed_full_df['total_price'] = transformed_full_df['quantity'] * transformed_full_df['unit_price']
print("Transformation 2: Added 'total_price' column.")

# Transformation 3: Structural - Convert date columns to datetime objects
transformed_full_df['transaction_date'] = pd.to_datetime(transformed_full_df['transaction_date'])
transformed_full_df['last_updated'] = pd.to_datetime(transformed_full_df['last_updated'])
print("Transformation 3: Converted date columns to datetime objects.")

print("Sample of transformed full data (first 5 rows):")
print(transformed_full_df.head())

# Save transformed full data
transformed_full_df.to_csv('transformed_full.csv', index=False)
print(f"Saved transformed full data to 'transformed_full.csv' with {transformed_full_df.shape[0]} rows.")


# Section 5: Transform Incremental Data

print("\n--- Section 5: Transform Incremental Data ---")

# Make a copy to avoid SettingWithCopyWarning
transformed_incremental_df = incremental_df.copy()

# Apply the same transformations as Full Data to ensure consistency

# Transformation 1: Handle missing values
for col in numerical_cols: # Reusing numerical_cols from full transformation
    if transformed_incremental_df[col].isnull().any():
        # Here, we'll recompute for simplicity in a self-contained incremental transform
        transformed_incremental_df[col].fillna(transformed_incremental_df[col].mean(), inplace=True)

for col in categorical_cols: # Reusing categorical_cols from full transformation
    if transformed_incremental_df[col].isnull().any():
        transformed_incremental_df[col].fillna('Unknown', inplace=True)

print("Transformation 1: Handled missing values for incremental data.")

# Transformation 2: Enrichment - Add 'total_price' column
transformed_incremental_df['total_price'] = transformed_incremental_df['quantity'] * transformed_incremental_df['unit_price']
print("Transformation 2: Added 'total_price' column for incremental data.")

# Transformation 3: Structural - Convert date columns to datetime objects
# The 'last_updated' was already converted during incremental extraction, but re-confirm for 'transaction_date'
transformed_incremental_df['transaction_date'] = pd.to_datetime(transformed_incremental_df['transaction_date'])
transformed_incremental_df['last_updated'] = pd.to_datetime(transformed_incremental_df['last_updated'])
print("Transformation 3: Converted date columns to datetime objects for incremental data.")

print("Sample of transformed incremental data (first 5 rows):")
print(transformed_incremental_df.head())

# Save transformed incremental data
transformed_incremental_df.to_csv('transformed_incremental.csv', index=False)
print(f"Saved transformed incremental data to 'transformed_incremental.csv' with {transformed_incremental_df.shape[0]} rows.")

print("\nETL process complete!")


--- Section 4: Transform Full Data ---
Transformation 1: Handled missing values.
Transformation 2: Added 'total_price' column.
Transformation 3: Converted date columns to datetime objects.
Sample of transformed full data (first 5 rows):
   transaction_id  customer_id product_id       product_name     category  \
0           10000         1114       F606   Screen Protector  Accessories   
1           10001         1758       F606   Screen Protector  Accessories   
2           10002         1238       B202  Bluetooth Speaker  Electronics   
3           10003         1603       D404         Phone Case  Accessories   
4           10004         1284       C303        Smart Watch  Electronics   

   quantity  unit_price  amount    transaction_date        last_updated  \
0         1       13.62   13.62 2025-05-01 18:12:35 2025-05-02 08:12:35   
1         3       11.92   35.76 2025-05-13 18:12:35 2025-05-13 20:12:35   
2         3       51.01  153.03 2025-05-21 18:12:35 2025-05-22 06:12:35   