Task 1

In [16]:
import random 

def generate_support_logs(n=200):
    
    categories = ['Technical', 'Billing', 'Account Access', 'Feature Request', 'API Integration', 'Security']
    cat_weights = [85, 55, 45, 25, 20, 20] # Total = 100%

    statuses = ['Resolved', 'In Progress', 'Closed', 'Escalated', 'Pending Vendor']
    stat_weights = [80, 60, 45, 30, 35]

    dataset = []

    for i in range(n):
        category = random.choices(categories, weights=cat_weights, k =1)[0]
        status = random.choices(statuses, weights=stat_weights, k = 1)[0]

        #Simulating dirty data with 12% of an issue

        if random.random()<0.12:
            resolution = random.choice([None, 'ERR_TIMEOUT', -99, " "])
        else:
            resolution = random.randint(2, 288)    

        dataset.append({
            'ticket_id' : f'TKT-{5000+i}',
            'category' : category,
            'statuses':statuses, 
            'resolution_minutes':resolution,
            "priority": random.choice(['Low', 'Medium', 'High', 'Urgent'])
        })    

    return dataset

raw_logs = generate_support_logs(250)
print(f"Generated {len(raw_logs)} logs.")
print("Sample Entry:", raw_logs[0])

Generated 250 logs.
Sample Entry: {'ticket_id': 'TKT-5000', 'category': 'Security', 'statuses': ['Resolved', 'In Progress', 'Closed', 'Escalated', 'Pending Vendor'], 'resolution_minutes': 68, 'priority': 'High'}


In [17]:
def validate_structure(data, required_keys):
    """Checks for missing fields in any record."""
    missing = [r['ticket_id'] for r in data if not all(k in r for k in required_keys)]
    return {
        "is_valid": len(missing) == 0,
        "count_missing": len(missing),
        "flagged_ids": missing
    }

def find_numeric_anomalies(data, field_name):
    """
    Identifies records where a specific field is not a positive number.
    Returns a list of the full records for inspection.
    """
    anomalies = []
    for record in data:
        val = record.get(field_name)
        # Check: Is it an integer? Is it positive?
        if not isinstance(val, int) or val < 0:
            anomalies.append(record)
    return anomalies

# Running Validation
fields = ["ticket_id", "category", "statuses", "resolution_minutes", "priority"]
structure_report = validate_structure(raw_logs, fields)
resolution_anomalies = find_numeric_anomalies(raw_logs, "resolution_minutes")

print(f"--- Data Health Report ---")
print(f"Records with missing fields: {structure_report['count_missing']}")
print(f"Records with invalid resolution times: {len(resolution_anomalies)}")

--- Data Health Report ---
Records with missing fields: 0
Records with invalid resolution times: 34


Task 3

