## ETL Extract Lab - DSA 2040A


## Project Setup
This notebook demonstrates:

➡️ Lab 3 Data Extraction
- Full dataset extraction
- Incremental extraction based on last run timestamp
- Proper ETL workflow practices

➡️ Lab 4 Transformation 

#### ➡️ Lab 3 Data Extraction

In [73]:
#Import Required Libraries
import pandas as pd
from datetime import datetime

#### 🟨 Section 1: Full Extraction

In [75]:
import pandas as pd

def wrangle_full(csv_file):
    """Perform full extraction: Load dataset, show stats, sample"""
    try:
        df = pd.read_csv(csv_file)
        
        # Basic stats
        print("✅ Full extraction completed successfully.")
        print(f"Extracted {len(df)} rows fully.")
        print(f"Number of columns: {df.shape[1]}")
        
        print("\n📊 Column Names:")
        print(df.columns.tolist())
        
        print("\n🔍 Sample Records:")
        print(df.head())

        return df
    
    except Exception as e:
        print("❌ Error during full extraction:", e)
        return None



In [76]:
# Usage
file_path = "custom_data.csv"
df_full = wrangle_full(file_path)

✅ Full extraction completed successfully.
Extracted 2003 rows fully.
Number of columns: 8

📊 Column Names:
['Transaction ID', 'Date', 'Customer ID', 'Gender', 'Age', 'Product Category', 'Quantity', 'Price per Unit']

🔍 Sample Records:
  Transaction ID        Date Customer ID  Gender Age Product Category  \
0              1  2023-11-24     CUST001    Male  34           Beauty   
1              2  2023-02-27     CUST002  Female  26         Clothing   
2              3  2023-01-13     CUST003    Male  50      Electronics   
3              4  2023-05-21     CUST004    Male  37         Clothing   
4              5  2023-05-06     CUST005    Male  30           Beauty   

  Quantity Price per Unit  
0        3             50  
1        2            500  
2        1             30  
3        1            500  
4        2             50  


In [77]:
print(df_full.head())

  Transaction ID        Date Customer ID  Gender Age Product Category  \
0              1  2023-11-24     CUST001    Male  34           Beauty   
1              2  2023-02-27     CUST002  Female  26         Clothing   
2              3  2023-01-13     CUST003    Male  50      Electronics   
3              4  2023-05-21     CUST004    Male  37         Clothing   
4              5  2023-05-06     CUST005    Male  30           Beauty   

  Quantity Price per Unit  
0        3             50  
1        2            500  
2        1             30  
3        1            500  
4        2             50  


In [78]:
print("📊 Dataset Info:")
print(df_full.info())

📊 Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2003 entries, 0 to 2002
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   Transaction ID    2003 non-null   object
 1   Date              2001 non-null   object
 2   Customer ID       2001 non-null   object
 3   Gender            2001 non-null   object
 4   Age               2001 non-null   object
 5   Product Category  2001 non-null   object
 6   Quantity          2001 non-null   object
 7   Price per Unit    2001 non-null   object
dtypes: object(8)
memory usage: 125.3+ KB
None


In [79]:
# Ensure the 'Date' column is in datetime format
df_full['Date'] = pd.to_datetime(df_full['Date'], errors='coerce')

# Drop any rows where the date could not be parsed
df_full = df_full.dropna(subset=['Date'])

# Display the earliest and latest dates
start_date = df_full['Date'].min()
end_date = df_full['Date'].max()

print(f"📅 Earliest date in data: {start_date}")
print(f"📅 Latest date in data: {end_date}")

📅 Earliest date in data: 2023-01-01 00:00:00
📅 Latest date in data: 2024-01-01 00:00:00


In [80]:
# Sort by Date in ascending order
df_full = df_full.sort_values(by='Date', ascending=True).reset_index(drop=True)
df_full.tail(10)

