In [2]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

## Simulating the source data

In [3]:


# Seed both generators (NumPy and the stdlib `random`) so the simulated
# data is identical on every run.
np.random.seed(42)
random.seed(42)
num_records = 100

# Signup dates: 2022-01-01 plus a random whole-day offset in [0, 365).
day_offsets = np.random.randint(0, 365, size=num_records)
signup_dates = pd.to_datetime('2022-01-01') + pd.to_timedelta(day_offsets, unit='D')

# Draw the remaining random columns. The draw order (age, country,
# is_active) matters: it determines which values the seeded generator yields.
ages = np.random.randint(18, 60, size=num_records)
countries = np.random.choice(['Kenya', 'Cameroon', 'USA', 'UK'], size=num_records)
active_flags = np.random.choice([True, False], size=num_records)

# Assemble the user table.
user_data = pd.DataFrame({
    'user_id': np.arange(1, num_records + 1),
    'age': ages,
    'signup_date': signup_dates,
    'country': countries,
    'is_active': active_flags,
})

# last_updated: the signup date plus a random time of day (hour drawn first,
# then minute), stored as an ISO-8601 string.
last_updated_values = []
for signup in signup_dates:
    hour_offset = random.randint(0, 23)
    minute_offset = random.randint(0, 59)
    updated_at = signup + timedelta(hours=hour_offset, minutes=minute_offset)
    last_updated_values.append(updated_at.isoformat())
user_data['last_updated'] = last_updated_values

print("Extracted User data (first 5 rows):")
print(user_data.head())


Extracted User data (first 5 rows):
   user_id  age signup_date   country  is_active         last_updated
0        1   25  2022-04-13  Cameroon      False  2022-04-13T20:07:00
1        2   41  2022-12-15       USA      False  2022-12-15T00:47:00
2        3   28  2022-09-28     Kenya      False  2022-09-28T08:15:00
3        4   34  2022-04-17        UK       True  2022-04-17T07:08:00
4        5   25  2022-03-13     Kenya      False  2022-03-13T23:06:00


In [4]:
# Display the (rows, columns) shape of the simulated frame.
user_data.shape

(100, 6)

In [5]:
# Write the simulated users to a CSV file; index=False leaves the row index out of the file.
user_data.to_csv('user_data_large.csv', index=False)



## Performing full extraction

In [6]:
# Full extraction: load every row of the source file, parsing
# 'last_updated' into datetimes while reading.
df_full = pd.read_csv(
    "user_data_large.csv",
    parse_dates=["last_updated"],
)
print(f"Pulled {len(df_full)} rows via full extraction.")
df_full.head()

Pulled 100 rows via full extraction.


Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated
0,1,25,2022-04-13,Cameroon,False,2022-04-13 20:07:00
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00
3,4,34,2022-04-17,UK,True,2022-04-17 07:08:00
4,5,25,2022-03-13,Kenya,False,2022-03-13 23:06:00


In [7]:
# Earliest and latest modification timestamps in the full extract.
updated_times = df_full['last_updated']
min_time, max_time = updated_times.min(), updated_times.max()

# Place the first checkpoint at the midpoint of that range, so the
# incremental run that follows has rows on both sides of it.
half_span = (max_time - min_time) / 2
initial_checkpoint = min_time + half_span

# Write the checkpoint to disk as an ISO-8601 string.
with open("last_extraction.txt", "w") as checkpoint_file:
    checkpoint_file.write(initial_checkpoint.isoformat())

print(f"Initial extraction checkpoint set to: {initial_checkpoint}")


Initial extraction checkpoint set to: 2022-07-02 21:47:00


## Performing incremental Extraction

In [8]:
# INCREMENTAL EXTRACTION
# Read the checkpoint (ISO-8601 text) from last_extraction.txt.
with open("last_extraction.txt", "r") as f:
    last_extraction = f.read().strip()
last_extraction_time = pd.to_datetime(last_extraction)

# Load the source, parsing 'last_updated' so it can be compared to the checkpoint.
df = pd.read_csv("user_data_large.csv", parse_dates=["last_updated"])

# Keep only rows modified strictly after the checkpoint. The explicit .copy()
# makes df_incremental an independent frame, so the column assignments done
# on it during the transformation phase cannot raise SettingWithCopyWarning.
df_incremental = df[df['last_updated'] > last_extraction_time].copy()
print(f"Pulled {len(df_incremental)} new/updated rows since {last_extraction}.")
df_incremental.head()

