Task 1: Data Ingestion and Cleansing

In [85]:
import pandas as pd

Ingest raw data files

In [137]:
#read csv file
csvread = pd.read_csv("csvfile.csv")
csvread

Unnamed: 0,InteractionID,UserID,CampaignID,InteractionType,InteractionDate,TimeSpent
0,1,101,1,Click,2023-10-01,2
1,2,102,2,View,2023-10-01,14
2,3,103,1,Click,2023-10-02,8


In [87]:
#read json file
jsonread = pd.read_json("jsonfile.json", orient='columns')
jsonread

Unnamed: 0,CampaignID,CampaignName,StartDate,EndDate
0,1,Summer Sale,2023-09-15,2023-10-15


In [88]:
import xml.etree.ElementTree as et

In [89]:
etree = et.parse('userprofile.xml') 
root = etree.getroot()

In [90]:
root

<Element 'Users' at 0x000001EC546B99E0>

In [91]:
data = []

In [92]:
def parse_element(element, item):
    if len(list(element)) ==0:
        item[element.tag] =element.text
    
    else: 
        for child in list(element):
            parse_element(child, item)

In [93]:
for child in root:
    item = {}
    parse_element(child, item)
    data.append(item)

In [94]:
data

[{'UserID': '101', 'UserName': 'Alice', 'Email': 'alice@example.com'},
 {'UserID': '102', 'UserName': 'Bob', 'Email': 'bob@example.com'},
 {'UserID': '103', 'UserName': 'Charlie', 'Email': 'charlie@example.com'}]

In [95]:
#read xml file
xmlread = pd.DataFrame(data)
xmlread

Unnamed: 0,UserID,UserName,Email
0,101,Alice,alice@example.com
1,102,Bob,bob@example.com
2,103,Charlie,charlie@example.com


Datatype Conversion

In [102]:
#datatype conversion for user profiles table
xmlread['UserID'] = xmlread['UserID'].astype('int32')
xmlread['UserName'] = xmlread['UserName'].astype('string')
xmlread['Email'] = xmlread['Email'].astype('string')

In [103]:
print(xmlread['UserID'].dtypes)
print(xmlread['UserName'].dtypes)
print(xmlread['Email'].dtypes)

int32
string
string


In [98]:
#datatype conversion for campaign details table
jsonread['CampaignID'] = jsonread['CampaignID'].astype('int32')
jsonread['CampaignName'] = jsonread['CampaignName'].astype('string')
jsonread['StartDate'] = jsonread['StartDate'].astype('datetime64[s]')
jsonread['EndDate'] = jsonread['EndDate'].astype('datetime64[s]')

In [99]:
print(jsonread['CampaignID'].dtypes)
print(jsonread['CampaignName'].dtypes)
print(jsonread['StartDate'].dtypes)
print(jsonread['EndDate'].dtypes)

int32
string
datetime64[s]
datetime64[s]


In [138]:
#datatype conversion for customer interactions table
csvread['InteractionID'] = csvread['InteractionID'].astype('int32')
csvread['UserID'] = csvread['UserID'].astype('int32')
csvread['CampaignID'] = csvread['CampaignID'].astype('int32')
csvread['InteractionType'] = csvread['InteractionType'].astype('string')
csvread['InteractionDate'] = csvread['InteractionDate'].astype('datetime64[s]')
csvread['TimeSpent'] = csvread['TimeSpent'].astype('int32')

In [139]:
print(csvread['InteractionID'].dtypes)
print(csvread['UserID'].dtypes)
print(csvread['CampaignID'].dtypes)
print(csvread['InteractionType'].dtypes)
print(csvread['InteractionDate'].dtypes)
print(csvread['TimeSpent'].dtypes)

int32
int32
int32
string
datetime64[s]
int32


Handling missing values

