# 03 - Perception Layer

## Agentic Logistics Control System

This notebook walks through the perception/observation layer implemented in `src/perception`:
- GPS data collection from trucks
- Traffic condition monitoring
- Load manifest collection
- Data preprocessing and validation
- LangGraph observation node

### Control Loop Role: OBSERVATION

```
[OBSERVE] -> REASON -> PLAN -> DECIDE -> ACT -> FEEDBACK -> (loop)
    ^
    |
  YOU ARE HERE
```

The observation phase gathers all data needed for the reasoning layer.

In [None]:
# Setup
import sys
from pathlib import Path
import asyncio

# The parent of the current working directory is treated as the project root.
PROJECT_ROOT = Path.cwd().parent

# Prepend the root first and `src` second, so `src` ends up ahead of the root
# on the import search path.
for _import_dir in (PROJECT_ROOT, PROJECT_ROOT / "src"):
    sys.path.insert(0, str(_import_dir))

# For Jupyter async compatibility
import nest_asyncio
try:
    nest_asyncio.apply()
except Exception as exc:
    # Still best-effort (the patch may not be needed in all environments), but
    # report why it was skipped instead of hiding it. Unlike a bare `except:`,
    # this lets KeyboardInterrupt and SystemExit propagate.
    print(f"nest_asyncio.apply() skipped: {exc!r}")

In [None]:
# Imports
from datetime import datetime
import uuid

from src.models import (
    Truck, TruckStatus, Location, GPSReading,
    TrafficCondition, TrafficLevel, Load, LoadPriority,
    Route, AgentState, ControlLoopPhase
)
from src.perception.collectors import (
    GPSCollector, TrafficCollector, LoadCollector, AggregatedCollector
)
from src.perception.preprocessor import DataPreprocessor
from src.perception.observation_node import create_observation_node, create_sample_fleet

print("Perception layer modules loaded!")

## 1. GPS Data Collection

In [None]:
# Build the sample fleet that the collectors below operate on.
fleet = create_sample_fleet()

fleet_size = len(fleet)
print(f"Fleet size: {fleet_size} trucks")

# One line per truck: id, display name and current status value.
for vehicle in fleet:
    status_label = vehicle.status.value
    print(f"  {vehicle.id}: {vehicle.name} - {status_label}")

In [None]:
# GPS collector over the sample fleet, created with simulate=True.
gps_collector = GPSCollector(trucks=fleet, simulate=True)


async def collect_gps():
    """Await one collection pass from the module-level ``gps_collector``."""
    batch = await gps_collector.collect()
    return batch


# Drive the coroutine to completion on the current event loop.
gps_readings = asyncio.get_event_loop().run_until_complete(collect_gps())

reading_count = len(gps_readings)
print(f"\nCollected {reading_count} GPS readings:")
for gps in gps_readings:
    position = gps.location
    print(f"  {gps.truck_id}: ({position.latitude:.4f}, {position.longitude:.4f}) @ {gps.speed_kmh:.1f} km/h")

In [None]:
# Run the GPS collection three times in a row so successive readings can be compared.
print("Simulating truck movement over 3 collection cycles:\n")

for cycle_number in range(1, 4):
    readings = asyncio.get_event_loop().run_until_complete(collect_gps())
    print(f"Cycle {cycle_number}:")
    preview = readings[:3]  # only the first three readings are shown
    for sample in preview:
        print(f"  {sample.truck_id}: {sample.speed_kmh:.1f} km/h, heading {sample.heading:.0f}°")
    print()

## 2. Traffic Data Collection

In [None]:
# Traffic collector, created with simulate=True.
traffic_collector = TrafficCollector(simulate=True)


async def collect_traffic():
    """Await one collection pass from the module-level ``traffic_collector``."""
    batch = await traffic_collector.collect()
    return batch


traffic_conditions = asyncio.get_event_loop().run_until_complete(collect_traffic())

print(f"Collected {len(traffic_conditions)} traffic conditions:\n")
for condition in traffic_conditions:
    # Append the incident text only for segments that report one.
    if condition.incident_description:
        incident_suffix = f" - {condition.incident_description}"
    else:
        incident_suffix = ""
    print(f"  {condition.segment_id}: {condition.level.value} ({condition.speed_kmh:.0f} km/h){incident_suffix}")

In [None]:
# Split the collected conditions into "has an incident" and "heavy or standstill".
congested_levels = (TrafficLevel.HEAVY, TrafficLevel.STANDSTILL)

incidents = []
heavy_traffic = []
for condition in traffic_conditions:
    if condition.incident_description:
        incidents.append(condition)
    if condition.level in congested_levels:
        heavy_traffic.append(condition)

