**CREATING SAMPLE DATASET**

In [2]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)

# Define shipments and sensors
shipments = [101, 102, 103, 104, 105]
sensors = {
    "ThermoProbe T-100": {"type": "Temperature", "unit": "C", "range": (2, 8)},
    "TempTrack X5": {"type": "Surface Temp", "unit": "C", "range": (2, 10)},
    "HumidSensor H-200": {"type": "Humidity", "unit": "%", "range": (30, 50)},
    "ShockLog S-50": {"type": "Shock", "unit": "g", "range": (0, 5)},
    "GPS ColdTrack G-12": {"type": "GPS", "unit": "km deviation", "range": (0, 5)},
    "BatteryMonitor B-10": {"type": "Battery", "unit": "V", "range": (3.2, 5)}
}

# Generate sample dataset
data = []
start_time = datetime(2025, 1, 1, 6, 0, 0)

reading_id = 1
for shipment in shipments:
    for sensor, details in sensors.items():
        timestamp = start_time
        for i in range(10):  # 10 readings per sensor per shipment
            # Normally distributed values within range, with some noise
            low, high = details["range"]
            value = np.random.uniform(low, high)

            # Inject some violations randomly
            if random.random() < 0.15:
                if details["type"] == "Temperature":
                    value = np.random.uniform(9, 12)  # spike high
                elif details["type"] == "Humidity":
                    value = np.random.uniform(55, 70)  # too humid
                elif details["type"] == "Shock":
                    value = np.random.uniform(6, 12)  # high g
                elif details["type"] == "GPS":
                    value = np.random.uniform(6, 12)  # deviation
                elif details["type"] == "Battery":
                    value = np.random.uniform(2.5, 3.1)  # low battery

            data.append([
                reading_id, shipment, sensor, details["type"], timestamp, round(value, 2), details["unit"]
            ])
            reading_id += 1
            timestamp += timedelta(minutes=5)

# Create DataFrame
df = pd.DataFrame(data, columns=[
    "reading_id", "shipment_id", "sensor_model", "sensor_type", "timestamp", "value", "unit"
])




In [13]:
print(df.head(30))

    reading_id  shipment_id       sensor_model   sensor_type  \
0            1          101  ThermoProbe T-100   Temperature   
1            2          101  ThermoProbe T-100   Temperature   
2            3          101  ThermoProbe T-100   Temperature   
3            4          101  ThermoProbe T-100   Temperature   
4            5          101  ThermoProbe T-100   Temperature   
5            6          101  ThermoProbe T-100   Temperature   
6            7          101  ThermoProbe T-100   Temperature   
7            8          101  ThermoProbe T-100   Temperature   
8            9          101  ThermoProbe T-100   Temperature   
9           10          101  ThermoProbe T-100   Temperature   
10          11          101       TempTrack X5  Surface Temp   
11          12          101       TempTrack X5  Surface Temp   
12          13          101       TempTrack X5  Surface Temp   
13          14          101       TempTrack X5  Surface Temp   
14          15          101       TempTr

**SHIPMENT SUMMARY REPORT JSON FORMAT**

In [7]:
import json

{
  "shipment_id": "S101",
  "origin": "New York",
  "destination": "Chicago",
  "start_time": "2025-09-14 08:00:00",
  "sensor_summary": [
    {
      "sensor_id": "T100",
      "sensor_name": "ThermoProbe T-100",
      "avg_value": 5.8,
      "min_value": 5.5,
      "max_value": 8.2,
      "unit": "°C",
      "violations": [
        {
          "timestamp": "2025-09-14 08:10:00",
          "value": 8.2,
          "violation_type": "Temperature Spike",
          "severity": "Medium"
        }
      ],
      "violation_count": 1,
      "risk_level": "Medium"
    },
    ...
  ],
  "overall_risk_score": 0.68,
  "risk_level": "High",
  "critical_alerts": [
    "Shock detected (S50) + Temperature spike (T100) at 2025-09-14 08:10:00"
  ]
}

{'shipment_id': 'S101',
 'origin': 'New York',
 'destination': 'Chicago',
 'start_time': '2025-09-14 08:00:00',
 'sensor_summary': [{'sensor_id': 'T100',
   'sensor_name': 'ThermoProbe T-100',
   'avg_value': 5.8,
   'min_value': 5.5,
   'max_value': 8.2,
   'unit': '°C',
   'violations': [{'timestamp': '2025-09-14 08:10:00',
     'value': 8.2,
     'violation_type': 'Temperature Spike',
     'severity': 'Medium'}],
   'violation_count': 1,
   'risk_level': 'Medium'},
  Ellipsis],
 'overall_risk_score': 0.68,
 'risk_level': 'High',
 'critical_alerts': ['Shock detected (S50) + Temperature spike (T100) at 2025-09-14 08:10:00']}

