# DATA SCIENCE ASSIGNMENT â€“ SMART CITY

# TRAFFIC & SAFETY ANALYTICS

In [17]:
traffic_logs = [
    "id:501,zone:A1,vehicle:Car,speed:62,time:07:30,violations:[None],status:Smooth",
    "id:502,zone:A1,vehicle:Bike,speed:85,time:09:10,violations:[Helmet],status:Busy",
    "id:503,zone:B2,vehicle:Bus,speed:45,time:17:25,violations:[None],status:Smooth",
    "id:504,zone:C3,vehicle:Car,speed:110,time:14:15,violations:[Overspeed],status:Congested",
    "id:505,zone:A1,vehicle:Truck,speed:40,time:18:50,violations:[None],status:Smooth"
]


Q1: Convert each traffic log into structured Python data with validation.

In [18]:
def structured(log):
    fields = log.split(',')
    entry = {}
    
    for field in fields:
        key, value = field.split(':', 1)

        if key == "violations":
            value = value.strip("[]")
            value = None if value == "None" else value
        elif key == "speed":
            value = int(value)
        else:
            value = value

        entry[key] = value
    return entry


structured_data = [structured(log) for log in traffic_logs]

for d in structured_data:
    print(d)


{'id': '501', 'zone': 'A1', 'vehicle': 'Car', 'speed': 62, 'time': '07:30', 'violations': None, 'status': 'Smooth'}
{'id': '502', 'zone': 'A1', 'vehicle': 'Bike', 'speed': 85, 'time': '09:10', 'violations': 'Helmet', 'status': 'Busy'}
{'id': '503', 'zone': 'B2', 'vehicle': 'Bus', 'speed': 45, 'time': '17:25', 'violations': None, 'status': 'Smooth'}
{'id': '504', 'zone': 'C3', 'vehicle': 'Car', 'speed': 110, 'time': '14:15', 'violations': 'Overspeed', 'status': 'Congested'}
{'id': '505', 'zone': 'A1', 'vehicle': 'Truck', 'speed': 40, 'time': '18:50', 'violations': None, 'status': 'Smooth'}


Q2: Calculate average speed per zone.

In [19]:
def avg_speed_per_zone(data):
    zones = {}
    for d in data:
        z = d["zone"]
        
        if z not in zones:
            zones[z] = []
        zones[z].append(d["speed"])
    
    result = {}
    for z, v in zones.items():
        result[z] = sum(v) / len(v)
    return result

print(avg_speed_per_zone(structured_data))

{'A1': 62.333333333333336, 'B2': 45.0, 'C3': 110.0}


Q3: Determine peak hour (hour with highest traffic entries).

In [26]:
def peak_hour(data):
    freq = {}
    for d in data:
        hr = int(d['time'].split(':')[0])
        freq[hr] = freq.get(hr, 0) + 1
    return max(freq, key=freq.get)

print("Peak traffic hour:", peak_hour(structured_data))

Peak traffic hour: 7


Q4: Find vehicles with speed > 80 km/h.

In [28]:
def speed_over_80(data):
    veh = []
    for d in data:
        if d["speed"] > 80:
           veh = d["vehicle"] 

    return veh
print(speed_over_80(structured_data))

Car


Q5: Count occurrences of each violation type.

In [43]:
def occurance(data):
    counts = {}
    for d in data:
        v = d["violations"]
        if v is not None:
            counts[v] = counts.get(v, 0)+1
    return counts
        
print(occurance(structured_data))             

{'Helmet': 1, 'Overspeed': 1}


Q6: Compute safety index for each zone.

In [28]:
def safety_index(data):
    zones = {}

    for d in data:
        score = 100

        if d["violations"] == "Overspeed":
            score -= 10
        if d["violations"] == "Helmet":
            score -= 5
        if d["status"] == "Busy":
            score -= 5
        if d["status"] == "Congested":
            score -= 15

        z = d["zone"]
        if z not in zones:
            zones[z] = []
        zones[z].append(score)

    result = {}
    for z, scores in zones.items():
        result[z] = sum(scores) / len(scores)
    return result

print(safety_index(structured_data))


{'A1': 96.66666666666667, 'B2': 100.0, 'C3': 75.0}


Q7: Create a summary for each vehicle category.

