# Azure Digital Twins Integration

This notebook demonstrates how to:
1. Connect to Azure Digital Twins
2. Query twins and relationships
3. Update twins with ML predictions
4. Build an end-to-end ML pipeline with ADT

## 1. Setup & Authentication

In [None]:
import sys
sys.path.insert(0, '/app/src')

import math
import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

# ML imports
from sklearn.ensemble import RandomForestRegressor, IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Our custom connectors
from data.adt_connector import AzureDigitalTwinsConnector, TwinState
from data.mongodb_connector import MongoDBConnector
from config import config

# Confirm the environment is ready and report whether ADT settings were found
print("Imports loaded successfully!")
adt_is_configured = config.azure_dt.is_configured
print(f"ADT Endpoint configured: {adt_is_configured}")

In [None]:
# Check configuration: show the endpoint, or explain how to provide one
if config.azure_dt.is_configured:
    print(f"ADT Endpoint: {config.azure_dt.endpoint}")
else:
    # Setup guidance, printed one line at a time
    setup_help = (
        "Azure Digital Twins is not configured!",
        "Set AZURE_DT_ENDPOINT environment variable to your ADT instance URL.",
        "Example: https://your-instance.api.westeurope.digitaltwins.azure.net",
        "\nFor authentication, you can use:",
        "  1. Azure CLI: az login",
        "  2. Managed Identity (in Azure)",
        "  3. Service Principal: Set AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET",
    )
    for help_line in setup_help:
        print(help_line)

## 2. Connect to Azure Digital Twins

In [None]:
# Initialize the ADT connector
# This will use DefaultAzureCredential (Azure CLI, Managed Identity, etc.)
adt = AzureDigitalTwinsConnector()

try:
    adt.connect()
except Exception as e:
    print(f"Connection failed: {e}")
    print("\nTroubleshooting:")
    print("  1. Check AZURE_DT_ENDPOINT is correct")
    print("  2. Run 'az login' if using Azure CLI credentials")
    # Later cells update twins and upload models, which a read-only role cannot do.
    print("  3. Ensure you have the 'Azure Digital Twins Data Owner' role "
          "('Data Reader' only permits queries)")
    # Every following cell uses `adt`; stop here instead of failing later
    # with confusing secondary errors.
    raise
else:
    print("Successfully connected to Azure Digital Twins!")

## 3. Query Twins

In [None]:
# List every coordinator twin together with its status summary
coordinators = adt.get_all_coordinators()
print(f"Found {len(coordinators)} coordinators:")

for coordinator_twin in coordinators:
    props = coordinator_twin.properties
    print(f"  - {coordinator_twin.twin_id}")
    print(f"    Status: {props.get('status', 'unknown')}")
    print(f"    Towers online: {props.get('towers_online', 0)}")

In [None]:
# List every tower twin; include climate readings when a reported state exists
towers = adt.get_all_towers()
print(f"Found {len(towers)} towers:")

for tower_twin in towers:
    props = tower_twin.properties
    summary = [
        f"  - {tower_twin.twin_id}",
        f"    Crop: {props.get('crop_type', 'unknown')}",
        f"    Status: {props.get('status', 'unknown')}",
    ]

    reported = props.get('reported_state', {})
    if reported:
        summary.append(f"    Temp: {reported.get('air_temp_c', 'N/A')}°C")
        summary.append(f"    Humidity: {reported.get('humidity_pct', 'N/A')}%")

    print("\n".join(summary))

In [None]:
# Pull all twins into a DataFrame for tabular inspection
twins_df = adt.get_all_twins_as_dataframe()

if twins_df.empty:
    print("No twins found in ADT instance.")
else:
    print(f"Retrieved {len(twins_df)} twins")
    display(twins_df.head())

In [None]:
# Custom query: let ADT filter server-side for Tower twins whose status is 'operational'.
# NOTE(review): with an aliased projection (`SELECT tower`), ADT may return each row
# wrapped under the alias key; confirm that query_twins unwraps that shape.
query = """
SELECT tower 
FROM digitaltwins tower
WHERE IS_OF_MODEL('dtmi:iot:hydroponics:Tower;1')
AND tower.status = 'operational'
"""

operational_towers = adt.query_twins(query)
operational_count = len(operational_towers)
print(f"Found {operational_count} operational towers")

## 4. Query Relationships (Twin Graph)

In [None]:
# Get towers connected to the first coordinator.
# get_towers_by_coordinator presumably expects the id without its leading
# 'coordinator-' prefix. Strip only that prefix: str.replace would also delete
# any later occurrence inside the id.
if coordinators:
    first_coordinator_id = coordinators[0].twin_id
    prefix = 'coordinator-'
    if first_coordinator_id.startswith(prefix):
        coord_id = first_coordinator_id[len(prefix):]
    else:
        coord_id = first_coordinator_id
    coord_towers = adt.get_towers_by_coordinator(coord_id)

    print(f"Towers connected to {first_coordinator_id}:")
    for tower in coord_towers:
        print(f"  - {tower.twin_id}")
else:
    print("No coordinators found; skipping coordinator/tower lookup.")

In [None]:
# Show the outgoing relationship edges of the first coordinator, if there is one
if coordinators:
    source_id = coordinators[0].twin_id
    relationships = adt.get_relationships(source_id)

    print(f"Relationships from {source_id}:")
    for relationship in relationships:
        print(f"  -{relationship.relationship_name}-> {relationship.target_id}")

## 5. ML Pipeline with ADT

Example: train a growth-prediction model on historical height measurements from MongoDB (falling back to synthetic data when none are available), then push its predictions to ADT.

In [None]:
# Step 1: Get historical data from MongoDB
mongo = MongoDBConnector()

# Height measurements form the training set for the growth model
height_df = mongo.get_height_measurements()

if height_df.empty:
    print("No height data available. Creating synthetic data for demo...")
    # Synthetic growth: height = 2 + 0.5 * days + N(0, 2) noise, floored at 0.
    # Random draws happen in the order days -> crop -> noise under seed 42.
    np.random.seed(42)
    n_samples = 100
    planting_days = np.random.randint(1, 60, n_samples)
    crop_labels = np.random.choice(['Lettuce', 'Basil', 'Spinach'], n_samples)
    noise = np.random.randn(n_samples) * 2
    height_df = pd.DataFrame({
        'tower_id': [f'tower-{i % 5}' for i in range(n_samples)],
        'days_since_planting': planting_days,
        'height_cm': np.clip(2 + 0.5 * planting_days + noise, 0, None),
        'crop_type': crop_labels,
    })

print(f"Training data: {len(height_df)} samples")
display(height_df.head())

In [None]:
# Step 2: Train a growth prediction model (height in cm from days since planting)

# fit() raises on NaN targets, and a row without a planting age has no usable
# feature, so train only on rows where both values were recorded.
training_df = height_df.dropna(subset=['days_since_planting', 'height_cm'])

# Prepare features
X = training_df[['days_since_planting']].values
y = training_df['height_cm'].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
model = RandomForestRegressor(n_estimators=50, random_state=42)
model.fit(X_train, y_train)

# Evaluate: R² on the training split vs. the held-out 20%
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)

print("Model trained!")
print(f"  Train R²: {train_score:.3f}")
print(f"  Test R²: {test_score:.3f}")

In [None]:
# Step 3: Generate predictions for each tower in ADT
def predict_for_tower(tower: "TwinState", regressor=None, target_height_cm: float = 30.0,
                      horizon_days: int = 7) -> dict:
    """Generate ML growth predictions for a single tower twin.

    Args:
        tower: Twin whose ``properties['growth_tracking']`` may contain
            ``days_since_planting`` and ``last_height_cm``. Absent or null
            values are treated as 0.
        regressor: Fitted model whose ``predict([[days]])`` returns a height in cm.
            Defaults to the notebook-level ``model`` trained in Step 2.
        target_height_cm: Height at which the crop counts as harvestable. The
            30 cm default was picked for lettuce; pass a crop-specific value
            for other crops.
        horizon_days: How many days ahead to predict the height.

    Returns:
        dict with:
            ``predicted_height_cm``: predicted height ``horizon_days`` from now.
            ``growth_rate_cm_per_day``: average rate since planting, or 0.5
                when the planting age is 0/unknown.
            ``days_to_harvest``: whole days until ``target_height_cm``
                (rounded up); 0 if already reached or no growth.
            ``health_score``: a placeholder constant.
    """
    if regressor is None:
        regressor = model

    # A twin property can be present but null; `or` maps None to the defaults.
    growth = tower.properties.get('growth_tracking') or {}
    days_planted = growth.get('days_since_planting') or 0
    current_height = growth.get('last_height_cm') or 0

    # Predict future height (horizon_days from now)
    predicted_height = float(regressor.predict([[days_planted + horizon_days]])[0])

    # Average growth so far; fall back to a default estimate for brand-new plantings
    growth_rate = current_height / days_planted if days_planted > 0 else 0.5

    if growth_rate > 0 and current_height < target_height_cm:
        # Round up: truncating would name a day on which the plant is still below target.
        days_to_harvest = math.ceil((target_height_cm - current_height) / growth_rate)
    else:
        days_to_harvest = 0

    return {
        'predicted_height_cm': round(predicted_height, 1),
        'growth_rate_cm_per_day': round(growth_rate, 2),
        'days_to_harvest': days_to_harvest,
        'health_score': 0.85,  # Placeholder - would use anomaly detection
    }

# Score every tower and collect the results keyed by twin id
predictions = {}
for tower_twin in towers:
    tower_preds = predict_for_tower(tower_twin)
    predictions[tower_twin.twin_id] = tower_preds
    print(
        f"{tower_twin.twin_id}: height={tower_preds['predicted_height_cm']}cm, "
        f"harvest in {tower_preds['days_to_harvest']} days"
    )

In [None]:
# Step 4: Push predictions to Azure Digital Twins in one batch
if not predictions:
    print("No towers to update.")
else:
    results = adt.batch_update_predictions(
        predictions=predictions,
        model_name="growth_predictor",
        model_version="1.0.0",
    )

    # results.values() are summed as per-twin success flags
    success_count = sum(results.values())
    total = len(predictions)
    print(f"\nUpdated {success_count}/{total} twins with ML predictions")

## 6. Anomaly Detection & Alerts

In [None]:
# Train an anomaly detection model on telemetry data
tower_telemetry = mongo.get_tower_telemetry(hours=168)  # Last week

# check_tower_anomaly scores exactly these two readings, in this order, so the
# scaler and detector must be fitted on both of them. A subset would make
# scaler.transform fail later on a shape mismatch.
features = ['air_temp_c', 'humidity_pct']
missing_features = [f for f in features if f not in tower_telemetry.columns]

if tower_telemetry.empty or missing_features:
    if tower_telemetry.empty:
        print("No telemetry data. Creating synthetic data for demo...")
    else:
        print(f"Telemetry is missing columns {missing_features}. Creating synthetic data for demo...")
    np.random.seed(42)
    n = 200
    tower_telemetry = pd.DataFrame({
        'air_temp_c': np.random.normal(24, 2, n),
        'humidity_pct': np.random.normal(65, 5, n),
    })
    # Add some anomalies (.loc slices are label-inclusive)
    tower_telemetry.loc[190:195, 'air_temp_c'] = 40  # Temperature spike
    tower_telemetry.loc[196:199, 'humidity_pct'] = 10  # Humidity drop

# Separate name so the Step 2 training matrix `X` is not clobbered
X_telemetry = tower_telemetry[features].dropna()
if X_telemetry.empty:
    raise ValueError("No complete telemetry rows to train the anomaly detector on.")

# Train Isolation Forest on standardized readings
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_telemetry)

iso_forest = IsolationForest(contamination=0.05, random_state=42)
iso_forest.fit(X_scaled)

print(f"Anomaly detection model trained on {len(X_telemetry)} samples")

In [None]:
# Check each tower's current state for anomalies
def check_tower_anomaly(tower: "TwinState", scaler_model=None, detector=None) -> float:
    """Score how anomalous a tower's latest reported telemetry looks.

    Args:
        tower: Twin whose ``properties['reported_state']`` may hold
            ``air_temp_c`` and ``humidity_pct``.
        scaler_model: Fitted scaler applied before scoring. Defaults to the
            notebook-level ``scaler``.
        detector: Fitted model exposing ``decision_function`` (negative values
            mean outlier). Defaults to the notebook-level ``iso_forest``.

    Returns:
        A float in [0, 1]: 0.0 for inliers or when either reading is missing,
        otherwise the negated decision score clamped to 1.0. This is a clamp,
        not a calibrated probability.
    """
    if scaler_model is None:
        scaler_model = scaler
    if detector is None:
        detector = iso_forest

    # reported_state may be absent or explicitly null
    reported = tower.properties.get('reported_state') or {}
    temp = reported.get('air_temp_c')
    humidity = reported.get('humidity_pct')
    if temp is None or humidity is None:
        return 0.0

    # The scaler was fitted on a DataFrame; pass matching column names so sklearn
    # does not warn that X lacks valid feature names.
    current = pd.DataFrame([[temp, humidity]], columns=['air_temp_c', 'humidity_pct'])
    raw_score = detector.decision_function(scaler_model.transform(current))[0]

    # Negative decision scores indicate anomalies: negate, then clamp to [0, 1]
    return float(min(1.0, max(0.0, -raw_score)))

# Check all towers and write their anomaly scores back to ADT
for tower in towers:
    score = check_tower_anomaly(tower)
    status = "ALERT" if score > 0.5 else "OK"
    print(f"{tower.twin_id}: anomaly_score={score:.2f} [{status}]")

    # Push the score whenever the tower reported usable readings, including 0.0,
    # so a tower that has recovered overwrites its earlier, stale anomaly score.
    # Towers without readings are skipped: check_tower_anomaly returns 0.0 for
    # them, which would mean "unknown", not "healthy".
    reported = tower.properties.get('reported_state') or {}
    has_readings = (reported.get('air_temp_c') is not None
                    and reported.get('humidity_pct') is not None)
    if has_readings:
        adt.update_ml_predictions(
            tower.twin_id,
            {'anomaly_score': round(score, 3)},
            model_name="anomaly_detector",
            model_version="1.0.0"
        )

## 7. Query ADT for ML Insights

In [None]:
# Ask the connector for towers it flags at anomaly_threshold=0.3 and report their scores
towers_needing_attention = adt.get_towers_needing_attention(anomaly_threshold=0.3)

print(f"Towers needing attention: {len(towers_needing_attention)}")
for flagged_twin in towers_needing_attention:
    ml_info = flagged_twin.properties.get('ml_predictions', {})
    print(f"  - {flagged_twin.twin_id}")
    for label, key in (("Anomaly score", 'anomaly_score'), ("Generated at", 'generated_at')):
        print(f"    {label}: {ml_info.get(key, 'N/A')}")

In [None]:
# Custom query: towers whose stored prediction says harvest is at most 7 days away
harvest_query = """
SELECT tower
FROM digitaltwins tower
WHERE IS_OF_MODEL('dtmi:iot:hydroponics:Tower;1')
AND tower.ml_predictions.days_to_harvest <= 7
"""

try:
    ready_for_harvest = adt.query_twins(harvest_query)
    print(f"Towers ready for harvest within 7 days: {len(ready_for_harvest)}")
    for harvest_twin in ready_for_harvest:
        days_left = harvest_twin.properties.get('ml_predictions', {}).get('days_to_harvest', 'N/A')
        print(f"  - {harvest_twin.twin_id}: {days_left} days")
except Exception as e:
    # Presumably fails when no twin has ml_predictions yet; report and carry on
    print(f"Query failed (may not have ml_predictions yet): {e}")

## 8. Upload DTDL Models (One-time Setup)

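Run this section once to upload the DTDL models to your ADT instance. Uploading models requires write access to the instance (e.g. the *Azure Digital Twins Data Owner* role); *Data Reader* is not sufficient.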

In [None]:
import json
from pathlib import Path

def upload_dtdl_models(adt_connector: "AzureDigitalTwinsConnector", models_dir: str,
                       model_order=None):
    """Upload DTDL model definitions from ``models_dir`` to Azure Digital Twins.

    Args:
        adt_connector: Connected connector; ``adt_connector.client.create_models``
            is called once with every loaded model.
        models_dir: Directory containing the DTDL JSON files.
        model_order: File names to load, in upload order. Defaults to
            Farm, Reservoir, Tower, Coordinator (dependencies first).

    Each file may hold one model (a JSON object) or several (a JSON array).
    Missing files are reported and skipped. If the upload conflicts because the
    models already exist, that is reported as OK. Any other upload error is
    printed, not raised, so the notebook keeps running.
    """
    models_path = Path(models_dir)
    if model_order is None:
        # Order matters - upload dependencies first
        model_order = ['Farm.json', 'Reservoir.json', 'Tower.json', 'Coordinator.json']

    models = []
    for model_file in model_order:
        file_path = models_path / model_file
        if not file_path.exists():
            print(f"Skipped (not found): {file_path}")
            continue
        # DTDL is JSON; read it as UTF-8 regardless of the platform default encoding
        with open(file_path, encoding='utf-8') as f:
            loaded = json.load(f)
        # A DTDL file can define a single interface or an array of interfaces
        if isinstance(loaded, list):
            models.extend(loaded)
        else:
            models.append(loaded)
        print(f"Loaded: {model_file}")

    if not models:
        print(f"No DTDL models found in {models_path}")
        return

    try:
        adt_connector.client.create_models(models)
        print(f"\nSuccessfully uploaded {len(models)} models to ADT!")
    except Exception as e:
        # azure-core HTTP errors carry `status_code`; 409 Conflict means the models already exist
        if getattr(e, 'status_code', None) == 409 or "already exists" in str(e).lower():
            print("Models already exist in ADT (this is OK).")
        else:
            print(f"Error uploading models: {e}")

# Uncomment to upload models
# upload_dtdl_models(adt, '/app/models/dtdl')

## 9. Cleanup

In [None]:
# Release the external connections opened earlier in the notebook
mongo.close()
# NOTE(review): the ADT connector's API is not visible here; close it only if it exposes close().
adt_close = getattr(adt, 'close', None)
if callable(adt_close):
    adt_close()
print("Connections closed.")

## Next Steps

1. **Set up ADT instance** in Azure Portal
2. **Upload DTDL models** using the cell in Section 8
3. **Create twins** for your coordinators, towers, and reservoirs
4. **Set up IoT Hub** to route telemetry to ADT
5. **Schedule ML jobs** to periodically update predictions

### Architecture with ADT:

```
┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐
│  ESP32      │───>│  IoT Hub    │───>│  Azure Digital Twins│
│  Devices    │    │  (Telemetry)│    │  (Twin State)       │
└─────────────┘    └─────────────┘    └──────────┬──────────┘
                                                 │
                   ┌─────────────┐               │
                   │  MongoDB    │<──────────────┤ (Historical)
                   │  (History)  │               │
                   └──────┬──────┘               │
                          │                      │
                   ┌──────▼──────┐               │
                   │  ML Service │───────────────┘ (Predictions)
                   │  (This!)    │
                   └─────────────┘
```