print("Traffic Summary:")
print(f"  Incidents: {len(incidents)}")
print(f"  Heavy/Standstill: {len(heavy_traffic)}")

# The detail section is skipped entirely when no incident was reported.
if incidents:
    print("\nIncident Details:")
    for affected in incidents:
        print(f"  {affected.segment_id}: {affected.incident_description}")
        print(f"    Delay: {affected.delay_minutes:.0f} min, Speed: {affected.speed_kmh:.0f} km/h")

## 3. Load Data Collection

In [None]:
# Load collector, created with simulate=True.
load_collector = LoadCollector(simulate=True)


async def collect_loads():
    """Await one collection pass from the module-level ``load_collector``."""
    return await load_collector.collect()


loads = asyncio.get_event_loop().run_until_complete(collect_loads())

# One reference time for every row, so all "hours to deadline" values are
# measured against the same instant.
# NOTE(review): utcnow() is naive; this assumes delivery_deadline is a naive
# UTC datetime as well - confirm against the Load model.
reference_time = datetime.utcnow()

print(f"Collected {len(loads)} loads:\n")
for load in loads:
    description = load.description
    # Add the ellipsis only when the description was actually cut off.
    if len(description) > 30:
        description_preview = f"{description[:30]}..."
    else:
        description_preview = description
    print(f"  {load.id}: {description_preview}")
    print(f"    Priority: {load.priority.value}, Weight: {load.weight_kg:.0f} kg")
    if load.delivery_deadline:
        hours_to_deadline = (load.delivery_deadline - reference_time).total_seconds() / 3600
        print(f"    Deadline: {hours_to_deadline:.1f} hours")
    else:
        # Previously shown as "0.0 hours", which reads as "due right now".
        print("    Deadline: none")

## 4. Aggregated Collection

In [None]:
# Wire fresh GPS, traffic and load collectors into one aggregated collector.
aggregator = AggregatedCollector(
    gps_collector=GPSCollector(trucks=fleet, simulate=True),
    traffic_collector=TrafficCollector(simulate=True),
    load_collector=LoadCollector(simulate=True),
)


async def collect_all():
    """Await ``collect_all()`` on the module-level ``aggregator``."""
    combined = await aggregator.collect_all()
    return combined


all_data = asyncio.get_event_loop().run_until_complete(collect_all())

print("Aggregated Collection Results:")
print(f"  Timestamp: {all_data['timestamp']}")

# The remaining rows all come from the nested 'collection_stats' mapping.
collection_stats = all_data['collection_stats']
stat_rows = (
    ("GPS Readings", "gps_count"),
    ("Traffic Segments", "traffic_segments"),
    ("Active Loads", "active_loads"),
)
for stat_label, stat_key in stat_rows:
    print(f"  {stat_label}: {collection_stats[stat_key]}")

## 5. Data Preprocessing

In [None]:
# Preprocessor instance shared by the preprocessing cells that follow.
preprocessor = DataPreprocessor()

# Run the aggregated GPS readings through the preprocessor together with the fleet.
raw_gps = all_data['gps_readings']
validated_gps, updated_trucks = preprocessor.preprocess_gps_readings(raw_gps, fleet)

print("GPS Preprocessing:")
print(f"  Input: {len(raw_gps)} readings")
print(f"  Output: {len(validated_gps)} validated readings")
error_count = len(preprocessor.validation_errors)
print(f"  Validation errors: {error_count}")

In [None]:
# Run the aggregated traffic conditions through the preprocessor.
routes = []  # Empty routes for now
raw_traffic = all_data['traffic_conditions']
validated_traffic = preprocessor.preprocess_traffic_conditions(raw_traffic, routes)

print("Traffic Preprocessing:")
print(f"  Input: {len(raw_traffic)} conditions")
print(f"  Output: {len(validated_traffic)} validated conditions")

In [None]:
# Run the aggregated loads through the preprocessor.
raw_loads = all_data['loads']
validated_loads = preprocessor.preprocess_loads(raw_loads)

print("Load Preprocessing:")
print(f"  Input: {len(raw_loads)} loads")
print(f"  Output: {len(validated_loads)} validated loads")

In [None]:
# Ask the preprocessor for its summary and print the headline fields.
summary = preprocessor.get_preprocessing_summary()

print("Preprocessing Summary:")
summary_rows = (
    ("Timestamp", "timestamp"),
    ("Validation Errors", "validation_errors"),
    ("Anomalies Detected", "anomalies_detected"),
)
for row_label, row_key in summary_rows:
    print(f"  {row_label}: {summary[row_key]}")

# List individual anomalies only when the summary reports any.
recent_anomalies = summary['recent_anomalies']
if recent_anomalies:
    print("\nRecent Anomalies:")
    for entry in recent_anomalies:
        details = entry.get('details', 'No details')
        print(f"  [{entry['type']}] {details}")