In [36]:
#handling null values
csvread.replace('nan.nan', '-',inplace=True)
jsonread.replace('nan.nan', '-',inplace=True)
xmlread.replace('nan.nan', '-',inplace=True)

Remove Duplicates

In [132]:
#removing duplicates based on unique identifiers
csvread.drop_duplicates(subset=['InteractionID'], inplace=True)
jsonread.drop_duplicates(subset=['CampaignID'], inplace=True)
xmlread.drop_duplicates(subset=['UserID'], inplace=True)

Load data to database

In [15]:
import pyodbc
from sqlalchemy import create_engine
import pandas as pd

In [14]:
SERVER='karie666.database.windows.net'
DATABASE='kariedb'
DRIVER='ODBC Driver 17 for SQL Server'
USERNAME='destiny'
PASSWORD='karie1234A2'
ODBC_PARAMETERS=f'Driver={DRIVER};Server=tcp:{SERVER},1433;Database={DATABASE};Uid={USERNAME};Pwd={PASSWORD};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;'

In [15]:
CONNECTION_STRING = f'mssql+pyodbc:///?odbc_connect={ODBC_PARAMETERS}'
engine = create_engine(CONNECTION_STRING,echo=True)
print("connection is ok")

connection is ok


In [141]:
csvread.to_sql('stg_customer_interactions', con=engine, index=False)

2024-05-30 15:10:39,782 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 15:10:39,788 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 15:10:39,789 INFO sqlalchemy.engine.Engine [cached since 1.437e+04s ago] ('BASE TABLE', 'VIEW', 'stg_customer_interactions', 'dbo')
2024-05-30 15:10:39,856 INFO sqlalchemy.engine.Engine 
CREATE TABLE stg_customer_interactions (
	[InteractionID] INTEGER NULL, 
	[UserID] INTEGER NULL, 
	[CampaignID] INTEGER NULL, 
	[InteractionType] VARCHAR(max) NULL, 
	[InteractionDate] DATETIME NULL, 
	[TimeSpent] INTEGER NULL
)