**PYTHON CODE IMPLEMENTATION**

In [8]:
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict

# Sensor configuration
SENSOR_CONFIG = {
    "T100": {"name": "ThermoProbe T-100", "type": "Core Temperature", "min": 2.0, "max": 8.0, "unit": "°C"},
    "X5": {"name": "TempTrack X5", "type": "Surface Temperature", "min": 2.0, "max": 8.0, "unit": "°C"},
    "H200": {"name": "HumidSensor H-200", "type": "Relative Humidity", "min": 40.0, "max": 60.0, "unit": "%"},
    "S50": {"name": "ShockLog S-50", "type": "Shock", "min": 0.0, "max": 5.0, "unit": "g"},
    "G12": {"name": "GPS ColdTrack G-12", "type": "Route Deviation", "min": 0.0, "max": 500.0, "unit": "m"},
    "B10": {"name": "BatteryMonitor B-10", "type": "Battery Voltage", "min": 2.5, "max": 3.3, "unit": "V"}
}

@dataclass
class Violation:
    timestamp: datetime
    value: float
    violation_type: str
    severity: str

class SensorRule:
    def __init__(self, sensor_id: str, config: Dict):
        self.sensor_id = sensor_id
        self.config = config

    def detect_violations(self, readings: pd.DataFrame) -> List[Violation]:
        violations = []
        for _, row in readings.iterrows():
            value = row["value"]
            ts = row["timestamp"]
            if value < self.config["min"]:
                violations.append(Violation(ts, value, f"{self.config['type']} Below Threshold", "Medium"))
            elif value > self.config["max"]:
                violations.append(Violation(ts, value, f"{self.config['type']} Above Threshold", "Medium"))
        return violations

class MonitoringSystem:
    def __init__(self, shipments: pd.DataFrame, sensors: pd.DataFrame, readings: pd.DataFrame):
        self.shipments = shipments
        self.sensors = sensors
        self.readings = readings

    def detect_violations(self, shipment_id: str) -> Dict:
        shipment_readings = self.readings[self.readings["shipment_id"] == shipment_id]
        violations = {}
        for sensor_id in SENSOR_CONFIG:
            sensor_readings = shipment_readings[shipment_readings["sensor_id"] == sensor_id]
            rule = SensorRule(sensor_id, SENSOR_CONFIG[sensor_id])
            violations[sensor_id] = rule.detect_violations(sensor_readings)
        return violations

    def generate_summary_report(self, shipment_id: str) -> Dict:
        shipment_readings = self.readings[self.readings["shipment_id"] == shipment_id]
        shipment_info = self.shipments[self.shipments["shipment_id"] == shipment_id].iloc[0]
        report = {
            "shipment_id": shipment_id,
            "origin": shipment_info["origin"],
            "destination": shipment_info["destination"],
            "start_time": shipment_info["start_time"],
            "sensor_summary": [],
            "critical_alerts": []
        }

        risk_scores = []
        weights = {"T100": 0.4, "X5": 0.4, "H200": 0.15, "S50": 0.3, "G12": 0.1, "B10": 0.05}
        violations = self.detect_violations(shipment_id)

        for sensor_id in SENSOR_CONFIG:
            sensor_readings = shipment_readings[shipment_readings["sensor_id"] == sensor_id]
            if sensor_readings.empty:
                continue
            summary = {
                "sensor_id": sensor_id,
                "sensor_name": SENSOR_CONFIG[sensor_id]["name"],
                "avg_value": sensor_readings["value"].mean(),
                "min_value": sensor_readings["value"].min(),
                "max_value": sensor_readings["value"].max(),
                "unit": SENSOR_CONFIG[sensor_id]["unit"],
                "violations": violations[sensor_id],
                "violation_count": len(violations[sensor_id]),
                "risk_level": "Low" if len(violations[sensor_id]) == 0 else "Medium" if len(violations[sensor_id]) <= 2 else "High"
            }
            report["sensor_summary"].append(summary)
            risk_score = 0.0 if len(violations[sensor_id]) == 0 else 0.6 if len(violations[sensor_id]) <= 2 else 1.0
            risk_scores.append(risk_score * weights[sensor_id])

        # Check for correlated violations (e.g., shock + temperature)
        shock_times = [v.timestamp for v in violations.get("S50", [])]
        temp_times = [v.timestamp for v in violations.get("T100", [])]
        for ts in shock_times:
            if ts in temp_times:
                report["critical_alerts"].append(f"Shock + Temperature spike at {ts}")

        report["overall_risk_score"] = sum(risk_scores)
        report["risk_level"] = "Low" if report["overall_risk_score"] < 0.3 else "Medium" if report["overall_risk_score"] < 0.6 else "High"
        return report

