In [10]:
import numpy as np
import pandas as pd
from datetime import datetime
from enum import Enum
from typing import List, Tuple
import json

In [11]:
# ============================================================================
# 1. FAILURE PROGRESSION MODEL
# ============================================================================
class FailurePhase(Enum):
    """Stages of an asset's failure progression, ordered by severity.

    The integer values are used numerically elsewhere in this file: the sensor
    generators scale their noise (and load instability) by ``phase.value``, and
    ``determine_phase`` maps an elapsed time onto one of these members.
    """
    # Before the failure window starts.
    NORMAL = 0
    # First 30% of the failure window (boundary defined in determine_phase).
    DEGRADATION = 1
    # From 30% up to 70% of the failure window.
    PRE_FAILURE = 2
    # From 70% of the failure window onwards (never returns to NORMAL).
    FAILURE_RISK = 3

def determine_phase(elapsed_hours: float, failure_start: float, duration: float) -> FailurePhase:
    """Classify a point in time into a failure phase.

    Args:
        elapsed_hours: Time since the start of the simulation, in hours.
        failure_start: Hour at which the failure window begins.
        duration: Length of the failure window, in hours.

    Returns:
        NORMAL before ``failure_start``; DEGRADATION for the first 30% of the
        window; PRE_FAILURE up to 70% of the window; FAILURE_RISK afterwards.
        All upper bounds are exclusive.
    """
    degradation_end = failure_start + duration * 0.3
    pre_failure_end = failure_start + duration * 0.7

    # Guard clauses, checked from the earliest boundary to the latest.
    if elapsed_hours < failure_start:
        return FailurePhase.NORMAL
    if elapsed_hours < degradation_end:
        return FailurePhase.DEGRADATION
    if elapsed_hours < pre_failure_end:
        return FailurePhase.PRE_FAILURE
    return FailurePhase.FAILURE_RISK

In [12]:
# ============================================================================
# 2. SENSOR GENERATION ENGINE (WITH REALISM UPGRADES)
# ============================================================================
def generate_vibration(elapsed_hours: np.ndarray, phase: FailurePhase, base_rms: float = 3.0) -> np.ndarray:
    """Synthesize a vibration RMS series for one failure phase.

    Args:
        elapsed_hours: Sample times in hours. The first element is treated as
            the start of the phase (origin of the degradation trend and
            reference point for the pre-failure spike).
        phase: Failure phase applied to every sample in ``elapsed_hours``.
        base_rms: Baseline vibration level for a healthy asset.

    Returns:
        Array of the same length as ``elapsed_hours``, clipped to [1.5, 22.0].

    Notes:
        Draws Gaussian noise from NumPy's global RNG (one ``np.random.normal``
        call), so results are reproducible only if the caller seeds it.
    """
    def sine(period_hours: float) -> np.ndarray:
        # Unit-amplitude sinusoid with the given period, evaluated at every sample.
        return np.sin(2 * np.pi * elapsed_hours / period_hours)

    # Noise grows with phase severity and is modulated on a 36-hour cycle.
    sigma = 0.2 + phase.value * 0.3 + 0.15 * np.abs(sine(36.0))
    jitter = np.random.normal(0, sigma, len(elapsed_hours))

    def degradation_level():
        # Slow linear drift measured from the first sample of the segment.
        return base_rms + 0.02 * (elapsed_hours - elapsed_hours[0])

    def pre_failure_level():
        # Raised baseline, an exponential burst centred 5 h into the segment,
        # plus two fast harmonics.
        burst_center = elapsed_hours[0] + 5
        burst = 2.5 * np.exp(-0.15 * np.abs(elapsed_hours - burst_center))
        tones = 0.7 * sine(0.35) + 0.4 * sine(0.12)
        return base_rms + 1.8 + burst + tones

    def failure_risk_level():
        # Highest baseline with a slow oscillation and stronger harmonics.
        wobble = 1.5 * sine(1.2)
        tones = 1.2 * sine(0.28) + 0.9 * sine(0.09)
        return base_rms + 4.0 + wobble + tones

    # Any phase not listed here falls through to the failure-risk profile.
    level_by_phase = {
        FailurePhase.NORMAL: lambda: base_rms,
        FailurePhase.DEGRADATION: degradation_level,
        FailurePhase.PRE_FAILURE: pre_failure_level,
    }
    level = level_by_phase.get(phase, failure_risk_level)()

    return np.clip(level + jitter, 1.5, 22.0)

