Libraries

In [1]:
import pandas as pd
import logging #for logs
import sys #to aid logging

Logging setup

In [2]:
# Logging setup for debugging and progress tracking.
# Records at INFO and above go to stdout so they appear in the cell output;
# force=True replaces any handlers already on the root logger, so re-running
# this cell in the same kernel reconfigures logging instead of doing nothing.
logging.basicConfig(
    force=True,
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

File configuration

In [3]:
# Configuration: every file the ETL pipeline reads or writes.

# --- outputs ---
C360_FILE = "customer_360.parquet"               # consolidated Customer 360 table
REJECTED_LOG_FILE = "rejected_transactions.log"  # pipe-delimited rejected transactions

# --- inputs ---
LEADS_FILE = "crm_leads.csv"             # CRM leads, read with pd.read_csv defaults
TRANSACTIONS_FILE = "transactions.txt"   # pipe-delimited ('|') transactions
WEB_ACTIVITY_FILE = "web_activity.json"  # JSON Lines web activity

# Data Engineering

## 1. Extracting data files


In [4]:
# 1.extracting data files

def extract_data(leads_file=None, transactions_file=None, web_activity_file=None):
    """Extract all three raw data sources into DataFrames.

    Each path argument falls back to the matching module-level constant
    (LEADS_FILE, TRANSACTIONS_FILE, WEB_ACTIVITY_FILE) when left as None, so
    the existing zero-argument call keeps working while other inputs (e.g.
    test fixtures) can be supplied explicitly.

    Returns:
        (leads, transactions, web_activity) as read by pd.read_csv,
        pd.read_csv with sep='|', and pd.read_json with lines=True.
    """
    leads_file = LEADS_FILE if leads_file is None else leads_file
    transactions_file = TRANSACTIONS_FILE if transactions_file is None else transactions_file
    web_activity_file = WEB_ACTIVITY_FILE if web_activity_file is None else web_activity_file

    logging.info("starting the data extraction")

    leads = pd.read_csv(leads_file)
    transactions = pd.read_csv(transactions_file, sep='|')          # pipe-delimited
    web_activity = pd.read_json(web_activity_file, lines=True)      # one JSON object per line

    logging.info(
        "extracted %d leads, %d transactions, %d web activity rows",
        len(leads), len(transactions), len(web_activity),
    )
    return leads, transactions, web_activity

## 2. Transformations (cleaning and deduplication)

In [5]:
# 2. Transformations   (cleaning and deduplications)

def clean_leads(df):   #crm_leads.csv
    """Clean and deduplicate CRM leads.

    - Trims and title-cases ``full_name``.
    - Lower-cases and trims ``email`` so case/whitespace variants share a key.
    - Parses ``creation_date`` with ``pd.to_datetime`` (raises on bad values).
    - Keeps only the most recent record per email. Rows with a missing or
      empty email are never considered duplicates of each other and are all
      kept (plain ``drop_duplicates`` would collapse them into one lead).

    Args:
        df: raw leads; must contain ``lead_id``, ``full_name``, ``email`` and
            ``creation_date``.

    Returns:
        A new DataFrame sorted by ``lead_id`` with a fresh RangeIndex; the
        input frame is not modified.
    """
    df = df.copy()
    df['full_name'] = df['full_name'].str.strip().str.title()   # formatting case in names
    df['email'] = df['email'].str.lower().str.strip()           # normalised dedup key
    df['creation_date'] = pd.to_datetime(df['creation_date'])
    rows_in = len(df)

    # Newest first. A stable sort keeps input order among equal dates, so the
    # record that survives deduplication is deterministic.
    df = df.sort_values('creation_date', ascending=False, kind='mergesort')

    # Only rows with a real email take part in deduplication.
    has_email = df['email'].notna() & (df['email'] != '')
    is_older_duplicate = has_email & df.duplicated(subset='email', keep='first')
    df = df[~is_older_duplicate]

    df = df.sort_values('lead_id', kind='mergesort').reset_index(drop=True)

    logging.info(f"cleaned leads: {len(df)} kept, {rows_in - len(df)} duplicates dropped")
    return df


def _transaction_rejection_reason(amount, uuid_missing):
    """Return the human-readable reason a transaction row was rejected.

    Amount problems are reported first, so the messages for bad amounts are
    the same as before; a row reaching the last branch was rejected only
    because it has no user_uuid.
    """
    if pd.isna(amount):
        return 'Invalid or missing amount'
    if amount <= 0:
        return f'Non-positive amount ({amount})'
    return 'Missing user_uuid'


def clean_transactions(df):   #transactions.txt
    """Clean and validate transactions.

    ``amount`` is coerced to numeric (unparseable strings become NaN). A row
    is valid only when its amount is a positive number AND it has a
    non-blank ``user_uuid``; rows without a user_uuid would otherwise be
    dropped silently by the ``groupby('user_uuid')`` aggregation in
    ``aggregate_transactions`` instead of being recorded as rejected.

    Args:
        df: raw transactions; must contain ``amount`` and ``user_uuid``.

    Returns:
        (valid, rejected): two new DataFrames. ``rejected`` carries an extra
        ``reason`` column explaining each rejection.
    """
    logging.info("cleaning transactions data")

    df = df.copy()
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

    uuid = df['user_uuid']
    uuid_missing = uuid.isna() | (uuid.astype(str).str.strip() == '')
    # NaN > 0 evaluates to False, so missing/invalid amounts fail this check too.
    valid_mask = (df['amount'] > 0) & ~uuid_missing

    valid = df[valid_mask].copy()
    rejected = df[~valid_mask].copy()
    rejected['reason'] = [
        _transaction_rejection_reason(amount, missing)
        for amount, missing in zip(rejected['amount'], uuid_missing[~valid_mask])
    ]

    logging.info(f"valid transactions: {len(valid)}, rejected: {len(rejected)}")
    return valid, rejected


def clean_web_activity(df):   #web_activity.json
    """Clean and validate web activity.

    Drops rows whose ``user_uuid`` is null, empty or whitespace-only, coerces
    ``page_view_count`` to numeric (unparseable values become NaN) and parses
    ``last_seen_ts`` with ``pd.to_datetime``.

    Returns:
        A new filtered DataFrame; the input frame is not modified.
    """
    logging.info("cleaning web activity data")

    uuid = df['user_uuid']
    has_uuid = uuid.notna() & (uuid.astype(str).str.strip() != '')
    # .copy() detaches the filtered frame so the column assignments below do
    # not operate on a slice of the input (SettingWithCopyWarning).
    cleaned = df[has_uuid].copy()
    dropped = len(df) - len(cleaned)

    cleaned['page_view_count'] = pd.to_numeric(cleaned['page_view_count'], errors='coerce')
    cleaned['last_seen_ts'] = pd.to_datetime(cleaned['last_seen_ts'])

    # Report the actual number of dropped rows, not a hard-coded figure.
    logging.info(f"cleaned web activity, rejected: {dropped} due to null user uuids")
    return cleaned


def create_email_uuid_mapping(leads_df, transactions_df, web_activity_df):
    """Build a PLACEHOLDER email -> user_uuid mapping by position.

    No identity map was provided to link crm_leads.csv with either
    transactions.txt or web_activity.json. To still demonstrate a complete
    join, the i-th lead email is paired with the i-th distinct user_uuid
    (transactions first, then web activity, in order of first appearance).
    The pairing is arbitrary and must not be treated as real identity
    resolution.

    Null or blank user_uuids are skipped so no email is paired with a
    missing id.

    Returns:
        DataFrame with columns ``email`` and ``user_uuid`` and
        ``min(len(leads_df), number of distinct uuids)`` rows.
    """
    logging.info("simple mapping as a place holder")

    candidate_uuids = pd.concat(
        [transactions_df['user_uuid'], web_activity_df['user_uuid']],
        ignore_index=True,
    )
    non_blank = candidate_uuids.notna() & (candidate_uuids.astype(str).str.strip() != '')
    # dict.fromkeys de-duplicates while preserving first-appearance order.
    ordered_uuids = list(dict.fromkeys(candidate_uuids[non_blank]))

    emails = leads_df['email'].tolist()
    mapping_size = min(len(emails), len(ordered_uuids))

    return pd.DataFrame({
        'email': emails[:mapping_size],
        'user_uuid': ordered_uuids[:mapping_size],
    })


def aggregate_transactions(df): #aggregating of transactions.txt
    """Aggregate transactions per user with a per-status breakdown.

    Status labels are trimmed and lower-cased before grouping, so variants
    such as 'Completed' and 'COMPLETED' feed the same ``completed_*`` columns
    instead of producing duplicate or split column names.

    Args:
        df: transactions with ``user_uuid``, ``status``, ``amount`` and
            ``transaction_id`` columns.

    Returns:
        One row per ``user_uuid`` with ``total_sales``, ``total_transactions``,
        ``transaction_ids`` (comma-joined, in input order), followed by
        ``<status>_transactions`` counts and ``<status>_amount`` sums.
    """
    keyed = df.assign(status=df['status'].str.strip().str.lower())
    by_user_status = keyed.groupby(['user_uuid', 'status'])

    status_counts = by_user_status.size().unstack(fill_value=0)
    status_counts.columns = [f'{status}_transactions' for status in status_counts.columns]

    status_amounts = by_user_status['amount'].sum().unstack(fill_value=0)
    status_amounts.columns = [f'{status}_amount' for status in status_amounts.columns]

    overall = keyed.groupby('user_uuid').agg(
        total_sales=('amount', 'sum'),
        total_transactions=('transaction_id', 'count'),
        # dropna + astype(str) so missing or non-string ids don't break the join.
        transaction_ids=('transaction_id', lambda ids: ','.join(ids.dropna().astype(str))),
    )

    return overall.join(status_counts).join(status_amounts).reset_index()


def aggregate_web_activity(df):  #aggregating of web_activity.json
    """Summarise web activity per user.

    Returns one row per ``user_uuid`` with the summed ``page_view_count``
    (``total_page_views``) and the latest ``last_seen_ts``
    (``last_activity``), with ``user_uuid`` restored as a regular column.
    """
    grouped = df.groupby('user_uuid')
    per_user = grouped.agg(
        total_page_views=pd.NamedAgg(column='page_view_count', aggfunc='sum'),
        last_activity=pd.NamedAgg(column='last_seen_ts', aggfunc='max'),
    )
    return per_user.reset_index()


def build_customer_360(leads_df, trans_summary, web_summary, mapping_df):
    """Join leads, transaction and web summaries into one Customer 360 table.

    Outer merges are used at every step so no lead, email or user_uuid is
    lost. Gaps introduced by those merges are filled with 0 in the numeric
    summary columns, and count columns are cast to int. The per-status
    columns (``<status>_transactions`` / ``<status>_amount``) are discovered
    from the merged frame rather than hard-coded, so statuses other than
    completed/pending get the same filling and typing.

    Returns:
        DataFrame sorted by ``lead_id`` (missing lead_ids last) with
        ``transaction_ids`` moved directly after ``lead_id`` when present.
    """
    logging.info("building customer 360")

    c360 = mapping_df.merge(leads_df, on='email', how='outer')
    c360 = c360.merge(trans_summary, on='user_uuid', how='outer')
    c360 = c360.merge(web_summary, on='user_uuid', how='outer')

    names = [c for c in c360.columns if isinstance(c, str)]
    status_count_cols = [c for c in names if c.endswith('_transactions') and c != 'total_transactions']
    status_amount_cols = [c for c in names if c.endswith('_amount')]

    count_cols = [c for c in ['total_transactions', 'total_page_views', *status_count_cols]
                  if c in c360.columns]
    amount_cols = [c for c in ['total_sales', *status_amount_cols] if c in c360.columns]

    # Rows present on only one side of an outer merge carry NaN in these columns.
    for col in count_cols + amount_cols:
        c360[col] = c360[col].fillna(0)
    for col in count_cols:
        c360[col] = c360[col].astype(int)

    c360 = c360.sort_values('lead_id', kind='mergesort').reset_index(drop=True)

    if 'transaction_ids' in c360.columns:
        ordered = [c for c in c360.columns if c != 'transaction_ids']
        ordered.insert(ordered.index('lead_id') + 1, 'transaction_ids')
        c360 = c360[ordered]

    return c360

## 3. Loading outputs

In [6]:
# 3. Loading all

def save_customer_360(df):
    """Write the Customer 360 table to C360_FILE as Parquet (pyarrow engine, no index)."""
    destination = C360_FILE
    logging.info(f"Saving Customer 360 to {destination}")
    df.to_parquet(destination, engine='pyarrow', index=False)
    logging.info("Customer 360 saved successfully")


def save_rejected_log(df, path=None):
    """Write rejected transactions to a pipe-delimited log file.

    The file is always (re)written with its header, even when there are no
    rejections. Returning early would leave a log from a previous run in
    place, still listing rejections that no longer apply.

    Args:
        df: rejected transactions with ``transaction_id``, ``status``,
            ``amount`` and ``reason`` columns (may be empty).
        path: output file; defaults to REJECTED_LOG_FILE when None.
    """
    target = REJECTED_LOG_FILE if path is None else path
    with open(target, 'w', encoding='utf-8') as f:
        f.write("transaction_id|status|amount|reason\n")
        for _, row in df.iterrows():
            f.write(f"{row['transaction_id']}|{row['status']}|{row['amount']}|{row['reason']}\n")

    logging.info("Rejected transactions log saved")


## Main

In [7]:
#Main

def main():
    """Run the entire ETL pipeline: extract, transform, then load the outputs.

    Any exception is logged together with its traceback and re-raised, so
    the failure stays visible to the caller (or the notebook cell).
    """
    logging.info("starting ETL Pipeline")

    try:
        # 1. Extract
        leads, transactions, web_activity = extract_data()

        # 2. Transform: clean each source independently
        leads_clean = clean_leads(leads)
        trans_valid, trans_rejected = clean_transactions(transactions)
        web_clean = clean_web_activity(web_activity)

        # Positional placeholder mapping (no real email <-> user_uuid identity map exists)
        mapping = create_email_uuid_mapping(leads_clean, trans_valid, web_clean)

        # Aggregate per user
        trans_summary = aggregate_transactions(trans_valid)
        web_summary = aggregate_web_activity(web_clean)

        # Build the consolidated Customer 360 table
        customer_360 = build_customer_360(
            leads_clean,
            trans_summary,
            web_summary,
            mapping
        )

        # 3. Load
        save_customer_360(customer_360)
        save_rejected_log(trans_rejected)

        logging.info("ETL pipelie completed successfully")

    except Exception as e:
        # logging.exception also records the traceback, which logging.error dropped.
        logging.exception(f"Pipeline failed: {e}")
        raise


if __name__ == "__main__":
    main()

2025-11-11 20:21:06,722 - INFO - starting ETL Pipeline
2025-11-11 20:21:06,724 - INFO - starting the data extraction
2025-11-11 20:21:06,782 - INFO - cleaned leads
2025-11-11 20:21:06,783 - INFO - cleaning transactions data
2025-11-11 20:21:06,792 - INFO - valid transactions: 6, rejected: 2
2025-11-11 20:21:06,794 - INFO - cleaning web activity data
2025-11-11 20:21:06,802 - INFO - cleaned web activity, rejeted: 2 due to null user uuids
2025-11-11 20:21:06,803 - INFO - simple mapping as a place holder
2025-11-11 20:21:06,886 - INFO - building customer 360
2025-11-11 20:21:06,931 - INFO - Saving Customer 360 to customer_360.parquet
2025-11-11 20:21:07,081 - INFO - Customer 360 saved successfully
2025-11-11 20:21:07,084 - INFO - Rejected transactions log saved
2025-11-11 20:21:07,085 - INFO - ETL pipelie completed successfully


# Output: Customer 360

In [8]:
# Read back the Customer 360 parquet written by the pipeline.
# Uses the configured C360_FILE instead of a Colab-only absolute path
# ("/content/..."), so the cell works wherever the pipeline was run.
# NOTE: the table contains names and emails (PII) - clear outputs before sharing.
customer360 = pd.read_parquet(C360_FILE)
customer360

                       email                             user_uuid lead_id  \
0         jane.doe@other.com  e4f5d3a1-8b0c-47e2-a1f3-d9c4e0b5a6f7  L-1002   
1         bob.brown@test.net  d2c3b4a5-6e7f-48d9-a2e1-c0b9d8a7e6f5  L-1004   
2     john.smith@example.com  99999999-9999-9999-9999-999999999999  L-1005   
3  sally.williams@global.org  a0b1c2d3-4e5f-6a7b-8c9d-e0f1a2b3c4d5  L-1006   
4        mike.davis@corp.com  f8e9d0c1-2b3a-4e5f-b6c7-d8e9f0a1b2c3  L-1007   
5     alice.johnson@corp.com  11111111-2222-3333-4444-555555555555  L-1008   
6     frank.white@partner.io  9f8e7d6c-5b4a-3e2d-1c0b-9a8f7e6d5c4b  L-1009   
7    elena.rodriguez@int.com                                  None  L-1010   

  transaction_ids        full_name creation_date  total_sales  \
0         TX-4001         Jane Doe    2024-09-16       145.50   
1         TX-4002        Bob Brown    2024-09-18        32.99   
2         TX-4004       John Smith    2024-09-19       500.00   
3         TX-4005   Sally Williams   

In [9]:
# Show the rejected-transactions log written by the pipeline.
# Guarded because the log file may not exist (e.g. when no rejection log was
# written), in which case open() would raise FileNotFoundError.
try:
    with open(REJECTED_LOG_FILE, 'r', encoding='utf-8') as f:
        print(f.read())
except FileNotFoundError:
    print(f"No rejection log found at {REJECTED_LOG_FILE}")

transaction_id|status|amount|reason
TX-4003|Refunded|-10.0|Non-positive amount (-10.0)
TX-4006|Cancelled|-5.0|Non-positive amount (-5.0)