2024-05-30 15:10:39,859 INFO sqlalchemy.engine.Engine [no 

3

In [100]:
jsonread.to_sql('stg_campaign_details', con=engine, index=False)

2024-05-30 11:59:46,344 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 11:59:46,347 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 11:59:46,348 INFO sqlalchemy.engine.Engine [cached since 2916s ago] ('BASE TABLE', 'VIEW', 'stg_campaign_details', 'dbo')
2024-05-30 11:59:46,396 INFO sqlalchemy.engine.Engine 
CREATE TABLE stg_campaign_details (
	[CampaignID] INTEGER NULL, 
	[CampaignName] VARCHAR(max) NULL, 
	[StartDate] DATETIME NULL, 
	[EndDate] DATETIME NULL
)


2024-05-30 11:59:46,397 INFO sqlalchemy.engine.Engine [no key 0.00119s] ()
2024-05-30 11:59:46,437 INFO sqlalchemy.engine.Engine INSERT INTO

1

In [104]:
xmlread.to_sql('stg_user_profiles', con=engine, index=False)

2024-05-30 12:00:56,872 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 12:00:56,875 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 12:00:56,876 INFO sqlalchemy.engine.Engine [cached since 2987s ago] ('BASE TABLE', 'VIEW', 'stg_user_profiles', 'dbo')
2024-05-30 12:00:56,912 INFO sqlalchemy.engine.Engine 
CREATE TABLE stg_user_profiles (
	[UserID] INTEGER NULL, 
	[UserName] VARCHAR(max) NULL, 
	[Email] VARCHAR(max) NULL
)


2024-05-30 12:00:56,913 INFO sqlalchemy.engine.Engine [no key 0.00130s] ()
2024-05-30 12:00:56,933 INFO sqlalchemy.engine.Engine INSERT INTO stg_user_profiles ([UserID], [UserName],

3

In [143]:
query= "SELECT * FROM [dbo].[stg_customer_interactions]"
customer_interact=pd.read_sql(query,engine)

2024-05-30 15:12:40,467 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 15:12:40,468 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 15:12:40,469 INFO sqlalchemy.engine.Engine [cached since 1.449e+04s ago] ('BASE TABLE', 'VIEW', 'SELECT * FROM [dbo].[stg_customer_interactions]', 'dbo')
2024-05-30 15:12:40,505 INFO sqlalchemy.engine.Engine SELECT * FROM [dbo].[stg_customer_interactions]
2024-05-30 15:12:40,507 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-05-30 15:12:40,532 INFO sqlalchemy.engine.Engine ROLLBACK


In [144]:
customer_interact

Unnamed: 0,InteractionID,UserID,CampaignID,InteractionType,InteractionDate,TimeSpent
0,1,101,1,Click,2023-10-01,2
1,2,102,2,View,2023-10-01,14
2,3,103,1,Click,2023-10-02,8


In [118]:
query2= "SELECT * FROM [dbo].[stg_campaign_details]"
campaign=pd.read_sql(query2,engine)

2024-05-30 12:10:12,215 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 12:10:12,216 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 12:10:12,217 INFO sqlalchemy.engine.Engine [cached since 3542s ago] ('BASE TABLE', 'VIEW', 'SELECT * FROM [dbo].[stg_campaign_details]', 'dbo')
2024-05-30 12:10:12,250 INFO sqlalchemy.engine.Engine SELECT * FROM [dbo].[stg_campaign_details]
2024-05-30 12:10:12,251 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-05-30 12:10:12,268 INFO sqlalchemy.engine.Engine ROLLBACK


In [119]:
campaign

Unnamed: 0,CampaignID,CampaignName,StartDate,EndDate
0,1,Summer Sale,2023-09-15,2023-10-15


In [116]:
query3= "SELECT * FROM [dbo].[stg_user_profiles]"
user=pd.read_sql(query3,engine)

2024-05-30 12:10:02,324 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-05-30 12:10:02,325 INFO sqlalchemy.engine.Engine SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
2024-05-30 12:10:02,326 INFO sqlalchemy.engine.Engine [cached since 3532s ago] ('BASE TABLE', 'VIEW', 'SELECT * FROM [dbo].[stg_user_profiles]', 'dbo')
2024-05-30 12:10:02,362 INFO sqlalchemy.engine.Engine SELECT * FROM [dbo].[stg_user_profiles]
2024-05-30 12:10:02,363 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-05-30 12:10:02,379 INFO sqlalchemy.engine.Engine ROLLBACK


In [117]:
user

Unnamed: 0,UserID,UserName,Email
0,101,Alice,alice@example.com
1,102,Bob,bob@example.com
2,103,Charlie,charlie@example.com


Task 2: Data Transformation and Aggregation

In [131]:
#calculate the total number of interactions per campaign and store the results
total_interactions = customer_interact.groupby('CampaignID').agg(Total_Interactions_Per_Campaign=('InteractionID', 'count')).reset_index()
result = total_interactions
print(result)


   CampaignID  Total_Interactions_Per_Campaign
0           1                                2
1           2                                1


In [149]:
#calculate the average time spent on the website for each user and store the results
#no time available, hence i added a column name "TimeSpent" in this case
time_spent = customer_interact.groupby('UserID').agg(Average_Time_Spent=('TimeSpent', 'mean')).reset_index()
result2 = time_spent
print(result2)

   UserID  Average_Time_Spent
0     101                 2.0
1     102                14.0
2     103                 8.0


In [155]:
#aggregate customer data to create a summary table with key customer metrics
summary = user.merge(result2, on='UserID', how='left')
print(summary)

   UserID UserName                Email  Average_Time_Spent
0     101    Alice    alice@example.com                 2.0
1     102      Bob      bob@example.com                14.0
2     103  Charlie  charlie@example.com                 8.0