def generate_temperature(elapsed_hours: np.ndarray, vibration: np.ndarray, phase: FailurePhase, base_temp: float = 45.0) -> np.ndarray:
    """Synthesize a temperature series that lags behind vibration.

    In the NORMAL phase the temperature is just ``base_temp`` plus noise. In
    every other phase the vibration series is smoothed with a 15-sample
    exponentially decaying kernel and rescaled to a 0-12 degree rise on top of
    ``base_temp``.

    Args:
        elapsed_hours: Sample times in hours.
        vibration: Vibration series for the same samples (same length as
            ``elapsed_hours``).
        phase: Failure phase applied to every sample.
        base_temp: Baseline temperature for a healthy asset.

    Returns:
        Array of the same length as ``elapsed_hours``, clipped to [38.0, 82.0].
        An empty input yields an empty array.

    Notes:
        Draws Gaussian noise from NumPy's global RNG.
    """
    n_samples = len(elapsed_hours)
    if n_samples == 0:
        # Nothing to simulate; also avoids calling .min() on an empty array below.
        return np.zeros(0)

    # Noise grows with phase severity and is modulated on a 24-hour cycle.
    noise_std = 0.5 + phase.value * 0.4 + 0.1 * np.abs(np.sin(2 * np.pi * elapsed_hours / 24.0))
    noise = np.random.normal(0, noise_std, n_samples)

    if phase == FailurePhase.NORMAL:
        return np.clip(base_temp + noise, 38.0, 82.0)

    thermal_kernel = np.exp(-np.arange(15) / 4.0)
    excitation = vibration - vibration.min() + 0.1

    # Causal filtering: the first n samples of the 'full' convolution make
    # response[i] depend only on excitation[i-14 .. i]. mode='same' would centre
    # the kernel (temperature reacting to *future* vibration) and, for inputs
    # shorter than the kernel, return a misaligned 15-sample window.
    thermal_response = np.convolve(excitation, thermal_kernel, mode='full')[:n_samples]

    # Rescale the response to a 0-12 degree rise; a flat response adds nothing.
    response_min = thermal_response.min()
    response_span = thermal_response.max() - response_min
    if response_span > 0:
        thermal_response = (thermal_response - response_min) / response_span * 12.0
    else:
        thermal_response = np.zeros_like(thermal_response)

    return np.clip(base_temp + thermal_response + noise, 38.0, 82.0)

def generate_load(elapsed_hours: np.ndarray, vibration: np.ndarray, phase: FailurePhase, base_load: float = 75.0) -> np.ndarray:
    """Synthesize a load (percent) series for one failure phase.

    Args:
        elapsed_hours: Sample times in hours.
        vibration: Accepted for signature symmetry with the other generators;
            not used in the computation.
        phase: Failure phase applied to every sample.
        base_load: Baseline load for a healthy asset.

    Returns:
        Array of the same length as ``elapsed_hours``, clipped to [40.0, 110.0].

    Notes:
        Draws Gaussian noise from NumPy's global RNG.
    """
    two_pi_t = 2 * np.pi * elapsed_hours

    # Noise grows with phase severity and is modulated on a 12-hour cycle.
    sigma = 1.5 + phase.value * 1.0 + 0.8 * np.abs(np.sin(two_pi_t / 12.0))
    jitter = np.random.normal(0, sigma, len(elapsed_hours))

    # Daily duty cycle shared by every phase.
    level = base_load + 6.0 * np.sin(two_pi_t / 24.0)

    # From PRE_FAILURE onwards add a 5-hour oscillation that scales with severity.
    if phase.value >= 2:
        level = level + 9.0 * np.sin(two_pi_t / 5.0) * (phase.value / 3.0)

    return np.clip(level + jitter, 40.0, 110.0)