Pulled 58 new/updated rows since 2022-07-02T21:47:00.


Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00
5,6,52,2022-07-08,Kenya,False,2022-07-08 21:47:00
9,10,59,2022-08-03,Kenya,False,2022-08-03 02:13:00
10,11,56,2022-11-27,UK,False,2022-11-27 07:32:00


In [9]:
# The newest modification timestamp in the source becomes the next checkpoint.
new_checkpoint = df['last_updated'].max()

# Overwrite the checkpoint file with the new value as an ISO-8601 string.
checkpoint_text = new_checkpoint.isoformat()
with open("last_extraction.txt", "w") as checkpoint_file:
    checkpoint_file.write(checkpoint_text)

print(f"Updated last_extraction.txt to {new_checkpoint}")

Updated last_extraction.txt to 2022-12-30 21:14:00


## ETL Process: Transformation phase

#### FULL TRANSFORMATION

In [11]:
# Preview the first rows of the fully extracted data before transforming it.
df_full.head()

Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated
0,1,25,2022-04-13,Cameroon,False,2022-04-13 20:07:00
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00
3,4,34,2022-04-17,UK,True,2022-04-17 07:08:00
4,5,25,2022-03-13,Kenya,False,2022-03-13 23:06:00


Transformation 1: DATA CLEANING

In [None]:
# Remove duplicates based on 'user_id', keeping the last occurrence.
# Reassigning the result (rather than inplace=True) follows the recommended
# pandas style: the cell is a plain "new frame from old frame" step.
df_full = df_full.drop_duplicates(subset=['user_id'], keep='last')

# Fail loudly if the key is still not unique after de-duplication.
assert df_full['user_id'].is_unique, "duplicate user_ids remain in df_full"

In [21]:
# Report the number of missing values in each column of df_full.
null_counts = df_full.isnull().sum()
print("Checking for null values in df_full:", null_counts, sep="\n")

Checking for null values in df_full:
user_id         0
age             0
signup_date     0
country         0
is_active       0
last_updated    0
age_group       0
dtype: int64


In [22]:
# Check the data before reporting on it: the messages below are only
# printed when the corresponding condition actually holds.
duplicate_id_count = int(df_full['user_id'].duplicated().sum())
null_value_count = int(df_full.isnull().sum().sum())

if duplicate_id_count == 0:
    print("No duplicate user_ids found in df_full.")
else:
    print(f"WARNING: {duplicate_id_count} duplicate user_ids found in df_full.")

if null_value_count == 0:
    print("No null values found in df_full.")
else:
    print(f"WARNING: {null_value_count} null values found in df_full.")

No duplicate user_ids found in df_full.
No null values found in df_full.


Transformation 2: Categorization into different age bins

In [15]:
# Bucket ages into labelled groups. pd.cut intervals are right-inclusive by
# default, so these edges give (17, 25], (25, 35], (35, 50] and (50, 60].
age_bins = [17, 25, 35, 50, 60]
age_labels = ['18-25', '26-35', '36-50', '51-60']
df_full['age_group'] = pd.cut(df_full['age'], bins=age_bins, labels=age_labels)
df_full.head()

Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated,age_group
0,1,25,2022-04-13,Cameroon,False,2022-04-13 20:07:00,18-25
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00,36-50
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00,26-35
3,4,34,2022-04-17,UK,True,2022-04-17 07:08:00,26-35
4,5,25,2022-03-13,Kenya,False,2022-03-13 23:06:00,18-25


Transformation 3: Structural changes, e.g. changing data types and standardizing the date format

In [16]:
# Show the current dtype of every column in df_full before converting any.
print("Data types in df_full:", df_full.dtypes, sep="\n")


Data types in df_full:
user_id                  int64
age                      int64
signup_date             object
country                 object
is_active                 bool
last_updated    datetime64[ns]
age_group             category
dtype: object


In [18]:
# Store user_id as text rather than as a number.
df_full['user_id'] = df_full['user_id'].astype(str)

# Standardize both date columns to 'YYYY-MM-DD HH:MM:SS' text.
# Note: strftime returns strings, so these columns end up with object dtype,
# not datetime64.
for date_col in ('signup_date', 'last_updated'):
    df_full[date_col] = pd.to_datetime(df_full[date_col]).dt.strftime('%Y-%m-%d %H:%M:%S')

In [19]:
# Print the column dtypes of df_full again to inspect the result of the conversions.
print(df_full.dtypes)

user_id           object
age                int64
signup_date       object
country           object
is_active           bool
last_updated      object
age_group       category
dtype: object