Unnamed: 0,Transaction ID,Date,Customer ID,Gender,Age,Product Category,Quantity,Price per Unit
1990,520,2023-12-29,CUST520,Female,49,Electronics,4,25
1991,520,2023-12-29,CUST520,Female,49,Electronics,4,25
1992,233,2023-12-29,CUST233,Female,51,Beauty,2,300
1993,233,2023-12-29,CUST233,Female,51,Beauty,2,300
1994,857,2023-12-31,CUST857,Male,60,Electronics,2,25
1995,857,2023-12-31,CUST857,Male,60,Electronics,2,25
1996,650,2024-01-01,CUST650,Male,55,Electronics,1,30
1997,211,2024-01-01,CUST211,Male,42,Beauty,3,500
1998,650,2024-01-01,CUST650,Male,55,Electronics,1,30
1999,211,2024-01-01,CUST211,Male,42,Beauty,3,500


###  🟨 Section 2: Incremental Extraction

🕒 Incremental Extraction using Timestamp

In this ETL pipeline, incremental extraction is based on a saved timestamp stored in the `last_extraction.txt` file. 

For this lab, I manually set the timestamp to:

In [81]:
def read_last_extraction_time(file_path='last_extraction.txt'):
    try:
        with open(file_path, 'r') as f:
            return datetime.strptime(f.read().strip(), '%Y-%m-%d %H:%M:%S')
    except (FileNotFoundError, ValueError):
        return datetime.min

def wrangle_incremental(csv_file, timestamp_file='last_extraction.txt'):
    """Extract only new or updated records based on last extraction time"""
    try:
        df = pd.read_csv(csv_file)

        # Try parsing the Date column safely
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce', format='%Y-%m-%d')

        # Drop rows where date couldn't be parsed
        df = df.dropna(subset=['Date'])

        # Ensure dates are sorted
        df = df.sort_values('Date')

        last_time = read_last_extraction_time(timestamp_file)
        new_data = df[df['Date'] > last_time]

        print(f"✅ Extracted {len(new_data)} rows incrementally since last check ({last_time}).")
        return new_data
    except Exception as e:
        print("❌ Error during incremental extraction:", e)
        return pd.DataFrame()


### 🟩 Section 3: Save New Timestamp

In [82]:
def update_extraction_time(file_path='last_extraction.txt', timestamp=None):
    """Updates the last_extraction.txt file with current or custom timestamp"""
    try:
        if timestamp is None:
            timestamp = datetime.now()
        with open(file_path, 'w') as f:
            f.write(timestamp.strftime('%Y-%m-%d %H:%M:%S'))
        print(f"✅ Updated last extraction time to: {timestamp}")
    except Exception as e:
        print("❌ Error writing to timestamp file:", e)


#### ✅ Example Incremental Extraction

In [58]:
# Extract new data since last time
new_data = wrangle_incremental("custom_data.csv")

# Update the timestamp only if we found new data
if not new_data.empty:
    update_extraction_time()


✅ Extracted 0 rows incrementally since last check (2025-06-30 20:18:52).


#### Section 4: Checking Data Quality

In [96]:
import pandas as pd

