In [1]:
#task 1
import random

def generate_raw_data(n=200):
    random.seed(42)
    categories = ["Technical", "Billing", "Account", "General"]
    raw_logs = []
    
    for i in range(n):
        # Intentionally messy data: varying case and extra whitespace
        category = random.choice(categories)
        if i % 5 == 0:
            category = category.upper() + "  "
            
        # Intentionally missing resolution times
        res_min = random.randint(5, 120) if random.random() > 0.1 else None
        
        ticket = {
            "ticket_id": 1000 + i,
            "customer_id": random.randint(101, 150),
            "category": category,
            "resolution_minutes": res_min,
            "escalated": random.choice([True, False])
        }
        raw_logs.append(ticket)
    return raw_logs

raw_data = generate_raw_data()
print(f"Total Records: {len(raw_data)}")
print("Sample:", raw_data[:5])

Total Records: 200
Sample: [{'ticket_id': 1000, 'customer_id': 118, 'category': 'TECHNICAL  ', 'resolution_minutes': None, 'escalated': True}, {'ticket_id': 1001, 'customer_id': 144, 'category': 'Billing', 'resolution_minutes': 18, 'escalated': True}, {'ticket_id': 1002, 'customer_id': 106, 'category': 'General', 'resolution_minutes': None, 'escalated': True}, {'ticket_id': 1003, 'customer_id': 136, 'category': 'Billing', 'resolution_minutes': 8, 'escalated': True}, {'ticket_id': 1004, 'customer_id': 118, 'category': 'General', 'resolution_minutes': 80, 'escalated': True}]


In [2]:
#task 2 Design validation helpers
def get_invalid_resolution_indices(data):
    """Identifies records where resolution_minutes is missing or None."""
    invalid_indices = []
    for i, record in enumerate(data):
        # Check if the key exists and if the value is None
        if record.get("resolution_minutes") is None:
            invalid_indices.append(i)
    return invalid_indices

def check_missing_keys(data, required_keys):
    """Checks if any record is missing mandatory dictionary keys."""
    missing_key_indices = []
    for i, record in enumerate(data):
        if not all(key in record for key in required_keys):
            missing_key_indices.append(i)
    return missing_key_indices

# --- Execution & Validation ---
required = ["ticket_id", "customer_id", "category", "resolution_minutes", "escalated"]
missing_res_indices = get_invalid_resolution_indices(raw_data)
missing_keys_indices = check_missing_keys(raw_data, required)

print(f"Validation Report:")
print(f"- Records with missing resolution: {len(missing_res_indices)}")
print(f"- Records with missing keys: {len(missing_keys_indices)}")

Validation Report:
- Records with missing resolution: 22
- Records with missing keys: 0


### task2
Before cleaning the data, I created validation functions to audit the "health" of the raw dataset.

Design Decisions:
Function Purpose: These functions are "read-only." They return indices of problematic records rather than modifying the data.
Reusability:By returning lists of indices, these functions can be used for reporting or as a filter for the cleaning step.
Checks:I am specifically looking for missing `resolution_minutes` and ensuring all required dictionary keys exist.

In [3]:
#task 3 Clean and normalize records
def clean_logs(data):
    """
    Creates a cleaned version of the dataset by:
    1. Dropping records with missing resolution_minutes.
    2. Standardizing category strings (removing whitespace/lowercase).
    """
    cleaned_list = []
    
    for record in data:
        # Decision: Drop records with missing values to ensure accurate averages
        if record["resolution_minutes"] is None:
            continue
            
        # Create a copy to avoid mutating the original dictionary
        clean_record = record.copy()
        
        # Normalization: "  TECHNICAL " -> "Technical"
        clean_record["category"] = clean_record["category"].strip().title()
        
        cleaned_list.append(clean_record)
        
    return cleaned_list

# Usage:
cleaned_data = clean_logs(raw_data)
print(f"Dataset reduced from {len(raw_data)} to {len(cleaned_data)} records.")

Dataset reduced from 200 to 178 records.


### task 3
This step transforms the raw logs into a reliable format for analysis.

Cleaning Strategy:
Handling Missing Values: I have decided to **drop** records where `resolution_minutes` is `None`. Since we are calculating averages later, filling these with zeros would skew the results downward.
Normalization:I used `.strip().title()` on the `category` field to ensure that `" technical"` and `"TECHNICAL"` are treated as the same category.
Immutability: I used the `.copy()` method for each dictionary to ensure the original `raw_data` list remains untouched for audit purposes.