## 6. LangGraph Observation Node

In [None]:
# Build the observation node callable.
observation_node = create_observation_node()

# Dynamic values, computed in the same order the state literal used them.
new_cycle_id = str(uuid.uuid4())
truck_snapshots = [vehicle.model_dump() for vehicle in fleet]
cycle_started_at = datetime.utcnow().isoformat()

# Initial agent state: only the phase, cycle id, trucks and start time carry
# real values; everything else starts empty / None / False.
initial_state: AgentState = {
    # Loop position and identity
    "current_phase": ControlLoopPhase.OBSERVE,
    "cycle_id": new_cycle_id,
    # Observation data
    "trucks": truck_snapshots,
    "routes": [],
    "loads": [],
    "traffic_conditions": [],
    "gps_readings": [],
    "observation_timestamp": "",
    # Reasoning / planning / decision slots
    "reasoning_result": None,
    "current_issues": [],
    "planning_result": None,
    "scenarios": [],
    "decision_result": None,
    "selected_decision": None,
    # Action and feedback slots
    "action_results": [],
    "notifications_sent": [],
    "feedback_result": None,
    # Loop control flags and bookkeeping
    "continue_loop": True,
    "requires_human_intervention": False,
    "error_message": None,
    "cycle_start_time": cycle_started_at,
    "cycle_end_time": None,
    "total_cycles": 0,
}

print(f"Initial state created for cycle: {initial_state['cycle_id']}")

In [None]:
# Feed the initial state through the observation node and report what came back.
observed_state = observation_node(initial_state)

# (label, state key, print the length instead of the value?)
report_rows = (
    ("Phase", "current_phase", False),
    ("Timestamp", "observation_timestamp", False),
    ("Trucks", "trucks", True),
    ("GPS Readings", "gps_readings", True),
    ("Traffic Conditions", "traffic_conditions", True),
    ("Loads", "loads", True),
    ("Continue Loop", "continue_loop", False),
    ("Error", "error_message", False),
)

print("Observation Node Results:")
for row_label, state_key, show_count in report_rows:
    field_value = observed_state[state_key]
    print(f"  {row_label}: {len(field_value) if show_count else field_value}")

## 7. Workflow Scenarios

### Scenario A: Normal Observation Cycle

In [None]:
# Scenario A: one observation pass over the unmodified initial state.
print("=" * 50, "SCENARIO A: Normal Observation Cycle", "=" * 50, sep="\n")

state_a = observation_node(initial_state)

# Filter the observed records for conditions worth flagging.
stuck_trucks = []
for truck_record in state_a['trucks']:
    if truck_record['status'] == 'stuck':
        stuck_trucks.append(truck_record)

heavy_traffic = []
for traffic_record in state_a['traffic_conditions']:
    if traffic_record['level'] in ('heavy', 'standstill'):
        heavy_traffic.append(traffic_record)

urgent_loads = []
for load_record in state_a['loads']:
    if load_record['priority'] in ('urgent', 'critical'):
        urgent_loads.append(load_record)

print("\nFleet Status:")
print(f"  Active trucks: {len(state_a['trucks'])}")
print(f"  Stuck trucks: {len(stuck_trucks)}")

print("\nTraffic Status:")
print(f"  Segments monitored: {len(state_a['traffic_conditions'])}")
print(f"  Heavy/Standstill: {len(heavy_traffic)}")

print("\nLoad Status:")
print(f"  Active loads: {len(state_a['loads'])}")
print(f"  Urgent/Critical: {len(urgent_loads)}")

ready_for_reasoning = state_a['continue_loop']
print(f"\nReady for reasoning phase: {ready_for_reasoning}")

### Scenario B: Missing Data Handling

In [None]:
# Scenario B: run the observation node against a state that has no trucks.
print("=" * 50, "SCENARIO B: Missing Data Handling", "=" * 50, sep="\n")

# Shallow-copy the initial state, then give it a new cycle id and an empty fleet.
empty_state: AgentState = dict(initial_state)
empty_state.update(
    cycle_id=str(uuid.uuid4()),
    trucks=[],  # No trucks
)

state_b = observation_node(empty_state)

print("\nResults with no trucks:")
result_rows = (
    ("GPS Readings", "gps_readings"),
    ("Traffic still collected", "traffic_conditions"),
    ("Loads still collected", "loads"),
)
for row_label, state_key in result_rows:
    print(f"  {row_label}: {len(state_b[state_key])}")
print(f"  Continue Loop: {state_b['continue_loop']}")

### Scenario C: High-Frequency Batch Processing

In [None]:
# Scenario C: run several observation cycles back to back and time each one.
print("=" * 50)
print("SCENARIO C: High-Frequency Batch Processing")
print("=" * 50)

