# EXTRACT - TRANSFORM - LOAD (ETL) PIPELINE
### Simulating our own banking data
To practice the ETL process, we will simulate our own banking data. Each record contains a user ID, an event type (deposit, withdrawal, ATM withdrawal, loan application, or login), the amount for monetary events, the approval status for loan applications, and a timestamp.

In [107]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

# Set seed for reproducibility
random.seed(42)
np.random.seed(42)

# Define parameters for the synthetic banking data
events = ['deposit', 'withdraw', 'loan_application', 'atm_withdrawal', 'login']
users = ['user_'+str(i) for i in range(1, 50)]
amounts = [round(random.uniform(1000, 100000), 2) for _ in range(50)]
start_date = datetime(2025, 5, 1)
end_date = datetime(2025, 5, 31)
delta = (end_date - start_date).days + 1  # include the end date

# Generating the synthetic banking data
data = []
for i in range(delta):
    date = start_date + timedelta(days=i)
    for _ in range(random.randint(3, 6)):  # 3–6 events per day
        event_choice = random.choice(events)
        amount = random.choice(amounts) if event_choice in ['deposit', 'withdraw', 'atm_withdrawal', 'loan_application'] else None
        record = {
            'user_id': random.choice(users),
            'event': event_choice,
            'amount': amount,
            'timestamp': (date + timedelta(
                hours=random.randint(0, 23),
                minutes=random.randint(0, 59)
            )).isoformat()
        }
        # Loan approval status
        if event_choice == 'loan_application':
            record['loan_status'] = random.choice(['approved', 'rejected'])
        else:
            record['loan_status'] = None
        data.append(record)

# Create a DataFrame from the generated data
banking_data = pd.DataFrame(data)

# add some duplicate entries
duplicates = banking_data.sample(frac=0.05, random_state=42)  # 5% of the data as duplicates
banking_data = pd.concat([banking_data, duplicates], ignore_index=True)

# Shuffle the data and save to CSV
banking_data = banking_data.sample(frac=1, random_state=42).reset_index(drop=True)  

banking_data.to_csv('banking_data.csv', index=False)
banking_data.head()

|   | user_id | event | amount | timestamp | loan_status |
|---|---------|-------|--------|-----------|-------------|
| 0 | user_49 | login |  | 2025-05-04T08:49:00 |  |
| 1 | user_1 | login |  | 2025-05-20T09:18:00 |  |
| 2 | user_5 | atm_withdrawal | 86308.98 | 2025-05-03T12:24:00 |  |
| 3 | user_30 | login |  | 2025-05-23T01:35:00 |  |
| 4 | user_31 | withdraw | 28519.39 | 2025-05-24T14:16:00 |  |
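Before extracting, it can help to confirm that the simulated data follows the rules described above: monetary events carry an amount, logins do not, and only loan applications have a loan status. The sketch below is one way to check this, assuming the column layout produced by the cell above; the helper `check_schema` and the three-row `sample` frame are hypothetical and built inline so the block runs on its own.

```python
import numpy as np
import pandas as pd

# Events that should always carry an amount (mirrors the simulation code)
MONETARY_EVENTS = {"deposit", "withdraw", "atm_withdrawal", "loan_application"}

def check_schema(df):
    """Return a list of rule violations; an empty list means the data looks consistent."""
    expected_cols = ["user_id", "event", "amount", "timestamp", "loan_status"]
    missing = [c for c in expected_cols if c not in df.columns]
    if missing:
        return [f"missing columns: {missing}"]

    problems = []
    is_money = df["event"].isin(MONETARY_EVENTS)
    # Monetary events must have an amount; logins must not
    if df.loc[is_money, "amount"].isna().any():
        problems.append("monetary event without amount")
    if df.loc[~is_money, "amount"].notna().any():
        problems.append("non-monetary event with amount")
    # Only loan applications should have a loan status, and only known values
    is_loan = df["event"] == "loan_application"
    if df.loc[~is_loan, "loan_status"].notna().any():
        problems.append("loan_status set on non-loan event")
    if not df.loc[is_loan, "loan_status"].isin(["approved", "rejected"]).all():
        problems.append("unexpected loan_status value")
    return problems

sample = pd.DataFrame({
    "user_id": ["user_1", "user_2", "user_3"],
    "event": ["login", "deposit", "loan_application"],
    "amount": [np.nan, 500.0, 2000.0],
    "timestamp": ["2025-05-01T08:00:00", "2025-05-01T09:30:00", "2025-05-02T10:00:00"],
    "loan_status": [None, None, "approved"],
})
print(check_schema(sample))  # → []
```

In the notebook you would call `check_schema(banking_data)` instead of using the hypothetical sample.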


## **1. Extraction**
* Extraction is the process of reading data from one or more source systems (databases, files, APIs) and copying it into a staging area where it can be transformed.
* There are two main types of extraction:
  - **Incremental Extraction**: Only new or changed records are extracted since the last ETL run.
  - **Full Extraction**: All records are extracted every time the ETL process runs.
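The same two modes apply when the source is a database rather than a CSV file. A minimal sketch, assuming a hypothetical in-memory SQLite table named `events` whose `timestamp` column holds ISO-8601 text: the full extraction selects every row, while the incremental extraction passes the last checkpoint as a query parameter so the database returns only newer rows.

```python
import sqlite3
import pandas as pd

# Hypothetical source system: an in-memory SQLite table of banking events
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "user_id": ["user_1", "user_2", "user_3"],
    "event": ["deposit", "login", "withdraw"],
    "amount": [1500.0, None, 300.0],
    "timestamp": ["2025-05-10T09:00:00", "2025-05-16T14:30:00", "2025-05-20T18:45:00"],
}).to_sql("events", conn, index=False)

# Full extraction: every row, on every run
full = pd.read_sql_query("SELECT * FROM events", conn)

# Incremental extraction: only rows newer than the stored checkpoint.
# ISO-8601 strings in one consistent format sort chronologically,
# so a plain text comparison works here.
checkpoint = "2025-05-15T12:00:00"
incremental = pd.read_sql_query(
    "SELECT * FROM events WHERE timestamp > ?", conn, params=(checkpoint,)
)

print(len(full), len(incremental))  # → 3 2
conn.close()
```

Filtering in the `WHERE` clause means the database sends only the new rows, which is the main saving of incremental extraction on large tables.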

### ***a. Full Extraction***
A Full Extraction means that every time you run the ETL process, you extract all records from the data source, regardless of whether they have changed or not.

In [108]:
# loading the banking data
full_extraction = pd.read_csv("banking_data.csv", parse_dates=["timestamp"])
print(f"Pulled {len(full_extraction)} rows via full extraction.")
full_extraction.head()

Pulled 154 rows via full extraction.


|   | user_id | event | amount | timestamp | loan_status |
|---|---------|-------|--------|-----------|-------------|
| 0 | user_49 | login |  | 2025-05-04 08:49:00 |  |
| 1 | user_1 | login |  | 2025-05-20 09:18:00 |  |
| 2 | user_5 | atm_withdrawal | 86308.98 | 2025-05-03 12:24:00 |  |
| 3 | user_30 | login |  | 2025-05-23 01:35:00 |  |
| 4 | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  |


### ***b. Incremental Extraction***
An Incremental Extraction means that you only extract records that are new or have changed since the last extraction. This is usually done by storing a checkpoint (such as the latest timestamp already processed) or by using change data capture (CDC) techniques to identify new or modified records.

In [109]:
# Create a tracking file with a last extraction timestamp
with open("last_extraction.txt", "w") as file:
    file.write("2025-05-15 12:00:00") # Initial checkpoint is from the middle of the month

# Read the last extraction timestamp
with open("last_extraction.txt", "r") as file:
    last_extraction = file.read().strip()

# Load the banking dataset
banking_data = pd.read_csv("banking_data.csv", parse_dates=["timestamp"])

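# Keep only records newer than the checkpoint.
# .copy() makes an independent DataFrame, so later in-place edits don't raise SettingWithCopyWarning.
last_extraction_time = pd.to_datetime(last_extraction)
inc_ext = banking_data[banking_data['timestamp'] > last_extraction_time].copy()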

# Display results
print(f"Extracted {len(inc_ext)} rows incrementally since {last_extraction}.")

# Update the checkpoint
new_checkpoint = banking_data['timestamp'].max().isoformat()
with open("last_extraction.txt", "w") as f:
    f.write(new_checkpoint)
print(f"Updated last extraction timestamp to {new_checkpoint}.")
inc_ext.head()

Extracted 83 rows incrementally since 2025-05-15 12:00:00.
Updated last extraction timestamp to 2025-05-31T14:27:00.


|    | user_id | event | amount | timestamp | loan_status |
|----|---------|-------|--------|-----------|-------------|
| 1  | user_1 | login |  | 2025-05-20 09:18:00 |  |
| 3  | user_30 | login |  | 2025-05-23 01:35:00 |  |
| 4  | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  |
| 10 | user_38 | loan_application | 51030.17 | 2025-05-18 08:02:00 | approved |
| 11 | user_10 | atm_withdrawal | 24046.3 | 2025-05-31 13:11:00 |  |
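The cell above re-seeds the checkpoint file on every run, so re-running it always extracts the same rows. A minimal sketch of a reusable version, assuming the same CSV-plus-checkpoint-file layout (the function `extract_incremental` and its `default_checkpoint` parameter are hypothetical): it falls back to a default only when no checkpoint file exists yet, and advances the checkpoint to the newest *extracted* timestamp, so a run that finds nothing new leaves it unchanged.

```python
import os
import tempfile
import pandas as pd

def extract_incremental(csv_path, checkpoint_path, default_checkpoint="2025-05-15 12:00:00"):
    """Return rows newer than the stored checkpoint, then advance the checkpoint."""
    # First run: no checkpoint file yet, so use the default
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            last = pd.to_datetime(fh.read().strip())
    else:
        last = pd.to_datetime(default_checkpoint)

    df = pd.read_csv(csv_path, parse_dates=["timestamp"])
    new_rows = df[df["timestamp"] > last].copy()

    # Only move the checkpoint forward when something new was extracted
    if not new_rows.empty:
        with open(checkpoint_path, "w") as fh:
            fh.write(new_rows["timestamp"].max().isoformat())
    return new_rows

# Usage with throwaway files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "events.csv")
    ckpt_path = os.path.join(tmp, "last_extraction.txt")
    pd.DataFrame({
        "user_id": ["user_1", "user_2", "user_3"],
        "timestamp": ["2025-05-10T09:00:00", "2025-05-16T14:30:00", "2025-05-20T18:45:00"],
    }).to_csv(csv_path, index=False)

    first = extract_incremental(csv_path, ckpt_path)
    second = extract_incremental(csv_path, ckpt_path)  # nothing new since the first run
    print(len(first), len(second))  # → 2 0
```

One design caveat: the strict `>` comparison skips a late-arriving row stamped exactly at the checkpoint, so sources with late data often re-read a small overlap window and rely on deduplication.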


## **2. Transformation**
* Transformation is the process of converting data from its original format into a format that is suitable for analysis.
* This can include cleaning, aggregating, enriching, structuring, filtering, and categorizing the data.
* We will perform some of the common transformation tasks on both the full and incremental extraction data.
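One way to keep these steps reusable is to write each transformation as a function that takes and returns a DataFrame, then chain them with `DataFrame.pipe`. A minimal sketch; the step functions `drop_dupes`, `add_hour` and `add_day_period` are hypothetical names that mirror the cleaning, enrichment and categorization cells below.

```python
import pandas as pd

def drop_dupes(df):
    # Cleaning: remove exact duplicate rows
    return df.drop_duplicates()

def add_hour(df):
    # Enrichment: hour of day from the parsed timestamp
    return df.assign(hour=df["timestamp"].dt.hour)

def add_day_period(df):
    # Categorization: bucket the hour into a period of the day
    def period(h):
        if 5 <= h < 12:
            return "morning"
        if 12 <= h < 17:
            return "afternoon"
        if 17 <= h < 21:
            return "evening"
        return "night"
    return df.assign(day_period=df["hour"].map(period))

raw = pd.DataFrame({
    "user_id": ["user_1", "user_1", "user_2"],
    "timestamp": pd.to_datetime(["2025-05-04 08:49", "2025-05-04 08:49", "2025-05-23 01:35"]),
})

# Each step returns a new DataFrame, so the chain never mutates `raw`
result = raw.pipe(drop_dupes).pipe(add_hour).pipe(add_day_period)
print(result[["user_id", "hour", "day_period"]])
```

Because every step returns a new frame, the same chain can be applied unchanged to both the full and the incremental extraction.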


### I. Transformation on Full Extraction Data

#### ***a. Data Cleaning***
This process involves removing or correcting data that is incorrect, incomplete, or irrelevant. In this case, we will check each column for missing values. Missing values in `amount` (logins carry no amount) and `loan_status` (only loan applications have a status) are expected, so we keep those rows rather than dropping them. We will also check for duplicate rows and remove them if they exist.


In [110]:
# Checking for missing values
missing_values = full_extraction.isnull().sum()
print(f"Missing values in each column:\n{missing_values}")

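# Missing 'amount' (logins) and 'loan_status' (non-loan events) are expected.
# Fill only the text column with a placeholder so 'amount' stays numeric.
full_extraction['loan_status'] = full_extraction['loan_status'].fillna('null')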

# Checking for duplicate rows
duplicate_rows = full_extraction.duplicated().sum()
print(f"Number of duplicate rows: {duplicate_rows}")

# Remove duplicates
full_ext_clean = full_extraction.drop_duplicates()
print(f"\nNumber of rows before removing duplicates: {len(full_extraction)}")
print(f"Number of rows after removing duplicates: {len(full_ext_clean)}")
full_ext_clean.head()

Missing values in each column:
user_id          0
event            0
amount          40
timestamp        0
loan_status    126
dtype: int64
Number of duplicate rows: 7

Number of rows before removing duplicates: 154
Number of rows after removing duplicates: 147




|   | user_id | event | amount | timestamp | loan_status |
|---|---------|-------|--------|-----------|-------------|
| 0 | user_49 | login |  | 2025-05-04 08:49:00 |  |
| 1 | user_1 | login |  | 2025-05-20 09:18:00 |  |
| 2 | user_5 | atm_withdrawal | 86308.98 | 2025-05-03 12:24:00 |  |
| 3 | user_30 | login |  | 2025-05-23 01:35:00 |  |
| 4 | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  |
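Dropping every row that has a missing value would be too aggressive here, because `amount` and `loan_status` are empty by design for most events. A small sketch on a hypothetical three-row frame contrasts a blanket `dropna()`, a `dropna(subset=...)` limited to columns that every event must have, and filling only the text column.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "user_id": ["user_1", "user_2", "user_3"],
    "event": ["login", "deposit", "loan_application"],
    "amount": [np.nan, 500.0, 2000.0],
    "loan_status": [np.nan, np.nan, "approved"],
})

# Blanket dropna: removes every row with any NaN, keeping only the loan application
print(len(df.dropna()))  # → 1

# Drop only rows that are missing a column every event must have
required = df.dropna(subset=["user_id", "event"])
print(len(required))  # → 3

# Fill the text column with a placeholder; 'amount' stays numeric
filled = required.assign(loan_status=required["loan_status"].fillna("null"))
print(filled["amount"].dtype)  # → float64
```

Keeping `amount` as a float column matters later, because summing a column that mixes numbers and the string `'null'` does not produce numeric totals.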


#### ***b. Enriching the data***
Enriching the data involves adding information to the dataset to make it more useful. In this case, we will add a new column, `hour`, containing the hour of the day when each event occurred. This will allow us to analyze the data by hour and look for patterns in activity.

In [111]:
# Work on an explicit copy so adding columns doesn't trigger SettingWithCopyWarning
full_ext_clean = full_ext_clean.copy()

# Ensure 'timestamp' is datetime (it was already parsed on load) and extract the hour
full_ext_clean['timestamp'] = pd.to_datetime(full_ext_clean['timestamp'], errors='coerce')
full_ext_clean['hour'] = full_ext_clean['timestamp'].dt.hour
full_ext_clean.head()

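|   | user_id | event | amount | timestamp | loan_status | hour |
|---|---------|-------|--------|-----------|-------------|------|
| 0 | user_49 | login |  | 2025-05-04 08:49:00 |  | 8 |
| 1 | user_1 | login |  | 2025-05-20 09:18:00 |  | 9 |
| 2 | user_5 | atm_withdrawal | 86308.98 | 2025-05-03 12:24:00 |  | 12 |
| 3 | user_30 | login |  | 2025-05-23 01:35:00 |  | 1 |
| 4 | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  | 14 |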
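The `.dt` accessor exposes other calendar fields in the same way, which can be useful enrichment for time-based analysis. A short sketch on two timestamps from the output above; which extra columns are worth adding (here `day_of_week` and `is_weekend`) is an assumption about the analysis you want to run.

```python
import pandas as pd

df = pd.DataFrame({"timestamp": pd.to_datetime(["2025-05-04 08:49:00", "2025-05-20 09:18:00"])})

df["hour"] = df["timestamp"].dt.hour
df["day_of_week"] = df["timestamp"].dt.day_name()     # weekday name, e.g. 'Sunday'
df["is_weekend"] = df["timestamp"].dt.dayofweek >= 5  # Monday=0 ... Sunday=6
print(df)
```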


#### ***c. Categorizing the data***
Categorizing the data involves grouping values into categories to make them easier to analyze. In this case, we will group the hour of the day into four periods: morning (05:00 to 11:59), afternoon (12:00 to 16:59), evening (17:00 to 20:59), and night (21:00 to 04:59). This will allow us to analyze the data by time of day and look for patterns.

In [112]:
# Categorization: Define time of day
def categorize_time(hour):
    if 5 <= hour < 12:
        return 'morning'
    elif 12 <= hour < 17:
        return 'afternoon'
    elif 17 <= hour < 21:
        return 'evening'
    else:
        return 'night'

# Create the 'day_period' column
full_ext_clean['day_period'] = full_ext_clean['hour'].apply(categorize_time)

# Save the transformed data to CSV
full_ext_clean.to_csv('Transformed_full_ext.csv', index=False)
full_ext_clean.head()

|   | user_id | event | amount | timestamp | loan_status | hour | day_period |
|---|---------|-------|--------|-----------|-------------|------|------------|
| 0 | user_49 | login |  | 2025-05-04 08:49:00 |  | 8 | morning |
| 1 | user_1 | login |  | 2025-05-20 09:18:00 |  | 9 | morning |
| 2 | user_5 | atm_withdrawal | 86308.98 | 2025-05-03 12:24:00 |  | 12 | afternoon |
| 3 | user_30 | login |  | 2025-05-23 01:35:00 |  | 1 | night |
| 4 | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  | 14 | afternoon |
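`Series.apply` calls `categorize_time` once per row, which is fine for about 150 rows but slows down on large tables. A vectorized sketch using `numpy.select`, with the same hour boundaries as `categorize_time`:

```python
import numpy as np
import pandas as pd

hours = pd.Series([1, 8, 12, 16, 17, 20, 21, 23])

# between() is inclusive on both ends, so 5..11 matches 05:00-11:59 for integer hours
conditions = [
    hours.between(5, 11),   # morning
    hours.between(12, 16),  # afternoon
    hours.between(17, 20),  # evening
]
choices = ["morning", "afternoon", "evening"]

# Anything not matched (21:00-04:59) falls through to the default
day_period = pd.Series(np.select(conditions, choices, default="night"), index=hours.index)
print(day_period.tolist())
```

In the notebook, `hours` would be `full_ext_clean['hour']`.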


### II. Transformation on Incremental Extraction Data
#### ***a. Data Cleaning***
As with the full extraction, this step removes or corrects data that is incorrect, incomplete, or irrelevant. We check each column for missing values (expected in `amount` and `loan_status`, so those rows are kept) and remove duplicate rows if they exist.

In [113]:
# Checking for missing values
missing_values = inc_ext.isnull().sum()
print(f"Missing values in each column:\n{missing_values}")

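# Missing 'amount' (logins) and 'loan_status' (non-loan events) are expected.
# Fill only the text column with a placeholder so 'amount' stays numeric.
inc_ext['loan_status'] = inc_ext['loan_status'].fillna('null')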

# Checking for duplicate rows
duplicate_rows = inc_ext.duplicated().sum()
print(f"Number of duplicate rows: {duplicate_rows}")

# Remove duplicates
inc_ext_clean = inc_ext.drop_duplicates()
print(f"\nNumber of rows before removing duplicates: {len(inc_ext)}")
print(f"Number of rows after removing duplicates: {len(inc_ext_clean)}")
inc_ext_clean.head()

Missing values in each column:
user_id         0
event           0
amount         14
timestamp       0
loan_status    64
dtype: int64
Number of duplicate rows: 4

Number of rows before removing duplicates: 83
Number of rows after removing duplicates: 79




|    | user_id | event | amount | timestamp | loan_status |
|----|---------|-------|--------|-----------|-------------|
| 1  | user_1 | login |  | 2025-05-20 09:18:00 |  |
| 3  | user_30 | login |  | 2025-05-23 01:35:00 |  |
| 4  | user_31 | withdraw | 28519.39 | 2025-05-24 14:16:00 |  |
| 10 | user_38 | loan_application | 51030.17 | 2025-05-18 08:02:00 | approved |
| 11 | user_10 | atm_withdrawal | 24046.3 | 2025-05-31 13:11:00 |  |
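`drop_duplicates` on `inc_ext` only removes duplicates *within* one batch. When each incremental batch is appended to a previously loaded table, the same event can arrive twice (for example after a re-run). A minimal sketch of merging a new batch into an existing target and de-duplicating across both, assuming rows that match on `user_id`, `event`, `amount` and `timestamp` are the same event; the simulated data has no event ID, and a real source would usually key on one.

```python
import pandas as pd

existing = pd.DataFrame({
    "user_id": ["user_1", "user_2"],
    "event": ["deposit", "login"],
    "amount": [100.0, None],
    "timestamp": pd.to_datetime(["2025-05-16 10:00", "2025-05-17 11:00"]),
})
new_batch = pd.DataFrame({
    "user_id": ["user_2", "user_3"],
    "event": ["login", "withdraw"],
    "amount": [None, 50.0],
    "timestamp": pd.to_datetime(["2025-05-17 11:00", "2025-05-18 12:00"]),
})

key = ["user_id", "event", "amount", "timestamp"]
combined = (
    pd.concat([existing, new_batch], ignore_index=True)
    # NaN amounts compare equal here; if non-key columns differed, the newer batch's copy would win
    .drop_duplicates(subset=key, keep="last")
    .sort_values("timestamp")
    .reset_index(drop=True)
)
print(len(combined))  # → 3
```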


#### ***b. Aggregating***
This process involves combining data from multiple rows into a single row. In this case, we will group the data by user ID and event type and compute, for each group, the total amount, the number of events, and the distinct loan statuses. This shows, for example, each user's total deposits and withdrawals and how often they logged in.


In [114]:
# Aggregate by user_id and event: total amount, number of events, and distinct loan statuses
agg_data = inc_ext_clean.groupby(['user_id', 'event']).agg({
    'amount': 'sum',
    'timestamp': 'count',
    'loan_status': lambda x: ', '.join(x.dropna().unique())
}).reset_index()
agg_data = agg_data.rename(columns={'timestamp': 'event_count'})

# Save the aggregated data to a CSV file
agg_data.to_csv('Transformed_Incremental_ext.csv', index=False)

# Display the aggregated data (only the last expression in a cell is shown)
agg_data.head()
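pandas also supports *named aggregation*, which names the output columns directly and avoids the rename step, and `pivot_table` can reshape the result into one row per user with one column per event type. A sketch on a small hypothetical batch, assuming `amount` is left numeric (NaN for logins) rather than filled with a string:

```python
import numpy as np
import pandas as pd

batch = pd.DataFrame({
    "user_id": ["user_1", "user_1", "user_1", "user_2"],
    "event": ["deposit", "deposit", "login", "withdraw"],
    "amount": [100.0, 250.0, np.nan, 40.0],
})

# One row per (user, event); "size" counts rows, including those with NaN amounts
agg = (
    batch.groupby(["user_id", "event"])
    .agg(total_amount=("amount", "sum"), event_count=("amount", "size"))
    .reset_index()
)

# Wide view: one row per user, one column per event type (missing combinations -> 0)
wide = agg.pivot_table(index="user_id", columns="event", values="total_amount", fill_value=0)
print(agg)
print(wide)
```

Note that `sum` over an all-NaN group (the login row) returns 0.0 by default, which is what you usually want for totals.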