In [30]:
def vehicle_summary(data):
    summary = {}
    for d in data:
        v = d["vehicle"]
        if v not in summary:
            summary[v] = {"count": 0, "avg_speed": 0, "violations": []}
        
        summary[v]["count"] += 1
        summary[v]["avg_speed"] += d["speed"]
        
        if d["violations"] is not None:
            summary[v]["violations"].append(d["violations"])

    # finalize avg speed
    for v in summary:
        summary[v]["avg_speed"] /= summary[v]["count"]
    
    return summary

print(vehicle_summary(structured_data))


{'Car': {'count': 2, 'avg_speed': 86.0, 'violations': ['Overspeed']}, 'Bike': {'count': 1, 'avg_speed': 85.0, 'violations': ['Helmet']}, 'Bus': {'count': 1, 'avg_speed': 45.0, 'violations': []}, 'Truck': {'count': 1, 'avg_speed': 40.0, 'violations': []}}


Q8: Identify high-congestion zones.

In [32]:
def congested_zones(data):
    return list({d["zone"] for d in data if d["status"] == "Congested"})

print(congested_zones(structured_data))


['C3']


Q9: Classify each log into time windows (Morning, Afternoon, Evening, Night).

In [34]:
def time_window(data):
    for d in data:
        hr = extract_hour(d["time"])
        if 6 <= hr <= 11:
            d["time_window"] = "Morning"
        elif 12 <= hr <= 16:
            d["time_window"] = "Afternoon"
        elif 17 <= hr <= 20:
            d["time_window"] = "Evening"
        else:
            d["time_window"] = "Night"
    return data

structured_data = time_window(structured_data)
for d in structured_data:
    print(d)


{'id': '501', 'zone': 'A1', 'vehicle': 'Car', 'speed': 62, 'time': '07:30', 'violations': None, 'status': 'Smooth', 'time_window': 'Morning'}
{'id': '502', 'zone': 'A1', 'vehicle': 'Bike', 'speed': 85, 'time': '09:10', 'violations': 'Helmet', 'status': 'Busy', 'time_window': 'Morning'}
{'id': '503', 'zone': 'B2', 'vehicle': 'Bus', 'speed': 45, 'time': '17:25', 'violations': None, 'status': 'Smooth', 'time_window': 'Evening'}
{'id': '504', 'zone': 'C3', 'vehicle': 'Car', 'speed': 110, 'time': '14:15', 'violations': 'Overspeed', 'status': 'Congested', 'time_window': 'Afternoon'}
{'id': '505', 'zone': 'A1', 'vehicle': 'Truck', 'speed': 40, 'time': '18:50', 'violations': None, 'status': 'Smooth', 'time_window': 'Evening'}


Q10: Generate final zone-level report (vehicles, avg speed, violations, common vehicle type, safety
category).

In [36]:
def zone_report(data):
    zones = {}

    for d in data:
        z = d["zone"]
        if z not in zones:
            zones[z] = {
                "vehicles": 0,
                "total_speed": 0,
                "vehicle_types": {},
                "violations": []
            }
        
        zones[z]["vehicles"] += 1
        zones[z]["total_speed"] += d["speed"]

        v = d["vehicle"]
        zones[z]["vehicle_types"][v] = zones[z]["vehicle_types"].get(v, 0) + 1

        if d["violations"] is not None:
            zones[z]["violations"].append(d["violations"])




    
    final_report = {}
    for z, info in zones.items():
        final_report[z] = {
            "avg_speed": info["total_speed"] / info["vehicles"],
            "violations": info["violations"],
            "vehicles": info["vehicles"],
            "common_vehicle_type": max(info["vehicle_types"], key=info["vehicle_types"].get),
            "safety_index": safety_index(data)[z]
        }

    return final_report

print(zone_report(structured_data))


{'A1': {'avg_speed': 62.333333333333336, 'violations': ['Helmet'], 'vehicles': 3, 'common_vehicle_type': 'Car', 'safety_index': 96.66666666666667}, 'B2': {'avg_speed': 45.0, 'violations': [], 'vehicles': 1, 'common_vehicle_type': 'Bus', 'safety_index': 100.0}, 'C3': {'avg_speed': 110.0, 'violations': ['Overspeed'], 'vehicles': 1, 'common_vehicle_type': 'Car', 'safety_index': 75.0}}