def check_data_quality(csv_file):
    df = pd.read_csv(csv_file)
    df.columns = df.columns.str.strip()  # Remove extra spaces in column names

    print("📊 Data Quality Check Report\n")

    # 1. Transaction ID
    print("🔹 Transaction ID")
    print(f" - Missing: {df['Transaction ID'].isna().sum()}")
    print(f" - Duplicate IDs: {df['Transaction ID'].duplicated().sum()}")
    print(f" - Unique IDs: {df['Transaction ID'].nunique()} of {len(df)} rows\n")

    # 2. Date
    print("🔹 Date")
    print(f" - Missing: {df['Date'].isna().sum()}")
    try:
        df['Date'] = pd.to_datetime(df['Date'], errors='raise')
        print(" - All dates parsed successfully")
        print(f" - Date Range: {df['Date'].min()} to {df['Date'].max()}\n")
    except Exception as e:
        print(" - Date parsing error ❌:", e, "\n")

    # 3. Customer ID
    print("🔹 Customer ID")
    print(f" - Missing: {df['Customer ID'].isna().sum()}")
    print(f" - Unique IDs: {df['Customer ID'].nunique()}")
    if df['Customer ID'].str.contains(' ').any():
        print(" - Warning: Some IDs contain extra spaces")
    print()

    # 4. Gender
    print("🔹 Gender")
    print(f" - Missing: {df['Gender'].isna().sum()}")
    print(f" - Unique Values: {df['Gender'].unique()}")
    valid_genders = {'M', 'F', 'Male', 'Female'}
    if not set(df['Gender'].unique()).issubset(valid_genders):
        print(" - Warning: Unexpected gender labels found")
    print()

    # 5. Age
    print("🔹 Age")
    print(f" - Missing: {df['Age'].isna().sum()}")
    try:
        df['Age'] = pd.to_numeric(df['Age'], errors='coerce')
        print(f" - Min Age: {df['Age'].min()}, Max Age: {df['Age'].max()}")
        if df['Age'].min() < 0 or df['Age'].max() > 120:
            print(" - Warning: Age outliers detected")
    except:
        print(" - Age conversion error ❌")
    print()

    # 6. Product Category
    print("🔹 Product Category")
    print(f" - Missing: {df['Product Category'].isna().sum()}")
    print(f" - Unique Categories: {df['Product Category'].unique()}")
    print()

    # 7. Quantity
    print("🔹 Quantity")
    try:
        df['Quantity'] = pd.to_numeric(df['Quantity'], errors='coerce')
        print(f" - Missing: {df['Quantity'].isna().sum()}")
        print(f" - Min: {df['Quantity'].min()}, Max: {df['Quantity'].max()}")
        if (df['Quantity'] < 0).any():
            print(" - Warning: Negative quantities found")
    except:
        print(" - Quantity conversion error ❌")
    print()

    # 8. Price per Unit
    print("🔹 Price per Unit")
    try:
        df['Price per Unit'] = pd.to_numeric(df['Price per Unit'], errors='coerce')
        print(f" - Missing: {df['Price per Unit'].isna().sum()}")
        print(f" - Min: {df['Price per Unit'].min()}, Max: {df['Price per Unit'].max()}")
        if (df['Price per Unit'] < 0).any():
            print(" - Warning: Negative prices found")
    except:
        print(" - Price per Unit conversion error ❌")
    print()

    # 9. Total Amount
    print("🔹 Total Amount")
    if 'Total Amount' in df.columns:
        try:
            df['Total Amount'] = pd.to_numeric(df['Total Amount'], errors='coerce')
            incorrect_total = (df['Total Amount'] != df['Quantity'] * df['Price per Unit']).sum()
            print(f" - Missing: {df['Total Amount'].isna().sum()}")
            print(f" - Inconsistent Total Amounts: {incorrect_total}")
        except:
            print(" - Total Amount conversion or consistency check failed ❌")
    else:
        print(" - Total Amount column not found")
    print("\n✅ Data quality check completed.\n")

    return df


In [98]:
df_data_check = check_data_quality("custom_data.csv")


📊 Data Quality Check Report

🔹 Transaction ID
 - Missing: 0
 - Duplicate IDs: 1000
 - Unique IDs: 1003 of 2003 rows

🔹 Date
 - Missing: 2
 - Date parsing error ❌: time data "Date" doesn't match format "%Y-%m-%d", at position 1001. You might want to try:
    - passing `format` if your strings have a consistent format;
    - passing `format='ISO8601'` if your strings are all ISO8601 but not necessarily in exactly the same format;
    - passing `format='mixed'`, and the format will be inferred for each element individually. You might want to use `dayfirst` alongside this. 

🔹 Customer ID
 - Missing: 2
 - Unique IDs: 1001