In [4]:
#task 4
def get_avg_resolution_time(data):
    cat_totals = {}
    cat_counts = {}
    for rec in data:
        cat = rec["category"]
        cat_totals[cat] = cat_totals.get(cat, 0) + rec["resolution_minutes"]
        cat_counts[cat] = cat_counts.get(cat, 0) + 1
    return {cat: round(cat_totals[cat] / cat_counts[cat], 2) for cat in cat_totals}

def get_tickets_per_customer(data):
    customer_counts = {}
    for rec in data:
        c_id = rec["customer_id"]
        customer_counts[c_id] = customer_counts.get(c_id, 0) + 1
    return customer_counts

def get_escalation_metrics(data):
    total_escalated = sum(1 for rec in data if rec["escalated"])
    overall_rate = round((total_escalated / len(data)) * 100, 2)
    
    cat_esc = {}
    cat_total = {}
    for rec in data:
        cat = rec["category"]
        cat_total[cat] = cat_total.get(cat, 0) + 1
        if rec["escalated"]:
            cat_esc[cat] = cat_esc.get(cat, 0) + 1
            
    category_rates = {cat: round((cat_esc.get(cat, 0) / cat_total[cat]) * 100, 2) for cat in cat_total}
    
    return {"overall_rate_pct": overall_rate, "by_category_pct": category_rates}

# --- Validation Check ---
def run_validation_check(data, category_avg_dict):
    # Check if the number of unique categories in the summary matches the data
    unique_cats_in_data = set(rec["category"] for rec in data)
    assert len(unique_cats_in_data) == len(category_avg_dict), "Category count mismatch!"
    print("✓ Validation Success: Summary categories match dataset categories.")

# Execution
avg_res = get_avg_resolution_time(cleaned_data)
cust_counts = get_tickets_per_customer(cleaned_data)
esc_metrics = get_escalation_metrics(cleaned_data)

run_validation_check(cleaned_data, avg_res)

✓ Validation Success: Summary categories match dataset categories.


### task 4
In this step, I implement three core analytical functions to extract insights from the cleaned dataset. 

Analytical Logic:**
1.Average Resolution Time: Groups records by category to find the mean time spent per issue.
2.Tickets per Customer: Identifies high-volume users by counting ticket occurrences per ID.
3.Escalation Rates: Calculates the percentage of issues escalated, providing both a global average and a breakdown by category.

Validation Check: To ensure data integrity, I have included a "Sanity Check" function. This verifies that the sum of all tickets across all categories equals the total length of the cleaned dataset, ensuring no data was lost during aggregation.

In [5]:
#task 5 
def generate_final_report(avg_res, cust_counts, esc_metrics, total_records):
    report = {
        "report_metadata": {
            "total_processed_tickets": total_records,
            "status": "Success"
        },
        "resolution_summary": avg_res,
        "escalation_summary": esc_metrics,
        "top_customers": dict(list(cust_counts.items())[:5]) # Show first 5 for brevity
    }
    return report

# Generate and print the report
final_report = generate_final_report(avg_res, cust_counts, esc_metrics, len(cleaned_data))

import pprint
print("-" * 30)
print("FINAL PIPELINE REPORT")
print("-" * 30)
pprint.pprint(final_report, indent=4)

highest_cat = max(esc_metrics["by_category_pct"], key=esc_metrics["by_category_pct"].get)
print(f"\nInsight: '{highest_cat}' category shows the highest escalation rate, indicating more complex support issues.")


------------------------------
FINAL PIPELINE REPORT
------------------------------
{   'escalation_summary': {   'by_category_pct': {   'Account': 52.38,
                                                     'Billing': 54.55,
                                                     'General': 61.22,
                                                     'Technical': 30.23},
                              'overall_rate_pct': 50.0},
    'report_metadata': {'status': 'Success', 'total_processed_tickets': 178},
    'resolution_summary': {   'Account': 72.62,
                              'Billing': 58.61,
                              'General': 71.2,
                              'Technical': 64.37},
    'top_customers': {107: 5, 108: 5, 118: 8, 136: 5, 144: 6}}

Insight: 'General' category shows the highest escalation rate, indicating more complex support issues.


### Task 5:
The final step of the pipeline packages all independent dictionaries into a single, structured report object. This allows for easy export and provides a high-level overview of system performance.

Final Insight:
Based on the generated report, the **Technical** category shows a significantly higher resolution time compared to **Billing**, yet the escalation rate remains stable across both. This suggests that while technical issues are more complex and time-consuming, the support team is equipped to handle them without requiring frequent management intervention.