import time

num_cycles = 5
cycle_times = []
current_state = initial_state.copy()

for i in range(num_cycles):
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # with clock adjustments and may be too coarse for sub-millisecond cycles.
    start = time.perf_counter()

    # Each cycle feeds the previous cycle's output back in under a new id.
    current_state = observation_node({
        **current_state,
        "cycle_id": str(uuid.uuid4()),
        "total_cycles": i + 1,
    })

    elapsed = time.perf_counter() - start
    cycle_times.append(elapsed)

    print(f"Cycle {i+1}: {elapsed*1000:.1f}ms, {len(current_state['gps_readings'])} GPS readings")

# Guard both divisions: no cycles run (num_cycles == 0) or a measured average of 0.
avg_time = sum(cycle_times) / len(cycle_times) if cycle_times else 0.0
print(f"\nAverage cycle time: {avg_time*1000:.1f}ms")
if avg_time > 0:
    print(f"Max throughput: ~{1/avg_time:.0f} cycles/second")
else:
    print("Max throughput: not measurable (elapsed time was 0)")

## 8. Integration Test

In [None]:
# Full integration test: six independent checks, each reported as a ✓/✗ line.
#
# Every check lives in its own function so its temporaries stay local and do
# not overwrite notebook variables such as `preprocessor`, `summary` or `loads`.


def _run_check(number, label, check):
    """Run one check and print its result line.

    Args:
        number: 1-based test number shown in the output.
        label: Short description printed when the check passes.
        check: Zero-argument callable that raises on failure.

    Returns:
        True if ``check`` finished without raising, otherwise False.
    """
    try:
        check()
    except Exception as e:
        # Fall back to the exception type so a message-less error is still visible.
        print(f"✗ Test {number}: {str(e) or type(e).__name__}")
        return False
    print(f"✓ Test {number}: {label}")
    return True


def _check_node_creation():
    """The observation node factory must return a callable."""
    assert callable(create_observation_node()), "create_observation_node() did not return a callable"


def _check_gps_collection():
    """A GPS collector over the fleet must return one reading per truck."""
    collected = asyncio.get_event_loop().run_until_complete(
        GPSCollector(trucks=fleet, simulate=True).collect()
    )
    assert len(collected) == len(fleet), (
        f"expected {len(fleet)} GPS readings, got {len(collected)}"
    )


def _check_traffic_collection():
    """The traffic collector must return at least one condition."""
    collected = asyncio.get_event_loop().run_until_complete(
        TrafficCollector(simulate=True).collect()
    )
    assert len(collected) > 0, "traffic collector returned no conditions"


def _check_load_collection():
    """The load collector must return at least one load."""
    collected = asyncio.get_event_loop().run_until_complete(
        LoadCollector(simulate=True).collect()
    )
    assert len(collected) > 0, "load collector returned no loads"


def _check_preprocessor():
    """A fresh preprocessor's summary must contain a 'timestamp' key."""
    fresh_summary = DataPreprocessor().get_preprocessing_summary()
    assert 'timestamp' in fresh_summary, "preprocessing summary has no 'timestamp' key"


def _check_full_cycle():
    """A full observation pass must set the timestamp and keep the loop running."""
    result = observation_node(initial_state)
    assert result['observation_timestamp'] != "", "observation_timestamp was not set"
    assert result['continue_loop'] is True, (
        f"continue_loop is {result['continue_loop']!r}, expected True"
    )


print("=" * 50)
print("PERCEPTION LAYER INTEGRATION TEST")
print("=" * 50)

_checks = [
    ("Observation node creation", _check_node_creation),
    ("GPS collection", _check_gps_collection),
    ("Traffic collection", _check_traffic_collection),
    ("Load collection", _check_load_collection),
    ("Preprocessor", _check_preprocessor),
    ("Full observation cycle", _check_full_cycle),
]

tests_passed = 0
tests_total = len(_checks)
for _number, (_label, _check) in enumerate(_checks, start=1):
    if _run_check(_number, _label, _check):
        tests_passed += 1

print("=" * 50)
print(f"Tests passed: {tests_passed}/{tests_total}")
if tests_passed == tests_total:
    print("✓ All tests passed! Perception layer ready.")
else:
    print("✗ Some tests failed. Please review.")

## Next Steps

1. The observation phase is complete
2. Proceed to `04_grok_integration.ipynb` for Grok LLM setup
3. Then `05_reasoning_prompts.ipynb` and `06_reasoning_layer.ipynb` for the REASON phase

### State Transition

The observed state is now ready to be passed to the reasoning layer:
```python
# After observation
state = observation_node(initial_state)

# Pass to reasoning (next phase)
state = reasoning_node(state)  # Implemented in Phase 2
```