In [18]:
def clean_and_normalize(data):
    # 1. Identify valid resolution times to calculate a replacement value (median)
    valid_times = sorted([r['resolution_minutes'] for r in data if isinstance(r['resolution_minutes'], int) and r['resolution_minutes'] > 0])
    median_res = valid_times[len(valid_times)//2] if valid_times else 60
    
    cleaned_data = []
    
    for record in data:
        
        clean_rec = record.copy()
       
        
        # Normalize Category strings (Trimming and standardizing case)
        clean_rec['category'] = clean_rec['category'].strip().title()
        
        # Repair resolution_minutes
        val = clean_rec.get('resolution_minutes')
        if not isinstance(val, int) or val <= 0:
            clean_rec['resolution_minutes'] = median_res
            clean_rec['was_repaired'] = True 
        else:
            clean_rec['was_repaired'] = False
            
        cleaned_data.append(clean_rec)
        
    return cleaned_data

cleaned_logs = clean_and_normalize(raw_logs)
print(f"Data Cleaned. Before: {len(raw_logs)} | After: {len(cleaned_logs)}")
print("Sample Cleaned Entry:", cleaned_logs[0])

Data Cleaned. Before: 250 | After: 250
Sample Cleaned Entry: {'ticket_id': 'TKT-5000', 'category': 'Security', 'statuses': ['Resolved', 'In Progress', 'Closed', 'Escalated', 'Pending Vendor'], 'resolution_minutes': 68, 'priority': 'High', 'was_repaired': False}


Task 4

In [22]:
def get_category_metrics(data):
    metrics = {}
    for r in data:
        cat = r['category']
        if cat not in metrics:
            metrics[cat] = {'total_time': 0, 'count': 0, 'escalations': 0}
        
        metrics[cat]['total_time'] += r['resolution_minutes']
        metrics[cat]['count'] += 1
        if r['statuses'] == 'Escalated':
            metrics[cat]['escalations'] += 1
    
    summary = {}
    for cat, vals in metrics.items():
        summary[cat] = {
            "avg_res_min": round(vals['total_time'] / vals['count'], 2),
            "escalation_rate": round(vals['escalations'] / vals['count'], 4),
            "volume": vals['count']
        }
    
    
    assert sum(s['volume'] for s in summary.values()) == len(data)
    return summary

def get_priority_distribution(data):
    dist = {}
    for r in data:
        p = r['priority']
        dist[p] = dist.get(p, 0) + 1
    return dist

cat_summary = get_category_metrics(cleaned_logs)
priority_dist = get_priority_distribution(cleaned_logs)
print(cat_summary, priority_dist)

{'Security': {'avg_res_min': 129.75, 'escalation_rate': 0.0, 'volume': 20}, 'Feature Request': {'avg_res_min': 118.85, 'escalation_rate': 0.0, 'volume': 20}, 'Billing': {'avg_res_min': 137.6, 'escalation_rate': 0.0, 'volume': 57}, 'Account Access': {'avg_res_min': 144.56, 'escalation_rate': 0.0, 'volume': 43}, 'Technical': {'avg_res_min': 146.46, 'escalation_rate': 0.0, 'volume': 92}, 'Api Integration': {'avg_res_min': 136.5, 'escalation_rate': 0.0, 'volume': 18}} {'High': 57, 'Low': 67, 'Medium': 63, 'Urgent': 63}


Task 5

In [24]:

def generate_final_report(data, cat_stats, prio_stats):
    total_tickets = len(data)
    total_escalated = sum(1 for r in data if r['statuses'] == 'Escalated')
    
    report = {
        "summary_metadata": {
            "total_processed": total_tickets,
            "overall_escalation_rate": f"{round((total_escalated / total_tickets) * 100, 2)}%"
        },
        "by_category": cat_stats,
        "priority_breakdown": prio_stats
    }
    return report

final_report = generate_final_report(cleaned_logs, cat_summary, priority_dist)

# Displaying a compact version of the report
import json
print("\n--- FINAL SUPPORT LOG REPORT ---")
print(json.dumps(final_report, indent=2))


--- FINAL SUPPORT LOG REPORT ---
{
  "summary_metadata": {
    "total_processed": 250,
    "overall_escalation_rate": "0.0%"
  },
  "by_category": {
    "Security": {
      "avg_res_min": 129.75,
      "escalation_rate": 0.0,
      "volume": 20
    },
    "Feature Request": {
      "avg_res_min": 118.85,
      "escalation_rate": 0.0,
      "volume": 20
    },
    "Billing": {
      "avg_res_min": 137.6,
      "escalation_rate": 0.0,
      "volume": 57
    },
    "Account Access": {
      "avg_res_min": 144.56,
      "escalation_rate": 0.0,
      "volume": 43
    },
    "Technical": {
      "avg_res_min": 146.46,
      "escalation_rate": 0.0,
      "volume": 92
    },
    "Api Integration": {
      "avg_res_min": 136.5,
      "escalation_rate": 0.0,
      "volume": 18
    }
  },
  "priority_breakdown": {
    "High": 57,
    "Low": 67,
    "Medium": 63,
    "Urgent": 63
  }
}