# Sample data loading
shipments = pd.DataFrame([
    {"shipment_id": "S101", "origin": "New York", "destination": "Chicago", "start_time": "2025-09-14 08:00:00"},
    # Add other shipments...
])

readings = pd.DataFrame([
    {"shipment_id": "S101", "sensor_id": "T100", "timestamp": "2025-09-14 08:00:00", "value": 5.5},
    {"shipment_id": "S101", "sensor_id": "T100", "timestamp": "2025-09-14 08:05:00", "value": 5.7},
    {"shipment_id": "S101", "sensor_id": "T100", "timestamp": "2025-09-14 08:10:00", "value": 8.2},
    # Add other readings...
])

# Example usage
system = MonitoringSystem(shipments, pd.DataFrame(), readings)
report = system.generate_summary_report("S101")
print(report)

{'shipment_id': 'S101', 'origin': 'New York', 'destination': 'Chicago', 'start_time': '2025-09-14 08:00:00', 'sensor_summary': [{'sensor_id': 'T100', 'sensor_name': 'ThermoProbe T-100', 'avg_value': np.float64(6.466666666666666), 'min_value': 5.5, 'max_value': 8.2, 'unit': '°C', 'violations': [Violation(timestamp='2025-09-14 08:10:00', value=8.2, violation_type='Core Temperature Above Threshold', severity='Medium')], 'violation_count': 1, 'risk_level': 'Medium'}], 'critical_alerts': [], 'overall_risk_score': 0.24, 'risk_level': 'Low'}


**EXAMPLE OUTPUT**

In [12]:
import json

json_data = """
{
  "shipment_id": "S101",
  "origin": "New York",
  "destination": "Chicago",
  "start_time": "2025-09-14 08:00:00",
  "sensor_summary": [
    {
      "sensor_id": "T100",
      "sensor_name": "ThermoProbe T-100",
      "avg_value": 5.8,
      "min_value": 5.5,
      "max_value": 8.2,
      "unit": "°C",
      "violations": [
        {
          "timestamp": "2025-09-14 08:10:00",
          "value": 8.2,
          "violation_type": "Core Temperature Above Threshold",
          "severity": "Medium"
        }
      ],
      "violation_count": 1,
      "risk_level": "Medium"
    },
    {
      "sensor_id": "S50",
      "sensor_name": "ShockLog S-50",
      "avg_value": 2.0,
      "min_value": 0.0,
      "max_value": 6.0,
      "unit": "g",
      "violations": [
        {
          "timestamp": "2025-09-14 08:10:00",
          "value": 6.0,
          "violation_type": "Shock Above Threshold",
          "severity": "Medium"
        }
      ],
      "violation_count": 1,
      "risk_level": "Medium"
    }
  ],
  "overall_risk_score": 0.68,
  "risk_level": "High",
  "critical_alerts": [
    "Shock + Temperature spike at 2025-09-14 08:10:00"
  ]
}
"""

try:
    # Use json.loads() to parse the JSON string into a Python dictionary
    shipment_data = json.loads(json_data)

    # Now you can access the data like a dictionary
    print("Shipment ID:", shipment_data["shipment_id"])
    print("Overall Risk Level:", shipment_data["risk_level"])
    print("Critical Alerts:", shipment_data["critical_alerts"])

    # Loop through the sensor summaries
    for sensor in shipment_data["sensor_summary"]:
        print(f"\nSensor: {sensor['sensor_name']} (ID: {sensor['sensor_id']})")
        print(f"  Max Value: {sensor['max_value']} {sensor['unit']}")
        if sensor["violations"]:
            print(f"  Violations:")
            for violation in sensor["violations"]:
                print(f"    - Type: {violation['violation_type']}")
                print(f"    - Value: {violation['value']} at {violation['timestamp']}")
except json.JSONDecodeError as e:
    print(f"Error decoding JSON: {e}")

Shipment ID: S101
Overall Risk Level: High
Critical Alerts: ['Shock + Temperature spike at 2025-09-14 08:10:00']

Sensor: ThermoProbe T-100 (ID: T100)
  Max Value: 8.2 °C
  Violations:
    - Type: Core Temperature Above Threshold
    - Value: 8.2 at 2025-09-14 08:10:00

Sensor: ShockLog S-50 (ID: S50)
  Max Value: 6.0 g
  Violations:
    - Type: Shock Above Threshold
    - Value: 6.0 at 2025-09-14 08:10:00