🔹 Gender
 - Missing: 2
 - Unique Values: ['Male' 'Female' nan 'Gender']

🔹 Age
 - Missing: 2
 - Min Age: 18.0, Max Age: 64.0

🔹 Product Category
 - Missing: 2
 - Unique Categories: ['Beauty' 'Clothing' 'Electronics' nan 'Product Category']

🔹 Quantity
 - Missing: 3
 - Min: 1.0, Max: 4.0

🔹 Price per Unit
 - Missing: 3
 - Min: 25.0, Max: 500.0

🔹 Total Amount
 - Total Amo

### ➡️Lab 4 Transformation Checklist

##### Transform Full Data

In [99]:
def transform_data(df):
    # Cleaning
    df = df.drop_duplicates()
    
    # Transaction ID
    # Drop rows with missing Transaction ID
    df = df.dropna(subset=['Transaction ID'])
    # Convert to integer (if not already)
    df['Transaction ID'] = df['Transaction ID'].astype(int)
    # Remove duplicate Transaction IDs (keep first)
    df = df.drop_duplicates(subset='Transaction ID')
    # Optionally reassign Transaction ID to be sequential
    df = df.sort_values('Transaction ID').reset_index(drop=True)
    df['Transaction ID'] = range(1, len(df) + 1)
    
    # Customer ID
    # Remove leading/trailing spaces
    df['Customer ID'] = df['Customer ID'].astype(str).str.strip()
    # Convert to uppercase (in case some are lowercase)
    df['Customer ID'] = df['Customer ID'].str.upper()
    # Fill missing values with placeholder (optional)
    df['Customer ID'].fillna('UNKNOWN')
    # Optional: Remove rows not matching the expected pattern (e.g., CUST123)
    df = df[df['Customer ID'].str.match(r'^CUST\d+$')]
    
    
    # Quantity
    # Fill missing values (choose median or zero based on business logic)
    df['Quantity'] = pd.to_numeric(df['Quantity'], errors='coerce')  # Ensure numeric
    df['Quantity'].fillna(0)
    # Convert to integer (after filling)
    df['Quantity'] = df['Quantity'].astype(int)
    # Remove rows with zero or negative quantities (optional)
    df = df[df['Quantity'] > 0]

    
    # Price per Unit
    df['Price per Unit'] = df['Price per Unit'].fillna(0)
    # Convert to numeric
    df['Price per Unit'] = pd.to_numeric(df['Price per Unit'], errors='coerce')
    # Fill missing values with median
    median_price = df['Price per Unit'].median()
    df['Price per Unit'].fillna(median_price)
    # Remove non-positive prices (optional, depending on logic)
    df = df[df['Price per Unit'] > 0]
    
    
    # Date
    # --- Date ---
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce', format='mixed')
    df = df.dropna(subset=['Date'])
    df = df.sort_values('Date').reset_index(drop=True)
    # Gender
    # Convert to lowercase and strip whitespace
    df['Gender'] = df['Gender'].astype(str).str.strip().str.lower()

    # Map common variations to standard form
    gender_map = {
        'm': 'Male',
        'male': 'Male',
        'f': 'Female',
        'female': 'Female'
    }
    df['Gender'] = df['Gender'].map(gender_map)
    # Drop rows with unexpected/invalid genders
    df = df[df['Gender'].isin(['Male', 'Female'])]
    
    #Age
    # Coerce errors during conversion (e.g., 'N/A' -> NaN)
    df['Age'] = pd.to_numeric(df['Age'], errors='coerce')
    # Fill missing ages with the median (or mean)
    median_age = df['Age'].median()
    df['Age'].fillna(median_age)
    # Remove unrealistic ages
    df = df[(df['Age'] >= 10) & (df['Age'] <= 100)]
    # Convert to integer
    df['Age'] = df['Age'].astype(int)
    
    # Product Category
    df['Product Category'] = df['Product Category'].astype(str).str.strip().str.title()
    
    df['Total Amount'] = df['Quantity'] * df['Price per Unit']
    # Save the result
    return df


