# 🧠 Smart Home Digital Twin + FMU Integration Notebook

This notebook shows how to: (1) load the rebuilt linux64 FMU for the smart home AC room, (2) pipe its outputs into a Python digital twin model, and (3) expose predictions through a FastAPI + PostgreSQL deployment architecture.

## 📦 Prerequisites
- Rebuilt linux64 FMU available locally (e.g., `ACRoom_linux.fmu`).
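- Python 3.10+ environment with the packages used below (`fmpy`, `pandas`, `numpy`, `sqlalchemy`, `fastapi`, `uvicorn[standard]`, and `asyncpg`, which the `postgresql+asyncpg` URL in Step 3 requires; `psycopg2-binary` is only needed if you switch to a synchronous engine).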
- PostgreSQL instance (local Docker or managed service).
- Optional for production: Docker & Kubernetes to containerize the FastAPI service.

## 🏗️ Target Architecture Overview
1. **Simulation**: Execute the FMU with `fmpy` to generate thermal state trajectories (temperature, humidity, maintenance signals).
2. **Digital Twin Core**: Enrich FMU outputs with contextual features and run ML/heuristic inference to estimate health scores or maintenance risk.
3. **API Layer**: FastAPI service exposes REST endpoints for running simulations on demand, retrieving results, and persisting summaries to PostgreSQL.
4. **Persistence**: PostgreSQL stores simulation runs, aggregated KPIs, and maintenance alerts for downstream analytics dashboards.

```mermaid
flowchart LR
    Sensors[[FMU Inputs]] -->|load & simulate| FMU[ACRoom linux64 FMU];
    FMU -->|time-series state| TwinCore[SmartHomeDigitalTwin];
    TwinCore -->|predictions + KPIs| API[FastAPI Service];
    API -->|persist| DB[(PostgreSQL)];
    Client[/Dashboard, Scheduler/] --> API;
```

In [None]:
# asyncpg is the driver behind the postgresql+asyncpg URL used in Step 3
# !pip install fmpy fastapi "uvicorn[standard]" sqlalchemy asyncpg psycopg2-binary pandas numpy

In [None]:
from pathlib import Path
from datetime import datetime
from typing import List, Optional

import numpy as np
import pandas as pd
from fmpy import simulate_fmu

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base

print('✅ Imports ready')

## 🔁 Step 1: Run the FMU and collect state trajectories

In [None]:
FMU_PATH = Path('ACRoom_linux.fmu')  # update if the FMU sits elsewhere
if not FMU_PATH.exists():
    raise FileNotFoundError(f'FMU not found at {FMU_PATH}. Rebuild or adjust the path.')

simulation_outputs = ['time', 'T_room', 'humidity_out', 'needsService_out']

fmu_result = simulate_fmu(
    filename=str(FMU_PATH),
    start_time=0.0,
    stop_time=3600.0,
    output=simulation_outputs
)

df_fmu = pd.DataFrame(fmu_result)
df_fmu.head()
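
Before running the simulation, it can help to confirm that the requested output names actually exist in the FMU. The sketch below is one way to do that, assuming `fmpy.read_model_description` is available; the helper names `fmu_variable_names` and `missing_outputs` are hypothetical, and the `declared` set is made up for illustration rather than read from `ACRoom_linux.fmu`.

```python
from typing import Iterable, List, Set


def fmu_variable_names(fmu_path: str) -> Set[str]:
    """Return the variable names declared in the FMU's modelDescription.xml."""
    # Imported lazily so missing_outputs() can be used without fmpy installed.
    from fmpy import read_model_description

    model_description = read_model_description(fmu_path)
    return {variable.name for variable in model_description.modelVariables}


def missing_outputs(requested: Iterable[str], available: Set[str]) -> List[str]:
    """List requested names the FMU does not declare, preserving order."""
    # The fmpy result always carries a 'time' column, so it is never "missing".
    return [name for name in requested if name != 'time' and name not in available]


# Illustrative data only: a hypothetical set of declared names.
declared = {'T_room', 'humidity_out'}
requested = ['time', 'T_room', 'humidity_out', 'needsService_out']
print(missing_outputs(requested, declared))  # ['needsService_out']
```

With a real FMU, `missing_outputs(simulation_outputs, fmu_variable_names(str(FMU_PATH)))` should return an empty list if the model was rebuilt with the expected interface.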

## 🧠 Step 2: Digital Twin Core
Define a simple rule-based + ML-ready twin that fuses FMU signals with contextual metadata (occupancy, energy use) and emits a maintenance risk score.

In [None]:
class SmartHomeDigitalTwin:
    """Core twin that tracks room state, computes features, and flags maintenance risk."""

    def __init__(self, temp_threshold: float = 298.15, humidity_threshold: float = 65.0):
        self.temp_threshold = temp_threshold
        self.humidity_threshold = humidity_threshold
        self.history: List[dict] = []

    def ingest_fmu_row(self, row: pd.Series, occupancy: int = 1, power_watts: float = 950.0) -> dict:
        state = {
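            # FMU time is seconds since simulation start, so these timestamps fall on 1970-01-01 (UTC).
            # datetime.utcfromtimestamp is deprecated since Python 3.12; build a timezone-aware value instead.
            'timestamp': pd.Timestamp(float(row['time']), unit='s', tz='UTC').to_pydatetime(),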
            'temperature_K': float(row['T_room']),
            'temperature_C': float(row['T_room']) - 273.15,
            'humidity_pct': float(row['humidity_out']),
            'needs_service_flag': bool(row['needsService_out']),
            'occupancy': occupancy,
            'power_watts': power_watts
        }
        state['thermal_load'] = state['temperature_C'] * occupancy
        state['maintenance_score'] = self._compute_score(state)
        self.history.append(state)
        return state

    def _compute_score(self, state: dict) -> float:
        temp_penalty = max(0.0, state['temperature_K'] - self.temp_threshold)
        humidity_penalty = max(0.0, state['humidity_pct'] - self.humidity_threshold)
        service_penalty = 5.0 if state['needs_service_flag'] else 0.0
        return float(0.5 * temp_penalty + 0.2 * humidity_penalty + service_penalty)

    def latest(self) -> Optional[dict]:
        return self.history[-1] if self.history else None

    def to_frame(self) -> pd.DataFrame:
        return pd.DataFrame(self.history) if self.history else pd.DataFrame()

twin = SmartHomeDigitalTwin(temp_threshold=296.15, humidity_threshold=60.0)

In [None]:
# Example: Stream FMU states through the twin
sample_rows = df_fmu.iloc[::32]  # downsample for demo
processed = [twin.ingest_fmu_row(row, occupancy=2, power_watts=900) for _, row in sample_rows.iterrows()]
pd.DataFrame(processed).head()
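
To make the scoring rule concrete, here is a standalone restatement of `_compute_score` with one worked case. The function name `maintenance_score` and the input values are illustrative assumptions; the weights (0.5, 0.2, and the flat 5.0 service penalty) and the thresholds are the ones used above.

```python
def maintenance_score(temperature_k: float, humidity_pct: float, needs_service: bool,
                      temp_threshold: float = 296.15,
                      humidity_threshold: float = 60.0) -> float:
    """Same weighting as SmartHomeDigitalTwin._compute_score, as a pure function."""
    # Only the excess above each threshold is penalized.
    temp_penalty = max(0.0, temperature_k - temp_threshold)
    humidity_penalty = max(0.0, humidity_pct - humidity_threshold)
    # The FMU service flag adds a flat penalty.
    service_penalty = 5.0 if needs_service else 0.0
    return 0.5 * temp_penalty + 0.2 * humidity_penalty + service_penalty


# A room 4 K too warm and 10 points too humid, with the service flag raised:
# 0.5 * 4 + 0.2 * 10 + 5, which is about 9.0 (up to floating-point rounding).
hot_humid = maintenance_score(300.15, 70.0, True)
print(round(hot_humid, 2))

# A room below both thresholds with no service flag scores zero.
comfortable = maintenance_score(290.0, 40.0, False)
print(comfortable)
```

Because the temperature term is measured in kelvin and the humidity term in percentage points, the two weights are not directly comparable; treat the score as a ranking heuristic rather than a calibrated probability.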

## 🌐 Step 3: FastAPI + PostgreSQL scaffolding
For production we expose the twin through FastAPI and persist runs to PostgreSQL using SQLAlchemy async sessions.

In [None]:
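import os

# Read the connection string from the environment; the fallback is a local-dev placeholder, not a real credential.
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql+asyncpg://app_user:supersecret@localhost:5432/smart_home')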
engine = create_async_engine(DATABASE_URL, echo=False, future=True)
AsyncSessionLocal = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()
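
If the connection string is assembled from separate settings, passwords containing characters such as `@` or `/` need to be URL-encoded before they go into the SQLAlchemy URL. The following is a minimal sketch using only the standard library; the helper name `build_database_url` is hypothetical, and the choice of the libpq-style variable names (`PGUSER`, `PGPASSWORD`, `PGHOST`, `PGPORT`, `PGDATABASE`) is an assumption about how your environment is configured.

```python
import os
from typing import Mapping, Optional
from urllib.parse import quote_plus


def build_database_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Assemble a postgresql+asyncpg URL from environment-style settings."""
    env = os.environ if env is None else env
    user = env.get('PGUSER', 'app_user')
    password = env.get('PGPASSWORD', '')
    host = env.get('PGHOST', 'localhost')
    port = env.get('PGPORT', '5432')
    database = env.get('PGDATABASE', 'smart_home')

    # Percent-encode credentials so special characters do not break URL parsing.
    credentials = quote_plus(user)
    if password:
        credentials += ':' + quote_plus(password)
    return f'postgresql+asyncpg://{credentials}@{host}:{port}/{database}'


# Illustrative values only; pass nothing to read from os.environ instead.
example_url = build_database_url({'PGUSER': 'app_user', 'PGPASSWORD': 'p@ss/word'})
print(example_url)
# postgresql+asyncpg://app_user:p%40ss%2Fword@localhost:5432/smart_home
```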

In [None]:
class SimulationRun(Base):
    __tablename__ = 'simulation_runs'
    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    created_at = sa.Column(sa.DateTime(timezone=True), server_default=sa.func.now())
    fmu_path = sa.Column(sa.String(255), nullable=False)
    temperature_c = sa.Column(sa.Float, nullable=False)
    humidity_pct = sa.Column(sa.Float, nullable=False)
    maintenance_score = sa.Column(sa.Float, nullable=False)
    needs_service = sa.Column(sa.Boolean, nullable=False)

class SimulationCreate(BaseModel):
    time: float
    occupancy: int = 1
    power_watts: float = 900.0

class SimulationResponse(BaseModel):
    id: int
    created_at: datetime
    temperature_c: float
    humidity_pct: float
    maintenance_score: float
    needs_service: bool

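    class Config:
        # Lets FastAPI build the response from the SQLAlchemy object.
        # orm_mode is the Pydantic v1 name; Pydantic v2 renamed it to from_attributes.
        orm_mode = True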

In [None]:
app = FastAPI(title='Smart Home Twin API', version='1.0.0')

async def get_session() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

@app.on_event('startup')
async def on_startup():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

@app.post('/simulate', response_model=SimulationResponse)
async def simulate(sim_request: SimulationCreate, session: AsyncSession = Depends(get_session)):
    row = df_fmu.iloc[(df_fmu['time'] - sim_request.time).abs().argmin()]
    state = twin.ingest_fmu_row(row, sim_request.occupancy, sim_request.power_watts)
    record = SimulationRun(
        fmu_path=str(FMU_PATH),
        temperature_c=state['temperature_C'],
        humidity_pct=state['humidity_pct'],
        maintenance_score=state['maintenance_score'],
        needs_service=state['needs_service_flag']
    )
    session.add(record)
    await session.commit()
    await session.refresh(record)
    return record

@app.get('/runs', response_model=List[SimulationResponse])
async def list_runs(session: AsyncSession = Depends(get_session)):
    result = await session.execute(sa.select(SimulationRun).order_by(SimulationRun.created_at.desc()).limit(50))
    return result.scalars().all()
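
Note that `/simulate` does not rerun the FMU: it picks the already-simulated sample whose `time` is closest to the requested value. The pure-Python sketch below mirrors what the `(df_fmu['time'] - t).abs().argmin()` expression does; `nearest_index` and `sample_times` are illustrative names and data, not part of the notebook.

```python
from typing import Sequence


def nearest_index(times: Sequence[float], target: float) -> int:
    """Return the position of the sample closest to target (first one wins on ties)."""
    if not times:
        raise ValueError('times must not be empty')
    return min(range(len(times)), key=lambda i: abs(times[i] - target))


# Hypothetical sample grid with one point per minute.
sample_times = [0.0, 60.0, 120.0, 180.0]

print(nearest_index(sample_times, 130.0))   # 2, because 120.0 is closest
print(nearest_index(sample_times, 9999.0))  # 3, out-of-range requests clamp to the last sample
```

The clamping behavior means a request far beyond `stop_time` still returns a result; an API that should reject such requests would need an explicit range check.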

## 🚀 Running the service
1. Set the `DATABASE_URL` to point at your PostgreSQL instance (use `docker compose` for local dev).
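2. Export the API cells to a module (the command below assumes it is named `smart_home_api.py`) and launch it: `uvicorn smart_home_api:app --host 0.0.0.0 --port 8000`.
3. `POST /simulate` with `{"time": 120.0, "occupancy": 3}` to score the precomputed FMU sample nearest to t = 120 s and persist the result. The endpoint looks up `df_fmu`; it does not rerun the FMU.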
4. `GET /runs` to inspect the latest saved KPIs for dashboards or alerting pipelines.
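
The request in step 3 can be issued from Python with the standard library alone. This is a sketch under the assumption that the service is reachable at `http://localhost:8000`; the helper names `build_simulate_request` and `post_simulate` are hypothetical.

```python
import json
from urllib.request import Request, urlopen


def build_simulate_request(base_url: str, time: float, occupancy: int = 1,
                           power_watts: float = 900.0) -> Request:
    """Build the POST /simulate request matching the SimulationCreate schema."""
    payload = {'time': time, 'occupancy': occupancy, 'power_watts': power_watts}
    return Request(
        url=base_url.rstrip('/') + '/simulate',
        data=json.dumps(payload).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def post_simulate(base_url: str, time: float, **kwargs) -> dict:
    """Send the request and decode the JSON response (requires a running service)."""
    with urlopen(build_simulate_request(base_url, time, **kwargs), timeout=10) as response:
        return json.load(response)


# Building the request does not touch the network, so it is safe to inspect.
request = build_simulate_request('http://localhost:8000', time=120.0, occupancy=3)
print(request.get_method(), request.full_url)
```

Once the service is up, `post_simulate('http://localhost:8000', 120.0, occupancy=3)` should return a dictionary with the `SimulationResponse` fields.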

## ✅ Next Steps
- Replace rule-based scoring with your trained RandomForest model (load pickle or retrain inline).
- Integrate real sensor inputs by injecting live data before FMU runs (e.g., MQTT, Kafka).
- Harden deployments: containerize FastAPI app, add migrations (Alembic), wire into CI/CD.
- Add monitoring: Prometheus metrics for simulation latency, Postgres health, and twin predictions.
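
One way to prepare for the first item is to make the scorer pluggable, so that a trained model can replace the rule without touching the rest of the twin. The sketch below is an assumption about how that could look, not the notebook's design: `make_model_scorer`, `ConstantModel`, and the feature list are hypothetical, and the stand-in model only imitates the `predict(rows)` interface that scikit-learn estimators expose.

```python
from typing import Callable, Dict, List, Sequence

State = Dict[str, float]
Scorer = Callable[[State], float]


def make_model_scorer(model, feature_names: Sequence[str]) -> Scorer:
    """Wrap any object with a scikit-learn style predict() as a scorer."""
    def score(state: State) -> float:
        # predict() expects a 2D batch, so wrap the single row in a list.
        features = [[state[name] for name in feature_names]]
        return float(model.predict(features)[0])
    return score


class ConstantModel:
    """Stand-in for a trained model; always predicts the same risk."""

    def predict(self, rows: List[List[float]]) -> List[float]:
        return [0.75 for _ in rows]


# Hypothetical feature set drawn from the state dict built by ingest_fmu_row.
feature_names = ['temperature_C', 'humidity_pct', 'occupancy', 'power_watts']
scorer = make_model_scorer(ConstantModel(), feature_names)

state = {'temperature_C': 27.0, 'humidity_pct': 70.0, 'occupancy': 2, 'power_watts': 900.0}
print(scorer(state))  # 0.75
```

A twin that accepted such a `Scorer` in its constructor could default to the rule-based formula and switch to a model simply by passing a different callable.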