In [13]:
# ============================================================================
# 3. ASSET SIMULATION ORCHESTRATOR
# ============================================================================
class IndustrialSensorSimulator:
    """Generate labelled synthetic sensor streams for one or more assets.

    Each asset gets vibration, temperature and load series sampled every
    ``sampling_min`` minutes, together with the ground-truth failure phase of
    every sample. All randomness comes from NumPy's global RNG, so seed it
    before simulating if reproducible output is needed.
    """

    def __init__(self, start_time: str = "2026-01-01 00:00:00", sampling_min: int = 1):
        """
        Args:
            start_time: Timestamp of the first sample (anything ``pd.to_datetime`` accepts).
            sampling_min: Sampling interval in minutes.
        """
        self.start_time = pd.to_datetime(start_time)
        self.sampling_min = sampling_min

    def simulate_asset(self, asset_id: str, failure_start_hour: int, total_hours: int = 168,
                       failure_duration_hours: float = 48) -> pd.DataFrame:
        """Simulate one asset from healthy operation through failure progression.

        Args:
            asset_id: Identifier written to the ``asset_id`` column.
            failure_start_hour: Hour at which the failure window begins.
            total_hours: Length of the simulation in hours.
            failure_duration_hours: Length of the failure window passed to
                ``determine_phase``.

        Returns:
            DataFrame with columns ``timestamp``, ``asset_id``, ``vibration_rms``,
            ``temperature_c``, ``load_percent``, ``failure_phase`` and
            ``failure_start_hour``; one row per sample.
        """
        n_samples = int(total_hours * 60 / self.sampling_min)

        # Sampling jitter (±8 seconds)
        base_timestamps = pd.date_range(self.start_time, periods=n_samples, freq=f'{self.sampling_min}min')
        jitter_sec = np.random.randint(-8, 9, size=n_samples)
        timestamps = base_timestamps + pd.to_timedelta(jitter_sec, unit='s')

        elapsed_hours = np.arange(n_samples) * self.sampling_min / 60.0

        # Ground-truth phase per sample; determine_phase stays the single source of truth.
        phases = np.array(
            [determine_phase(hour, failure_start_hour, duration=failure_duration_hours).value
             for hour in elapsed_hours],
            dtype=int,
        )

        vibration = np.zeros(n_samples)
        temperature = np.zeros(n_samples)
        load = np.zeros(n_samples)

        # Generate each phase as ONE segment. The generators measure trends and
        # spikes relative to the first sample they receive and smooth vibration
        # across neighbouring samples, so feeding them one sample at a time
        # silently zeroes the degradation trend and the thermal response.
        for phase in FailurePhase:
            in_phase = phases == phase.value
            if not in_phase.any():
                continue
            segment_hours = elapsed_hours[in_phase]
            segment_vibration = generate_vibration(segment_hours, phase)
            vibration[in_phase] = segment_vibration
            temperature[in_phase] = generate_temperature(segment_hours, segment_vibration, phase)
            load[in_phase] = generate_load(segment_hours, segment_vibration, phase)

        df = pd.DataFrame({
            'timestamp': timestamps,
            'asset_id': asset_id,
            'vibration_rms': vibration,
            'temperature_c': temperature,
            'load_percent': load,
            'failure_phase': phases,
            'failure_start_hour': failure_start_hour
        })

        # Packet loss (3%): blank the readings, then repair by carrying the last
        # good value forward (bfill covers a loss on the very first samples).
        sensor_columns = ['vibration_rms', 'temperature_c', 'load_percent']
        packet_loss_mask = np.random.random(n_samples) < 0.03
        df.loc[packet_loss_mask, sensor_columns] = np.nan
        df[sensor_columns] = df[sensor_columns].ffill().bfill()

        return df

    def simulate_plant(self, assets_config: List[Tuple[str, int]], total_hours: int = 168) -> pd.DataFrame:
        """Simulate several assets and stack them into one DataFrame.

        Args:
            assets_config: ``(asset_id, failure_start_hour)`` pairs.
            total_hours: Simulation length in hours, applied to every asset.

        Returns:
            Concatenation of the per-asset frames with a fresh integer index.
        """
        all_assets = []
        for asset_id, failure_hour in assets_config:
            df = self.simulate_asset(asset_id, failure_hour, total_hours=total_hours)
            all_assets.append(df)
            print(f"✓ Generated {len(df):,} samples for {asset_id} (failure starts at hour {failure_hour})")
        return pd.concat(all_assets, ignore_index=True)

In [15]:
# ============================================================================
# 4. JSON PACKET EXPORTER (EXACT FORMAT REQUIRED)
# ============================================================================
def export_clean_packets(df: pd.DataFrame, output_path: str = "sensor_packets.jsonl"):
    """
    Export sensor packets as JSON Lines, one packet per row of ``df``:
    {"timestamp":"2026-02-01T22:15:00","vibration":1.42,"temperature":68.2,"load":0.73}

    Args:
        df: Frame with ``timestamp``, ``vibration_rms``, ``temperature_c`` and
            ``load_percent`` columns. ``timestamp`` may be datetime-typed or
            anything ``pd.to_datetime`` can parse (e.g. strings reloaded from CSV).
        output_path: Destination file; overwritten if it exists.

    Notes:
    - timestamp: ISO 8601 with 'T' separator, second precision
    - vibration: raw mm/s RMS value, rounded to 2 decimals
    - temperature: °C, rounded to 1 decimal
    - load: normalized 0.0-1.0 (not percent), rounded to 2 decimals
    - An empty ``df`` produces an empty file and no example line.
    - NOTE(review): the sample above is shown compact; ``json.dumps`` with default
      separators emits a space after ',' and ':'. Confirm consumers parse the JSON
      rather than compare bytes.
    """
    # Column-wise preparation avoids the per-row Series construction of iterrows().
    iso_timestamps = pd.to_datetime(df['timestamp']).dt.strftime("%Y-%m-%dT%H:%M:%S")  # ISO 8601 with 'T'
    normalized_load = ((df['load_percent'] - 40) / 70).clip(0.0, 1.0)  # Normalize 40-110% → 0.0-1.0

    packets = [
        {
            "timestamp": stamp,
            "vibration": round(float(vib), 2),
            "temperature": round(float(temp), 1),
            "load": round(float(load), 2),
        }
        for stamp, vib, temp, load in zip(
            iso_timestamps, df['vibration_rms'], df['temperature_c'], normalized_load
        )
    ]

    # Write as JSONL (one packet per line - standard for IoT streams)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.writelines(json.dumps(packet) + '\n' for packet in packets)

    print(f"✅ Clean sensor packets exported: {output_path} ({len(packets):,} packets)")
    if packets:
        # Guarded so an empty frame does not raise IndexError after the file is written.
        print(f"   Example packet: {json.dumps(packets[0])}")