Saving the new transformed full data set to a csv file called "transformed_full.csv"

In [24]:
# Save the final DataFrame to a new CSV file called 'transformed_full.csv'
# (index=False leaves the row index out of the file).
df_full.to_csv('transformed_full.csv', index=False)

## INCREMENTAL TRANSFORMATION

In [23]:
# Preview the first rows of the incrementally extracted data before transforming it.
df_incremental.head()

Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00
5,6,52,2022-07-08,Kenya,False,2022-07-08 21:47:00
9,10,59,2022-08-03,Kenya,False,2022-08-03 02:13:00
10,11,56,2022-11-27,UK,False,2022-11-27 07:32:00


Transformation 1: DATA CLEANING

In [27]:
# Remove duplicates based on 'user_id', keeping the last occurrence.
# A bare drop_duplicates() only drops rows that are identical in every
# column; the subset/keep arguments apply the same per-user rule that the
# full-transformation cleaning step uses.
df_incremental = df_incremental.drop_duplicates(subset=['user_id'], keep='last')



In [28]:
# Report the number of missing values in each column of df_incremental.
null_counts = df_incremental.isnull().sum()
print("Checking for null values in df_incremental:", null_counts, sep="\n")

Checking for null values in df_incremental:
user_id         0
age             0
signup_date     0
country         0
is_active       0
last_updated    0
dtype: int64


In [29]:
# Check the data before reporting on it: the messages below are only
# printed when the corresponding condition actually holds.
duplicate_id_count = int(df_incremental['user_id'].duplicated().sum())
null_value_count = int(df_incremental.isnull().sum().sum())

if duplicate_id_count == 0:
    print("No duplicate user_ids found in df_incremental.")
else:
    print(f"WARNING: {duplicate_id_count} duplicate user_ids found in df_incremental.")

if null_value_count == 0:
    print("No null values found in df_incremental.")
else:
    print(f"WARNING: {null_value_count} null values found in df_incremental.")

No duplicate user_ids found in df_incremental.
No null values found in df_incremental.


Transformation 2: Categorization into different age bins

In [30]:
# Bucket ages into labelled groups. pd.cut intervals are right-inclusive by
# default, so these edges give (17, 25], (25, 35], (35, 50] and (50, 60].
age_bins = [17, 25, 35, 50, 60]
age_labels = ['18-25', '26-35', '36-50', '51-60']
df_incremental['age_group'] = pd.cut(df_incremental['age'], bins=age_bins, labels=age_labels)
df_incremental.head()

Unnamed: 0,user_id,age,signup_date,country,is_active,last_updated,age_group
1,2,41,2022-12-15,USA,False,2022-12-15 00:47:00,36-50
2,3,28,2022-09-28,Kenya,False,2022-09-28 08:15:00,26-35
5,6,52,2022-07-08,Kenya,False,2022-07-08 21:47:00,51-60
9,10,59,2022-08-03,Kenya,False,2022-08-03 02:13:00,51-60
10,11,56,2022-11-27,UK,False,2022-11-27 07:32:00,51-60


Transformation 3: Structural changes, e.g. changing data types and standardizing the date format

In [32]:
# Show the current dtype of every column in df_incremental before converting any.
print("Data types in df_incremental:", df_incremental.dtypes, sep="\n")


Data types in df_incremental:
user_id                  int64
age                      int64
signup_date             object
country                 object
is_active                 bool
last_updated    datetime64[ns]
age_group             category
dtype: object


In [33]:
# Store user_id as text rather than as a number.
df_incremental['user_id'] = df_incremental['user_id'].astype(str)

# Standardize both date columns to 'YYYY-MM-DD HH:MM:SS' text.
# Note: strftime returns strings, so these columns end up with object dtype,
# not datetime64.
for date_col in ('signup_date', 'last_updated'):
    df_incremental[date_col] = pd.to_datetime(df_incremental[date_col]).dt.strftime('%Y-%m-%d %H:%M:%S')

In [34]:
# Print the column dtypes of df_incremental again to inspect the result of the conversions.
print(df_incremental.dtypes)

user_id           object
age                int64
signup_date       object
country           object
is_active           bool
last_updated      object
age_group       category
dtype: object


Saving the new transformed incremental data set to a csv file called "transformed_incremental.csv"

In [35]:
# Save the transformed incremental rows to 'transformed_incremental.csv'
# (index=False leaves the row index out of the file).
df_incremental.to_csv('transformed_incremental.csv', index=False)