#### 🔸1. Apply the Transformation to Full Data

In [100]:
# Step 2: Apply the transformation
transformed_full = transform_data(df_full)

# Step 3: Save transformed data
transformed_full.to_csv("transformed_full.csv", index=False)
print("✅ Transformed full data saved to transformed_full.csv")

✅ Transformed full data saved to transformed_full.csv


#### 🔸 2. Apply the Transformation to Incremental Data

In [None]:
# Step 1: Read incremental extracted data
df_incremental = pd.read_csv("incremental_data.csv")  # or your extracted DataFrame

# Step 2: Apply transformation
transformed_incremental = transform_data(df_incremental)

# Step 3: Save transformed incremental data
transformed_incremental.to_csv("transformed_incremental.csv", index=False)
print("✅ Transformed incremental data saved to transformed_incremental.csv")

##### Understanding the Data 

##### Order the data by Date

In [None]:
# Convert 'Date' column to datetime
df_full['Date'] = pd.to_datetime(df['Date'], errors='coerce')
        
# Drop rows with invalid or missing dates
df_full = df.dropna(subset=['Date'])
        
        # Sort the DataFrame by 'Date'
df_full = df_full.sort_values('Date').reset_index(drop=True)
        

Unnamed: 0,Transaction ID,Date,Customer ID,Gender,Age,Product Category,Quantity,Price per Unit,Total Amount
0,522,2023-01-01,CUST522,Male,46,Beauty,3,500,1500
1,559,2023-01-01,CUST559,Female,40,Clothing,4,300,1200
2,522,2023-01-01,CUST522,Male,46,Beauty,3,500,1500
3,559,2023-01-01,CUST559,Female,40,Clothing,4,300,1200
4,180,2023-01-01,CUST180,Male,41,Clothing,3,300,900


##### Customer ID

In [None]:
print(df_full['Customer ID'].unique()[:10])
df_full['Customer ID'].apply(lambda x: x != x.strip()).value_counts()
print("Missing Customer IDs:", df_full['Customer ID'].isnull().sum())
df_full[~df_full['Customer ID'].str.match(r'^CUST\d+$')]




['CUST522' 'CUST559' 'CUST180' 'CUST421' 'CUST163' 'CUST303' 'CUST979'
 'CUST610' 'CUST231' 'CUST032']
Missing Customer IDs: 0


Unnamed: 0,Transaction ID,Date,Customer ID,Gender,Age,Product Category,Quantity,Price per Unit,Total Amount


In [61]:
df_full.columns

Index(['Transaction ID', 'Date', 'Customer ID', 'Gender', 'Age',
       'Product Category', 'Quantity', 'Price per Unit', 'Total Amount'],
      dtype='object')

#### 'Product Category'

In [69]:
print("Unique values in 'Product Category':")
print(df_full['Product Category'].dropna().unique())
print("\nNumber of missing values:", df_full['Product Category'].isna().sum())

Unique values in 'Product Category':
['Beauty' 'Clothing' 'Electronics']

Number of missing values: 0


#### Age

In [71]:
print("Unique Age values:", df_full['Age'].unique())
print("Any missing values?", df_full['Age'].isna().sum())
print("Minimum Age:", df_full['Age'].min())
print("Maximum Age:", df_full['Age'].max())


Unique Age values: ['46' '40' '41' '37' '64' '19' '26' '23' '30' '38' '57' '60' '58' '54'
 '51' '45' '63' '31' '27' '33' '18' '50' '53' '43' '24' '34' '42' '59'
 '49' '39' '36' '22' '56' '29' '28' '20' '47' '32' '25' '35' '21' '52'
 '48' '62' '61' '55' '44']
Any missing values? 0
Minimum Age: 18
Maximum Age: 64