In [16]:
# ============================================================================
# 5. EXECUTION & VALIDATION
# ============================================================================
def main(seed: int = 42):
    """Simulate the plant, validate the result, then export it in three formats.

    Writes ``simulated_assets.csv`` (full dataset with ground-truth phases),
    ``sensor_packets.jsonl`` (one packet per line) and ``sensor_packets.json``
    (the same packets as an indented JSON array) to the working directory.

    Args:
        seed: Seed for NumPy's global RNG, which the simulator draws from, so
            repeated runs produce the same dataset. Pass ``None`` for a fresh
            random run.

    Raises:
        AssertionError: If the simulated data is empty, out of range or
            contains nulls. Raised before any file is written.
    """
    ASSETS = [
        ("pump-001", 48),
        ("conveyor-042", 96),
        ("compressor-118", 24)
    ]

    np.random.seed(seed)

    simulator = IndustrialSensorSimulator()
    plant_data = simulator.simulate_plant(ASSETS)

    # VALIDATION — done before exporting so a failed check never leaves bad files on disk
    assert len(plant_data) > 0, "simulation produced no rows"
    assert plant_data['vibration_rms'].between(1.5, 22.0).all(), "vibration_rms outside [1.5, 22.0]"
    assert plant_data['temperature_c'].between(38.0, 82.0).all(), "temperature_c outside [38.0, 82.0]"
    assert plant_data['load_percent'].between(40.0, 110.0).all(), "load_percent outside [40.0, 110.0]"
    assert not plant_data.isnull().any().any(), "dataset contains null values"

    # Export FULL dataset with ground truth (for XAI layer validation)
    plant_data.to_csv("simulated_assets.csv", index=False)
    print(f"\n✅ Full dataset exported: simulated_assets.csv ({len(plant_data):,} rows)")

    # Export CLEAN JSON packets (for IoT stream simulation)
    export_clean_packets(plant_data, "sensor_packets.jsonl")

    # Also export pretty JSON array for easy loading (optional).
    # Built column-wise; same fields and rounding as the JSONL packets.
    iso_timestamps = plant_data['timestamp'].dt.strftime("%Y-%m-%dT%H:%M:%S")
    normalized_load = ((plant_data['load_percent'] - 40) / 70).clip(0.0, 1.0)
    packets = [
        {
            "timestamp": stamp,
            "vibration": round(float(vib), 2),
            "temperature": round(float(temp), 1),
            "load": round(float(load), 2),
        }
        for stamp, vib, temp, load in zip(
            iso_timestamps, plant_data['vibration_rms'], plant_data['temperature_c'], normalized_load
        )
    ]
    with open("sensor_packets.json", 'w', encoding='utf-8') as f:
        json.dump(packets, f, indent=2)
    print("✅ Pretty JSON array exported: sensor_packets.json")

    print("\n✅ ALL VALIDATIONS PASSED — READY FOR TEAM HANDOFF")

# Run the full pipeline when this cell/script is executed directly
# (__name__ is "__main__" there); importing the code as a module skips it.
if __name__ == "__main__":
    main()

✓ Generated 10,080 samples for pump-001 (failure starts at hour 48)
✓ Generated 10,080 samples for conveyor-042 (failure starts at hour 96)
✓ Generated 10,080 samples for compressor-118 (failure starts at hour 24)

✅ Full dataset exported: simulated_assets.csv (30,240 rows)
✅ Clean sensor packets exported: sensor_packets.jsonl (30,240 packets)
   Example packet: {"timestamp": "2026-01-01T00:00:07", "vibration": 3.06, "temperature": 44.6, "load": 0.48}
✅ Pretty JSON array exported: sensor_packets.json

✅ ALL VALIDATIONS PASSED — READY FOR TEAM HANDOFF
