## Model

##### Explanation:

The provided code concerns the training of an AI model for human activity recognition within a smart home context. The model is designed to analyze images using YOLO (You Only Look Once) for person detection, and then use an EfficientNet-based classifier to identify activities. Finally, temporal and sensor data is analyzed and classified via a dataset containing both visual and contextual information. This culminates in the creation of an AI model capable of recognizing activity sequences within a home environment, enabling smarter home management.

![Algorithm Diagram](algorithm_diagram.svg)


In [None]:
!pip install ultralytics==8.3.165 torch==2.3.0 timm==1.0.16

In [None]:
import random
import pickle
import os
import shutil
from PIL import Image
from tqdm.auto import tqdm
from datetime import datetime
import numpy as np
import json
import tempfile


import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms as T
import torch.nn as nn
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
import torch.optim as optim


import timm
from ultralytics import YOLO


In [None]:
# Device string handed to every model built in the cells below. It is pinned to
# CPU; the trailing comment keeps the auto-detection expression for reference.
device = 'cpu' #  'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'

## STEP 1: person recognition model

In this step, the model uses YOLO to detect people in images. The detector is a YOLOv8n model fine-tuned on person bounding boxes (bboxes) that were detected by the larger YOLOv12l model.

In [None]:

# --- Inputs used to assemble the YOLO fine-tuning set ---
n_images = 500                                                       # target number of images
img_path = "clusters_kmeans_1000"                                    # one sub-folder per cluster
images_path = "Home Assistant Monitor Complete Jul 12 2025/images"   # folder holding the original frames
pickle_path = "person_detections_per_image.pkl"                      # pre-computed person detections

# NOTE: pickle can execute arbitrary code while loading; only open files you trust.
with open(pickle_path, "rb") as f:
    raw_detections = pickle.load(f)

# Keep only the bounding box of every detection, keyed by image path.
all_data = {
    image_key: [{"bbox": detection["bbox"]} for detection in detections]
    for image_key, detections in raw_detections.items()
}

def getbasicname(path): # path ex: '11-07_13-43-46_106.jpg'
    """Drop the last ``_``-separated chunk of *path* and re-attach its extension.

    Example: ``'11-07_13-43-46_106.jpg'`` -> ``'11-07_13-43-46.jpg'``.
    """
    stem = "_".join(path.split("_")[:-1])
    extension = path.split(".")[-1]
    return f"{stem}.{extension}"

# Map "original image path" -> list of person boxes for up to n_images images,
# drawing one random image per cluster folder on every pass.
bbox = {}

# Only sub-folders are clusters; list them once, the set does not change while sampling.
cluster_dirs = [
    entry for entry in os.listdir(img_path)
    if not os.path.isfile(os.path.join(img_path, entry))
]

# A random draw can miss images that are still eligible, so a single fruitless
# pass does not prove exhaustion. Stop only after several consecutive passes
# add nothing; otherwise the loop would spin forever whenever fewer than
# n_images images have detections.
max_stale_passes = 20
stale_passes = 0

while len(bbox) < n_images and stale_passes < max_stale_passes:
    added_this_pass = 0

    for cluster in cluster_dirs:
        candidates = os.listdir(os.path.join(img_path, cluster))
        if not candidates:
            continue  # random.choice() raises IndexError on an empty folder

        key = os.path.join(images_path, getbasicname(random.choice(candidates)))
        detections = all_data.get(key)  # None when the image is absent from the pickle

        # Skip images already selected and images without any person detection.
        if key in bbox or not detections:
            continue

        bbox[key] = [x["bbox"] for x in detections]
        added_this_pass += 1

        if len(bbox) >= n_images:
            break

    stale_passes = 0 if added_this_pass else stale_passes + 1

if len(bbox) < n_images:
    print(f"[WARN] only {len(bbox)} of the {n_images} requested images could be selected")


# Output layout expected by Ultralytics: <root>/{images,labels}/{train,val}
OUT_ROOT = "ultralytics_dataset"
IMAGES_TRAIN, IMAGES_VAL = (os.path.join(OUT_ROOT, "images", part) for part in ("train", "val"))
LABELS_TRAIN, LABELS_VAL = (os.path.join(OUT_ROOT, "labels", part) for part in ("train", "val"))

for folder in (IMAGES_TRAIN, IMAGES_VAL, LABELS_TRAIN, LABELS_VAL):
    os.makedirs(folder, exist_ok=True)

val_ratio = 0.2

all_paths = list(bbox)
if not all_paths:
    raise RuntimeError("bbox is empty")

# Shuffle, then keep at most n_images entries (slicing a shorter list keeps everything).
random.shuffle(all_paths)
selected_paths = all_paths[:n_images]

n_total = len(selected_paths)
n_val = int(n_total * val_ratio)
n_train = n_total - n_val

# The first n_train paths are for training, the remaining n_val for validation.
train_paths = selected_paths[:n_train]
val_paths = selected_paths[n_train:]

print(f"Total: {n_total} (train={len(train_paths)}, val={len(val_paths)})")

def sanitize_bbox(bbox, iw, ih):
    """Clamp an ``(x1, y1, x2, y2)`` box to the image and reject empty boxes.

    x coordinates are limited to ``[0, iw - 1]`` and y coordinates to
    ``[0, ih - 1]``. Returns the clamped ``[x1, y1, x2, y2]`` list, or ``None``
    when the clamped box has no positive width or height.
    """
    def clamp(value, upper):
        return max(0, min(upper, float(value)))

    left, top, right, bottom = bbox
    left = clamp(left, iw - 1)
    top = clamp(top, ih - 1)
    right = clamp(right, iw - 1)
    bottom = clamp(bottom, ih - 1)

    if right <= left or bottom <= top:
        return None
    return [left, top, right, bottom]

def bbox_to_yolo_line(bbox, iw, ih, cls_id=0):
    """Format a pixel ``(x1, y1, x2, y2)`` box as a YOLO label line.

    The line is ``"<cls_id> <x_center> <y_center> <width> <height>"`` where the
    four values are divided by the image size (``iw`` x ``ih``), clipped to
    ``[0, 1]`` and printed with six decimals.
    """
    def unit(value):
        return min(max(value, 0.0), 1.0)

    left, top, right, bottom = bbox
    fields = (
        unit(((left + right) / 2.0) / iw),   # x center
        unit(((top + bottom) / 2.0) / ih),   # y center
        unit((right - left) / iw),           # width
        unit((bottom - top) / ih),           # height
    )
    return f"{cls_id} " + " ".join(f"{value:.6f}" for value in fields)

def safe_copy(src, dst_dir):
    """Copy *src* into *dst_dir* without overwriting, and return the new path.

    If a file with the same name already exists, ``_1``, ``_2``, ... is
    appended to the stem until an unused name is found.
    """
    base = os.path.basename(src)
    stem, ext = os.path.splitext(base)

    target = os.path.join(dst_dir, base)
    counter = 0
    while os.path.exists(target):
        counter += 1
        target = os.path.join(dst_dir, f"{stem}_{counter}{ext}")

    shutil.copy2(src, target)
    return target

def save_image_and_label(src_img_path, bbox_list, images_dir, labels_dir):
    """Copy one image into the dataset and write its YOLO label file.

    Args:
        src_img_path: path of the source image.
        bbox_list: iterable of ``(x1, y1, x2, y2)`` pixel boxes for that image.
        images_dir: destination folder for the image copy.
        labels_dir: destination folder for the ``.txt`` label file.

    The label file takes the stem of the *copied* image, so a copy that was
    renamed to avoid a name clash still matches its labels. Boxes rejected by
    ``sanitize_bbox`` are skipped; every remaining box is written as class 0.
    """
    dst_img_path = safe_copy(src_img_path, images_dir)
    dst_base = os.path.splitext(os.path.basename(dst_img_path))[0]
    label_path = os.path.join(labels_dir, dst_base + ".txt")

    # Only the size is needed; the context manager closes the file handle
    # instead of leaving one open per exported image.
    with Image.open(src_img_path) as img:
        iw, ih = img.size

    lines = []
    for b in bbox_list:
        sb = sanitize_bbox(b, iw, ih)
        if sb is not None:
            lines.append(bbox_to_yolo_line(sb, iw, ih, cls_id=0))

    with open(label_path, "w") as f:
        f.write("\n".join(lines))

# Export both splits: copy each image and write its label file.
for split_paths, split_images, split_labels in (
    (train_paths, IMAGES_TRAIN, LABELS_TRAIN),
    (val_paths, IMAGES_VAL, LABELS_VAL),
):
    for p in split_paths:
        save_image_and_label(p, bbox[p], split_images, split_labels)

# Dataset description read by the `yolo` training command.
dataset_yaml_path = os.path.join(OUT_ROOT, "dataset.yaml")
yaml_lines = [
    f"train: {os.path.abspath(IMAGES_TRAIN)}\n",
    f"val:   {os.path.abspath(IMAGES_VAL)}\n",
    "nc: 1\n",
    "names: ['person']\n",
]
with open(dataset_yaml_path, "w") as f:
    f.writelines(yaml_lines)

print("Dataset created in", os.path.abspath(OUT_ROOT))
print(" - images train:", len(os.listdir(IMAGES_TRAIN)))
print(" - images val:  ", len(os.listdir(IMAGES_VAL)))
print("YAML written in:", dataset_yaml_path)

Next, train the model by running the following command from the folder that contains this notebook:

```
yolo task=detect mode=train model=yolov8n.pt data=ultralytics_dataset/dataset.yaml epochs=50 imgsz=640 batch=16
```

Then save the trained weights in this same folder under the name "yolo-domotica-ai.pt".

In [None]:
class YOLOPersonDetector:
    """Wrapper around an Ultralytics YOLO model that returns only person boxes.

    Args:
        weights_path: path of the YOLO weights file.
        device: device string the model is moved to and run on.
        person_class: class id kept in the predictions.
        iou: IoU threshold forwarded to YOLO's non-maximum suppression.
        rgb_input: set to ``True`` (default) when the tensors passed to
            :meth:`predict` are in RGB channel order, e.g. built from PIL
            images. Ultralytics interprets numpy inputs as BGR, so the
            channels are reversed before inference in that case.
    """

    def __init__(self, weights_path: str, device: str = 'cpu', person_class: int = 0,
                 iou: float = 0.5, rgb_input: bool = True):
        self.model = YOLO(weights_path)
        self.model.to(device)
        self.device = torch.device(device)
        self.person_class = person_class
        self.iou = iou
        self.rgb_input = rgb_input
        self.num_classes = 1
        self._weights_path = weights_path

    @staticmethod
    def _to_numpy(image_tensor: torch.Tensor) -> np.ndarray:
        """Convert a CHW image tensor into an HWC ``uint8`` numpy array.

        Non-uint8 tensors are clamped to ``[0, 255]``; if the clamped maximum
        is <= 1.0 the values are treated as ``[0, 1]`` data and scaled by 255.
        """
        # Always move to CPU before converting to numpy.
        arr = image_tensor.detach().cpu()

        if arr.dtype != torch.uint8:
            arr = arr.clamp(0, 255)
            # max() raises on an empty tensor, so only probe non-empty images.
            if arr.numel() > 0 and arr.max() <= 1.0:
                arr = arr * 255.0
            arr = arr.byte()
        return arr.permute(1, 2, 0).contiguous().numpy()  # CHW -> HWC

    def predict(self, image: torch.Tensor, score_threshold: float = 0.5):
        """Detect people in one CHW image tensor.

        Returns:
            ``{'boxes': (N, 4) xyxy tensor, 'scores': (N,) tensor}``, both on
            CPU, holding only detections whose class equals ``person_class``.
        """
        np_img = self._to_numpy(image)
        if self.rgb_input:
            # Ultralytics treats numpy sources as BGR; reverse the channel axis
            # so an RGB tensor is not interpreted with red and blue swapped.
            np_img = np.ascontiguousarray(np_img[..., ::-1])

        results = self.model.predict(
            source=np_img, conf=score_threshold, iou=self.iou, verbose=False, device=self.device
        )
        res = results[0]
        if res.boxes is None or len(res.boxes) == 0:
            return {'boxes': torch.empty((0, 4), dtype=torch.float32),
                    'scores': torch.empty((0,), dtype=torch.float32)}
        boxes_xyxy = res.boxes.xyxy  # (N, 4)
        confs = res.boxes.conf       # (N,)
        clss = res.boxes.cls         # (N,)

        keep = (clss == self.person_class)
        boxes_xyxy = boxes_xyxy[keep].detach().cpu()
        confs = confs[keep].detach().cpu()
        return {'boxes': boxes_xyxy, 'scores': confs}

    def save_weights(self, path: str):
        """Copy the weights file this detector was created from to *path*.

        Raises:
            FileNotFoundError: if the original weights file no longer exists.
        """
        # dirname() is '' for a bare filename and os.makedirs('') raises, so
        # only create the parent folder when the path actually has one.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        if hasattr(self, "_weights_path") and os.path.isfile(self._weights_path):
            shutil.copy2(self._weights_path, path)
        else:
            raise FileNotFoundError(
                f"Non trovo il file dei pesi YOLO di origine: {getattr(self, '_weights_path', None)}"
            )

## STEP 2

The second step involves training an activity classification model using segmented images and sensor data. The EfficientNet model is used as the backbone for feature extraction, while a softmax classifier is used to predict which activity is being performed based on the images.

In [None]:
class ActivityClassifier(nn.Module):
    """Frozen EfficientNet-B0 feature extractor with a trainable linear head.

    The backbone is only ever run under ``torch.no_grad()`` and the optimizer
    covers the linear head alone, so checkpoints store just the head (see
    :meth:`save_weights`).

    Args:
        num_classes: number of activity classes predicted by the head.
        device: device string the backbone and the head are moved to.
    """

    def __init__(self, num_classes, device='cpu'):
        super().__init__()
        self.device = device
        self.num_classes = num_classes

        # num_classes=0 and global_pool='' make timm return the un-pooled feature map.
        self.backbone = timm.create_model(
            'efficientnet_b0', pretrained=True,
            num_classes=0, global_pool=''
        )
        self.backbone.to(self.device)
        self.backbone.eval()

        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        embed_dim = 1280

        self.classifier = nn.Linear(embed_dim, num_classes).to(self.device)

        self.preprocess = T.Compose([
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
        ])

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.classifier.parameters(), lr=1e-3)

    def train(self, mode: bool = True):
        """Set the training mode of the head while keeping the backbone in eval mode.

        The backbone is frozen and its weights are not part of the checkpoint.
        In train mode its normalization layers would use per-batch statistics
        and update running statistics that are never saved, so the head would
        be fitted on features that differ from the ones seen at inference.
        """
        super().train(mode)
        self.backbone.eval()
        return self

    def forward(self, images):
        """Return class logits for a batch of preprocessed images."""
        with torch.no_grad():  # the backbone is frozen
            features = self.backbone(images)
            pooled = self.adaptive_pool(features)
            embeddings = pooled.flatten(1)
        return self.classifier(embeddings)

    def predict(self, image: "Image.Image", return_prob=False):
        """Classify a single PIL image.

        Returns:
            The predicted class id, or ``(class id, probabilities)`` when
            ``return_prob`` is true.
        """
        self.eval()
        # The normalization in ``preprocess`` is defined for three channels.
        if image.mode != 'RGB':
            image = image.convert('RGB')
        with torch.no_grad():
            img_tensor = self.preprocess(image).unsqueeze(0).to(self.device)
            logits = self.forward(img_tensor)
            if return_prob:
                probs = F.softmax(logits, dim=1)
                return probs.argmax(dim=1).item(), probs.squeeze()
            else:
                return logits.argmax(dim=1).item()

    def save_weights(self, path="activity_classifier_weights.pth"):
        """Save the class count and the linear head's weights to *path*."""
        torch.save({
            'num_classes': self.num_classes,
            'state_dict': self.classifier.state_dict()
        }, path)

    @classmethod
    def load_from_file(cls, path, device='cpu'):
        """Rebuild a classifier from a checkpoint written by :meth:`save_weights`."""
        # The checkpoint holds only an int and tensors, so the restricted
        # unpickler is sufficient and avoids executing arbitrary pickled code.
        checkpoint = torch.load(path, map_location=device, weights_only=True)
        num_classes = checkpoint['num_classes']
        model = cls(num_classes=num_classes, device=device)
        model.classifier.load_state_dict(checkpoint['state_dict'])
        model.eval()
        return model


In [None]:
class FilteredImageFolder(ImageFolder):
    """``ImageFolder`` that leaves out the class folder named ``undefined``."""

    def find_classes(self, directory):
        """Return the class list and a re-numbered index map without ``undefined``."""
        discovered, _ = super().find_classes(directory)
        kept = [name for name in discovered if name != "undefined"]
        return kept, {name: position for position, name in enumerate(kept)}


# Reuse the classifier's own preprocessing so training uses the same
# transform as ActivityClassifier.predict().
activity_preprocess = ActivityClassifier(1).preprocess
dataset = FilteredImageFolder(root='clusters_kmeans_500_multi', transform=activity_preprocess)

# 80 / 20 random split between training and validation.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Only the training loader shuffles.
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False)

In [None]:
# One output per class folder found by the dataset.
num_activity_classes = len(train_dataset.dataset.classes)
model = ActivityClassifier(num_classes=num_activity_classes, device=device)
model.train()

In [None]:
# Lowest mean validation loss seen so far; the head is checkpointed whenever it improves.
best_val_loss = float('inf')

for epoch in range(20): # 20 hours more or less
    epoch_no = epoch + 1

    # ---- training pass ----
    model.train()
    total_train_loss = 0.0

    progress_bar = tqdm(enumerate(train_dataloader), desc=f"Epoch {epoch_no} [Train]", total=len(train_dataloader))
    for i, (images, labels) in progress_bar:
        outputs = model(images.to(model.device))
        loss = model.criterion(outputs, labels.to(model.device))

        model.optimizer.zero_grad()
        loss.backward()
        model.optimizer.step()

        batch_loss = loss.item()
        total_train_loss += batch_loss
        # Running mean over the batches processed so far.
        progress_bar.set_postfix(loss=batch_loss, avg_loss=total_train_loss / (i + 1))

    avg_train_loss = total_train_loss / len(train_dataloader)

    # ---- validation pass (no gradients) ----
    model.eval()
    total_val_loss = 0.0

    with torch.no_grad():
        val_bar = tqdm(enumerate(val_dataloader), desc=f"Epoch {epoch_no} [Val]", total=len(val_dataloader))
        for i, (images, labels) in val_bar:
            outputs = model(images.to(model.device))
            batch_loss = model.criterion(outputs, labels.to(model.device)).item()

            total_val_loss += batch_loss
            val_bar.set_postfix(loss=batch_loss, avg_loss=total_val_loss / (i + 1))

    avg_val_loss = total_val_loss / len(val_dataloader)

    # ---- keep the best head ----
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        model.save_weights(path="activity_classifier_weights_best.pth")

    print(f"Epoch {epoch_no}: "
          f"Train Loss = {avg_train_loss:.4f} | "
          f"Val Loss = {avg_val_loss:.4f} | "
          f"Best Val Loss = {best_val_loss:.4f}")


## STEP 3

DomoticaAI model is designed to predict complex actions based on a combination of visual inputs (images) and sensory data (sensor readings), which are integrated to provide an accurate prediction of actions.

### Detailed Model Functionality
The DomoticaAI model is a complex neural network that integrates data from multiple sources to predict which action is about to be performed in a home environment. The flow of data and the model's architecture are designed to handle temporal sequences of images, sensors, and previous actions, enabling the model to learn daily routines.

Model Structure:

- Person Detector: the model starts with the YOLO person detector (pre-trained) to identify people in the images. 
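- Activity Classifier: the EfficientNet-based classifier trained in Step 2 assigns an activity class to the crop of each detected person.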
- Sensor Data Embeddings: data from sensors (e.g., temperature, brightness) is transformed through a fully connected (Linear) layer that maps the sensory data into an embedding space. This allows the model to understand how environmental conditions affect people's activities.
- LSTM Networks (Long Short-Term Memory): are used to process the temporal sequence of images and sensory data, allowing the model to "remember" previous steps and use them to predict the next step. LSTMs are critical for understanding the flow of activities over time (e.g., transitioning from "cooking" to "cleaning").

### Detailed Architecture
Here's a breakdown of the DomoticaAI architecture:

1. Input: Images and Sensor Data
    - Images are provided in temporal sequences. For each time step, an image is processed through the Person Detector (YOLO).
    - Sensor data (e.g., temperature, brightness) is associated with the images and transformed into a numerical format, which is then passed to the network.
2. Image Embeddings: each image is passed through an embedding layer that represents the activity of the detected person. This layer maps activity classes (e.g., "sitting", "walking") into a high-dimensional space.
3. Image Temporal LSTM: the images are processed sequentially through an LSTM, which "remembers" information from previous images. This provides a temporal representation of the activity, considering the context of prior images.
4. Action Temporal LSTM: additionally, LSTMs are used to handle previous actions, allowing the model to "track" past actions, which helps in predicting future activities (e.g., after cooking, the person may clean).
5. Representation Fusion: the representations of the images (extracted from the Image LSTM) are combined with those of the sensors (from sensor embeddings). These combined representations are processed through Layer Normalization (to stabilize learning) and Dropout (to prevent overfitting).
6. Output: Action Prediction: Finally, a Fully Connected (FC) Layer maps the final representation to a probability for each action. Another FC layer is used to predict the parameters associated with each action (such as the duration or intensity of the activity).

In [None]:
class DomoticaAIDataset(Dataset):
    """Dataset of (images, sensors, previous actions) -> (action, parameters).

    The JSON file is a list of clusters. Each cluster has ``inputs`` (items
    with ``img_path`` and optional ``sensors``) and optional
    ``outputs.actions`` (items with ``action_name`` and optional ``params``).
    Every action becomes one record, followed by one ``'<STOP>'`` record per
    cluster; all records of a cluster share the cluster's images and sensors.

    Args:
        json_path: path of the annotation file.
        sensor_keys: sensor names appended, in order, after hour and weekday.
        context_len: number of previous action names returned as history.
        transform: optional callable applied to each PIL image.
    """

    def __init__(self, json_path, sensor_keys, context_len=3, transform=None):
        with open(json_path, 'r') as f:
            clusters = json.load(f)
        self.context_len = context_len
        self.transform = transform
        self.sensor_keys = sensor_keys
        self.num_sensor = 2 + len(sensor_keys)  # hour + weekday + custom sensors
        self.records = []
        for cluster in clusters:
            imgs = [inp['img_path'] for inp in cluster['inputs']]
            sensors = [inp.get('sensors', {}) for inp in cluster['inputs']]
            actions = cluster.get('outputs', {}).get('actions', [])
            for act in actions:
                self.records.append({
                    'imgs': imgs,
                    'sensors': sensors,
                    'action_name': act['action_name'],
                    'params': act.get('params', [])
                })
            # Terminator record closing the cluster's action sequence.
            self.records.append({
                'imgs': imgs,
                'sensors': sensors,
                'action_name': '<STOP>',
                'params': []
            })
        self.a2i = self._build_action_vocab()
        self.p2i = self._build_param_vocab()
        # History vocabulary = every action name plus the padding token.
        history_tokens = set(self.a2i.keys()) | {'<NONE>'}
        self.prev2i = {tok: idx for idx, tok in enumerate(sorted(history_tokens))}

    def _build_action_vocab(self):
        """Map each distinct action name to an id, in sorted order."""
        names = {r['action_name'] for r in self.records}
        return {name: idx for idx, name in enumerate(sorted(names))}

    def _build_param_vocab(self):
        """Map each distinct parameter name to an id, in sorted order."""
        ps = set()
        for r in self.records:
            ps.update(r['params'])
        return {name: idx for idx, name in enumerate(sorted(ps))}

    def __len__(self):
        return len(self.records)

    def _load_image(self, path):
        """Load one image as RGB and apply the transform (or make a float CHW tensor)."""
        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(path) as handle:
            img = handle.convert('RGB')
        if self.transform:
            return self.transform(img)
        return torch.from_numpy(np.array(img)).permute(2, 0, 1).float()

    @staticmethod
    def _stable_bucket(value, buckets=1000):
        """Map a non-numeric value to a bucket id that is identical in every process.

        The builtin ``hash()`` of a string is salted per interpreter run, so it
        would give different features at training and at inference time.
        """
        import zlib  # local import: only needed for non-numeric sensor values
        return float(zlib.crc32(str(value).encode('utf-8')) % buckets)

    def _sensor_vector(self, sdict):
        """Encode one sensor dict as ``[hour, weekday, *sensor values]`` (float32)."""
        ts_str = sdict.get('timestamp') or sdict.get('file_timestamp')
        if ts_str:
            dt = datetime.fromisoformat(ts_str)
            hour = dt.hour + dt.minute / 60.0
            weekday = dt.weekday()
        else:
            hour, weekday = 0.0, 0.0
        vec = [hour, float(weekday)]
        for key in self.sensor_keys:
            val = sdict.get(key, 0)
            try:
                vec.append(float(val))
            except (TypeError, ValueError, OverflowError):
                vec.append(self._stable_bucket(val))
        return torch.tensor(vec, dtype=torch.float32)

    def __getitem__(self, idx):
        """Return the sample dict for record *idx*.

        Keys: ``images`` (list of tensors), ``sensor_data`` (list of tensors),
        ``prev_actions`` (``context_len`` history ids), ``action_id`` (target
        action id) and ``param_vector`` (multi-hot parameter target).
        """
        rec = self.records[idx]
        imgs = [self._load_image(path) for path in rec['imgs']]
        sensor_tensors = [self._sensor_vector(sdict) for sdict in rec['sensors']]

        # Action history: the names of the records just before this one, left-padded
        # with '<NONE>'. NOTE(review): the window is taken over the global record
        # list, so it can reach into the previous cluster - confirm this is intended.
        start = max(0, idx - self.context_len)
        hist = [r['action_name'] for r in self.records[start:idx]]
        hist = ['<NONE>'] * (self.context_len - len(hist)) + hist
        hist_ids = [self.prev2i.get(h, self.prev2i['<NONE>']) for h in hist]
        hist_tensor = torch.tensor(hist_ids, dtype=torch.long)

        # Targets
        act_id = self.a2i[rec['action_name']]
        param_vec = torch.zeros(len(self.p2i), dtype=torch.float32)
        for p in rec['params']:
            param_vec[self.p2i[p]] = 1.0
        return {
            'images': imgs,
            'sensor_data': sensor_tensors,
            'prev_actions': hist_tensor,
            'action_id': torch.tensor(act_id, dtype=torch.long),
            'param_vector': param_vec
        }

def collate_fn(batch):
    """Merge dataset samples into one batch dict.

    ``images`` and ``sensor_data`` stay as Python lists with one entry per
    sample, while the history ids, action ids and parameter vectors are
    stacked into tensors.
    """
    def column(key):
        return [sample[key] for sample in batch]

    return {
        'images': column('images'),
        'sensor_data': column('sensor_data'),
        'prev_actions': torch.stack(column('prev_actions')),
        'action_ids': torch.stack(column('action_id')),
        'param_vectors': torch.stack(column('param_vector')),
    }


In [None]:
from typing import Dict, Optional, List

class DomoticaAI(nn.Module):
    """Predict the next home action from images, sensor readings and past actions.

    For each time step the person detector finds people, every crop is
    classified into an activity id, the activity embeddings are summarised by
    ``image_lstm`` and added to a linear embedding of the sensor vector. The
    per-step vectors are weighted, passed through ``temporal_lstm`` and
    concatenated with an LSTM summary of the previous-action ids. Two linear
    heads then produce action logits and parameter logits.

    Args:
        person_detector: object exposing ``predict(image, score_threshold)``
            returning ``{'boxes', 'scores'}`` and a ``_weights_path`` attribute.
        activity_classifier: module exposing ``num_classes``, ``classifier``
            and ``predict(pil_image)``.
        num_sensor: length of each sensor vector.
        num_prev_actions: size of the previous-action vocabulary.
        num_output_actions: number of action classes.
        num_output_params: number of parameter outputs.
        num_input_images: number of time steps read from every sample.
        embedding_dim: width of the activity / sensor embeddings and LSTMs.
        prev_action_emb_dim: width of the previous-action embedding.
        action_emb_dim: width of the action embedding fed to the parameter head.
        max_persons: maximum number of detections kept per image.
        device: device string for the trainable layers.
        action_vocab, param_vocab, prev_vocab: optional name -> id maps;
            placeholder maps (``str(i) -> i``) are created when omitted.
    """

    def __init__(self,
                 person_detector,
                 activity_classifier,
                 num_sensor: int,
                 num_prev_actions: int,
                 num_output_actions: int,
                 num_output_params: int,
                 num_input_images: int = 3,
                 embedding_dim: int = 128,
                 prev_action_emb_dim: int = 64,
                 action_emb_dim: int = 64,
                 max_persons: int = 10,
                 device: str = 'cpu',
                 # optional vocabularies
                 action_vocab: Optional[Dict[str, int]] = None,
                 param_vocab: Optional[Dict[str, int]] = None,
                 prev_vocab: Optional[Dict[str, int]] = None):
        super().__init__()
        self.device = torch.device(device)

        self.num_sensor = num_sensor
        self.num_prev_actions = num_prev_actions
        self.num_output_actions = num_output_actions
        self.num_output_params = num_output_params
        self.num_input_images = num_input_images
        self.embedding_dim = embedding_dim
        self.prev_action_emb_dim = prev_action_emb_dim
        self.action_emb_dim = action_emb_dim
        self.max_persons = max_persons

        self.person_detector = person_detector  # expected to be on its device already
        self.activity_classifier = activity_classifier.to(self.device).eval()
        # Raw YOLO weights kept in memory when the model comes from a bundle,
        # because the temporary file they were loaded from is deleted.
        self._yolo_bytes = None

        # One set of embeddings per time step.
        self.activity_embeddings = nn.ModuleList([
            nn.Embedding(self.activity_classifier.num_classes, embedding_dim)
            for _ in range(num_input_images)
        ])
        self.no_person_embeddings = nn.ParameterList([
            nn.Parameter(torch.zeros(embedding_dim), requires_grad=True)
            for _ in range(num_input_images)
        ])
        self.sensor_embeddings = nn.ModuleList([
            nn.Linear(num_sensor, embedding_dim)
            for _ in range(num_input_images)
        ])

        self.temporal_weights = nn.Parameter(torch.ones(num_input_images))
        self.image_lstm = nn.LSTM(embedding_dim, embedding_dim, batch_first=True)
        self.temporal_lstm = nn.LSTM(embedding_dim, embedding_dim, batch_first=True)

        self.prev_action_embedding = nn.Embedding(num_prev_actions, prev_action_emb_dim)
        self.prev_action_lstm = nn.LSTM(prev_action_emb_dim, 128, batch_first=True)

        combined_dim = embedding_dim + 128
        self.ln = nn.LayerNorm(combined_dim)
        self.drop = nn.Dropout(0.3)
        self.fc_act = nn.Linear(combined_dim, num_output_actions)
        self.act_emb = nn.Embedding(num_output_actions, action_emb_dim)
        self.fc_par = nn.Linear(combined_dim + action_emb_dim, num_output_params)

        self.to(self.device)

        # Attach the vocabularies and their inverse maps; when none are given,
        # create placeholder maps (only useful for compatibility).
        if action_vocab is None:
            action_vocab = {str(i): i for i in range(num_output_actions)}
        if param_vocab is None:
            param_vocab = {str(i): i for i in range(num_output_params)}
        if prev_vocab is None:
            prev_vocab = {str(i): i for i in range(num_prev_actions)}
        self._attach_vocabs(action_vocab, param_vocab, prev_vocab)

    # --------------------
    # Utility
    # --------------------
    def _attach_vocabs(self,
                       action_vocab: Dict[str, int],
                       param_vocab: Dict[str, int],
                       prev_vocab: Dict[str, int]):
        """Store copies of the vocabularies plus their id -> name inverses."""
        self.action_vocab: Dict[str, int] = dict(action_vocab)
        self.param_vocab: Dict[str, int] = dict(param_vocab)
        self.prev_vocab: Dict[str, int] = dict(prev_vocab)
        # inverse maps
        self.i2a = {idx: name for name, idx in self.action_vocab.items()}
        self.i2p = {idx: name for name, idx in self.param_vocab.items()}
        self.i2prev = {idx: name for name, idx in self.prev_vocab.items()}
        # soft consistency checks: warn, do not fail
        if len(self.action_vocab) != self.num_output_actions:
            print(f"[WARN] action_vocab size ({len(self.action_vocab)}) != num_output_actions ({self.num_output_actions})")
        if len(self.param_vocab) != self.num_output_params:
            print(f"[WARN] param_vocab size ({len(self.param_vocab)}) != num_output_params ({self.num_output_params})")
        if len(self.prev_vocab) != self.num_prev_actions:
            print(f"[WARN] prev_vocab size ({len(self.prev_vocab)}) != num_prev_actions ({self.num_prev_actions})")

    @staticmethod
    def _resolve_device(requested):
        """Return *requested*, or ``'cpu'`` when that accelerator is unavailable here."""
        kind = str(requested).split(':')[0]
        if kind == 'cuda' and not torch.cuda.is_available():
            return 'cpu'
        if kind == 'mps':
            mps = getattr(torch.backends, 'mps', None)
            if mps is None or not mps.is_available():
                return 'cpu'
        return requested

    def crop(self, image: torch.Tensor, bbox: torch.Tensor):
        """Cut the box, enlarged by 30 % on every side, out of a CHW image tensor.

        Returns a PIL image that always contains at least one pixel.
        """
        pixels = image.detach().cpu()
        if pixels.is_floating_point():
            # ToPILImage multiplies float input by 255 and casts to uint8, so a
            # float image on the 0..255 scale would wrap around. Bring it to 0..1.
            if pixels.numel() > 0 and pixels.max() > 1.0:
                pixels = pixels / 255.0
            pixels = pixels.clamp(0.0, 1.0)
        pil = T.ToPILImage()(pixels)

        x1, y1, x2, y2 = (int(v) for v in bbox.detach().cpu().numpy().astype(int))
        w, h = x2 - x1, y2 - y1
        pad_x, pad_y = int(w * 0.3), int(h * 0.3)
        x1f, y1f = max(0, x1 - pad_x), max(0, y1 - pad_y)
        x2f = min(pil.width, x2 + pad_x)
        y2f = min(pil.height, y2 + pad_y)
        # A degenerate box would give a zero-sized crop that cannot be classified.
        x2f, y2f = max(x2f, x1f + 1), max(y2f, y1f + 1)
        return pil.crop((x1f, y1f, x2f, y2f))

    def process_single_image(self, image: torch.Tensor, sensor_vec: torch.Tensor, timestep: int):
        """Encode one time step (image + sensor vector) as an ``embedding_dim`` vector."""
        det = self.person_detector.predict(image, score_threshold=0.5)
        boxes, scores = det['boxes'], det['scores']
        if len(boxes) > self.max_persons:
            # Keep only the most confident detections.
            idx = torch.topk(scores, self.max_persons).indices
            boxes = boxes[idx]

        acts = [self.activity_classifier.predict(self.crop(image, box)) for box in boxes]

        if acts:
            cls_t = torch.tensor(acts, device=self.device)
            person_seq = self.activity_embeddings[timestep](cls_t)         # (persons, E)
        else:
            # Learned placeholder used when nobody is detected.
            person_seq = self.no_person_embeddings[timestep].unsqueeze(0)  # (1, E)

        # Summarise the per-person embeddings with the last LSTM hidden state.
        _, (h_img, _) = self.image_lstm(person_seq.unsqueeze(0))
        img_repr = h_img[-1, 0, :]

        sensor_emb = self.sensor_embeddings[timestep](sensor_vec.to(self.device))
        return img_repr + sensor_emb

    def forward(self, images: List[List[torch.Tensor]], sensor_data: List[List[torch.Tensor]], prev_action_seq: torch.Tensor):
        """Compute action logits and parameter logits for a batch.

        Args:
            images: per sample, a list with at least ``num_input_images`` CHW tensors.
            sensor_data: per sample, the matching list of sensor vectors.
            prev_action_seq: ``(batch, history)`` tensor of previous-action ids.

        Returns:
            ``(logits, params)``: action logits and raw parameter logits.
        """
        batch_size = len(images)
        Tsteps = self.num_input_images
        # The step weights do not depend on the sample; compute them once.
        w = torch.softmax(self.temporal_weights, dim=0).view(-1, 1)
        reps = []

        for i in range(batch_size):
            step_reprs = []
            for t in range(Tsteps):
                img = images[i][t].to(self.device)
                sensor_vec = sensor_data[i][t].to(self.device)
                step_reprs.append(self.process_single_image(img, sensor_vec, t))
            seq = torch.stack(step_reprs, dim=0)
            seq_w = (seq * w).unsqueeze(0)
            _, (h_t, _) = self.temporal_lstm(seq_w)
            reps.append(h_t.squeeze(0).squeeze(0))

        img_r = torch.stack(reps, dim=0)

        prev_emb = self.prev_action_embedding(prev_action_seq.to(self.device))
        _, (h_p, _) = self.prev_action_lstm(prev_emb)
        prev_r = h_p[-1]

        combined = torch.cat([img_r, prev_r], dim=-1)
        combined = self.ln(combined)
        combined = self.drop(combined)

        logits = self.fc_act(combined)
        # Probability-weighted mix of the action embeddings, so the parameter
        # head is conditioned on the predicted action distribution.
        act_emb = torch.matmul(torch.softmax(logits, dim=-1), self.act_emb.weight)
        params = self.fc_par(torch.cat([combined, act_emb], dim=-1))
        return logits, params

    def predict(self, images, sensor_data, prev_action_seq, threshold: float = 0.5):
        """Predict the action and its parameters for a single sample (batch of 1).

        Returns:
            ``(action_id, action_name, params)`` where ``params`` maps each
            parameter name to its probability, restricted to the entries whose
            probability exceeds ``threshold``.
        """
        self.eval()
        with torch.no_grad():
            logits, params = self.forward(images, sensor_data, prev_action_seq)
            action_id = logits.argmax(dim=-1).item()
            action_name = self.idx_to_action(action_id)
            # The parameter head emits raw logits (it is trained with a
            # with-logits loss); convert to probabilities before thresholding.
            probs = torch.sigmoid(params.squeeze(0)).tolist()
            filtered = {
                self.i2p.get(i, str(i)): p
                for i, p in enumerate(probs)
                if p > threshold
            }
            return action_id, action_name, filtered

    # helpers mapping ids / scores to labels
    def idx_to_action(self, idx: int) -> str:
        """Return the action name for *idx*, or ``str(idx)`` when it is unknown."""
        return self.i2a.get(idx, str(idx))

    def vec_to_params(self, param_scores: torch.Tensor, threshold: float = 0.5) -> List[str]:
        """Return the names of the parameters whose score exceeds *threshold*.

        The scores are compared as given (no sigmoid is applied here). Indices
        missing from the vocabulary are reported as ``str(index)``.
        """
        param_scores = param_scores.detach().cpu().flatten()
        return [self.i2p.get(i, str(i)) for i, v in enumerate(param_scores) if float(v) > threshold]

    # --------------------
    # Single-file serialization
    # --------------------
    def save_singlefile(self, bundle_path: str):
        """Write config, weights, YOLO weights and vocabularies into one file.

        Raises:
            FileNotFoundError: if the YOLO weights are available neither on
                disk nor in memory.
        """
        cfg = {
            'num_sensor': self.num_sensor,
            'num_prev_actions': self.num_prev_actions,
            'num_output_actions': self.num_output_actions,
            'num_output_params': self.num_output_params,
            'num_input_images': self.num_input_images,
            'embedding_dim': self.embedding_dim,
            'prev_action_emb_dim': self.prev_action_emb_dim,
            'action_emb_dim': self.action_emb_dim,
            'max_persons': self.max_persons,
            'device': str(self.device),
            'activity_num_classes': self.activity_classifier.num_classes,
            'activity_arch': 'efficientnet_b0',
            'format_version': 2,
        }

        core_state = self.state_dict()
        activity_state = self.activity_classifier.classifier.state_dict()

        # Prefer the weights file on disk; fall back to the bytes kept from the
        # bundle this model was loaded from.
        weights_path = getattr(self.person_detector, '_weights_path', None)
        if weights_path and os.path.isfile(weights_path):
            with open(weights_path, 'rb') as f:
                yolo_bytes = f.read()
        elif self._yolo_bytes is not None:
            yolo_bytes = self._yolo_bytes
        else:
            raise FileNotFoundError(f"YOLO weights file not found: {weights_path}")

        package = {
            'cfg': cfg,
            'core_state': core_state,
            'activity_state': activity_state,
            'yolo_bytes': yolo_bytes,
            'vocab': {
                'action_vocab': self.action_vocab,
                'param_vocab': self.param_vocab,
                'prev_vocab': self.prev_vocab,
            }
        }

        torch.save(package, bundle_path)
        print(f"[OK] Bundle salvato in: {bundle_path}")

    @classmethod
    def load_singlefile(cls, bundle_path: str, device: Optional[str] = None):
        """Rebuild a model from a bundle written by :meth:`save_singlefile`.

        Args:
            bundle_path: path of the bundle file.
            device: device to load onto. When omitted, the device stored in the
                bundle is used, falling back to CPU if it is unavailable here.
        """
        # NOTE: this unpickles the bundle; only load bundles from a trusted source.
        package = torch.load(bundle_path, map_location='cpu')  # moved to the device below
        cfg = package['cfg']

        if device is None:
            saved_device = cfg.get('device', 'cpu')
            device = cls._resolve_device(saved_device)
            if device != saved_device:
                print(f"[WARN] device {saved_device} is not available, using {device}")

        # YOLO can only be built from a file, so write the weights to a temporary one.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pt') as tmp:
            tmp.write(package['yolo_bytes'])
            tmp_path = tmp.name

        try:
            person_detector = YOLOPersonDetector(weights_path=tmp_path, device=device)
        finally:
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # best-effort cleanup of the temporary file

        num_cls = cfg.get('activity_num_classes', 500)
        activity_classifier = ActivityClassifier(num_classes=num_cls, device=device)
        activity_classifier.classifier.load_state_dict(package['activity_state'])
        activity_classifier.to(device)

        # Vocabularies are optional in the bundle.
        vocabs = package.get('vocab', {})
        action_vocab = vocabs.get('action_vocab')
        param_vocab = vocabs.get('param_vocab')
        prev_vocab = vocabs.get('prev_vocab')

        # Core model
        model = cls(
            person_detector=person_detector,
            activity_classifier=activity_classifier,
            num_sensor=cfg['num_sensor'],
            num_prev_actions=cfg['num_prev_actions'],
            num_output_actions=cfg['num_output_actions'],
            num_output_params=cfg['num_output_params'],
            num_input_images=cfg['num_input_images'],
            embedding_dim=cfg['embedding_dim'],
            prev_action_emb_dim=cfg['prev_action_emb_dim'],
            action_emb_dim=cfg['action_emb_dim'],
            max_persons=cfg['max_persons'],
            device=device,
            action_vocab=action_vocab,
            param_vocab=param_vocab,
            prev_vocab=prev_vocab,
        )
        # The temporary weights file is gone; keep the bytes so the loaded
        # model can be saved again.
        model._yolo_bytes = package['yolo_bytes']

        model.load_state_dict(package['core_state'], strict=True)
        model.to(device)
        model.eval()
        print(f"[OK] Bundle caricato da: {bundle_path} su device {device}")
        return model

In [None]:
# Training hyper-parameters for the DomoticaAI model.
NUM_EPOCHS, LEARNING_RATE, CONTEXT_LEN = 50, 1e-4, 3

dataset = DomoticaAIDataset(
    json_path='annotations.json',
    sensor_keys=['luminosita_categorizzata_2'],
    context_len=CONTEXT_LEN,
    transform=None,
)

# Vocabularies and sensor width derived from the annotations.
a2i, p2i, prev2i = dataset.a2i, dataset.p2i, dataset.prev2i
num_sensor = dataset.num_sensor

# 80 / 20 random split between training and validation.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Both loaders share every option except shuffling.
loader_kwargs = dict(batch_size=8, collate_fn=collate_fn, num_workers=0, pin_memory=False)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)


In [None]:
# Load the two components produced by Step 1 and Step 2 onto the shared device.
person_detector = YOLOPersonDetector(
    weights_path='yolo-domotica-ai.pt',
    device=device,
)
activity_classifier = ActivityClassifier.load_from_file(
    'activity_classifier_weights_best.pth',
    device=device,
)

In [None]:
# Output and history sizes come straight from the dataset vocabularies.
domotica_kwargs = dict(
    person_detector=person_detector,
    activity_classifier=activity_classifier,
    num_sensor=num_sensor,
    num_prev_actions=len(prev2i),
    num_output_actions=len(a2i),
    num_output_params=len(p2i),
    num_input_images=3,
    embedding_dim=128,
    prev_action_emb_dim=64,
    action_emb_dim=64,
    max_persons=10,
    device=device,
    action_vocab=a2i,
    param_vocab=p2i,
    prev_vocab=prev2i,
)
model = DomoticaAI(**domotica_kwargs).to(device)

In [None]:
class FocalLoss(nn.Module):
    """Multi-class focal loss computed on top of cross-entropy.

    Each sample's cross-entropy ``ce`` is scaled by
    ``alpha * (1 - exp(-ce)) ** gamma``.

    Args:
        alpha: constant factor multiplied into every per-sample loss.
        gamma: exponent of the ``(1 - pt)`` modulating term.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample losses.
    """

    def __init__(self, alpha=0.25, gamma=2, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Return the focal loss for class logits ``inputs`` and index ``targets``."""
        per_sample_ce = F.cross_entropy(inputs, targets, reduction='none')
        # exp(-CE) is the probability the model assigned to the true class.
        true_class_prob = torch.exp(-per_sample_ce)
        focal = self.alpha * (1 - true_class_prob) ** self.gamma * per_sample_ce

        if self.reduction == 'sum':
            return focal.sum()
        if self.reduction == 'mean':
            return focal.mean()
        return focal
        
class BinaryFocalLoss(nn.Module):
    """Element-wise focal loss on top of binary cross-entropy with logits.

    Args:
        alpha: ``None`` (no class weighting), a scalar weight for positive
            targets (negatives get ``1 - alpha``), or a 1-D sequence of
            weights that is broadcast along the last dimension of the loss.
        gamma: exponent of the ``(1 - pt)`` modulating term.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced element-wise losses.
    """

    def __init__(self, alpha=None, gamma=2.0, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.reduction = reduction
        alpha_tensor = None
        if alpha is not None:
            alpha_tensor = torch.as_tensor(alpha, dtype=torch.float32)
        # Registered as a buffer so it follows the module across devices.
        self.register_buffer('alpha', alpha_tensor)

    def forward(self, logits, targets):
        """Return the focal loss for ``logits`` against float ``targets``."""
        elementwise = F.binary_cross_entropy_with_logits(logits, targets, reduction='none')
        # exp(-BCE) is the probability assigned to the correct label.
        prob_correct = torch.exp(-elementwise)
        focal = (1 - prob_correct) ** self.gamma * elementwise

        if self.alpha is not None:
            pos_weight = self.alpha
            neg_weight = 1 - self.alpha
            if pos_weight.dim() != 0:
                # Vector alpha: reshape to (1, N) so it broadcasts over the batch.
                pos_weight = pos_weight.view(1, -1)
                neg_weight = neg_weight.view(1, -1)
            focal = focal * (targets * pos_weight + (1 - targets) * neg_weight)

        if self.reduction == 'sum':
            return focal.sum()
        if self.reduction == 'mean':
            return focal.mean()
        return focal



In [None]:
# Loss functions: focal variants are used for both heads.
# (Plain alternatives kept for reference: nn.CrossEntropyLoss() for actions,
#  nn.BCEWithLogitsLoss() for parameters.)
action_criterion = FocalLoss(alpha=0.25, gamma=2)
param_criterion = BinaryFocalLoss(alpha=0.25, gamma=2.0)

# AdamW with a small weight decay; the learning rate is halved every 10
# scheduler steps.
optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=1e-5,
)
scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=10,
    gamma=0.5,
)

print(f"Training on {device}, num_sensor={dataset.num_sensor}")

# Validation helper
def validate(model, val_loader, action_criterion, param_criterion, device, param_loss_weight=0.5):
    """Run one pass over ``val_loader`` without gradients and report losses.

    The combined loss of a batch is
    ``loss_action + param_loss_weight * loss_param``.

    Args:
        model: callable as ``model(images, sensor_data, prev_actions)`` and
            returning ``(logits, params)``; it is switched to eval mode and
            left in that mode.
        val_loader: sized iterable of batch dicts with the keys ``images``,
            ``sensor_data``, ``prev_actions``, ``action_ids`` and
            ``param_vectors``.
        action_criterion: loss applied to ``(logits, action_ids)``.
        param_criterion: loss applied to ``(params, param_vectors)``.
        device: device the tensor entries of each batch are moved to.
        param_loss_weight: weight of the parameter loss in the combined
            loss (default 0.5, the value used by the training loop).

    Returns:
        The combined loss averaged over batches, or ``float('nan')`` when
        ``val_loader`` yields no batches.
    """
    model.eval()

    num_batches = len(val_loader)
    if num_batches == 0:
        # Nothing to average: report it instead of dividing by zero below.
        print("Validation skipped: val_loader is empty")
        return float('nan')

    running_loss = 0.0
    running_act_loss = 0.0
    running_param_loss = 0.0

    with torch.no_grad():
        pbar = tqdm(enumerate(val_loader), desc="Validating", total=num_batches)
        for i, batch in pbar:
            # images / sensor_data are passed to the model as-is; only the
            # tensor entries below are moved to the device here.
            images       = batch['images']
            sensor_data  = batch['sensor_data']
            prev_actions = batch['prev_actions'].to(device)
            action_ids   = batch['action_ids'].to(device)
            param_vecs   = batch['param_vectors'].to(device)

            logits, params = model(images, sensor_data, prev_actions)

            loss_action = action_criterion(logits, action_ids)
            loss_param  = param_criterion(params, param_vecs)
            loss = loss_action + param_loss_weight * loss_param

            running_loss += loss.item()
            running_act_loss += loss_action.item()
            running_param_loss += loss_param.item()

            # Per-batch values plus running means over the batches seen so far.
            pbar.set_postfix({
                'loss': f"{loss.item():.4f}",
                'act': f"{loss_action.item():.4f}",
                'par': f"{loss_param.item():.4f}",
                'avg_loss': f"{running_loss / (i+1):.4f}",
                'avg_act': f"{running_act_loss / (i+1):.4f}",
                'avg_param': f"{running_param_loss / (i+1):.4f}",
            })

    avg_loss = running_loss / num_batches
    avg_act_loss = running_act_loss / num_batches
    avg_param_loss = running_param_loss / num_batches
    print(f"Validation avg loss: {avg_loss:.4f}, action loss: {avg_act_loss:.4f}, param loss: {avg_param_loss:.4f}")
    return avg_loss


# Training loop: for each epoch run a training pass, a validation pass,
# one scheduler step, and (every 10th epoch) write a checkpoint.
for epoch in range(NUM_EPOCHS):
    print(f"\n=== Epoch {epoch+1}/{NUM_EPOCHS} ===")
    
    # Training phase
    # validate() leaves the model in eval mode, so train mode is re-enabled
    # at the start of every epoch.
    model.train()
    running_loss = 0.0
    running_act_loss = 0.0
    running_param_loss = 0.0
    pbar = tqdm(enumerate(train_loader), desc="Training", total=len(train_loader))
    for i, batch in pbar:
        # images / sensor_data are handed to the model as-is; only the
        # tensor entries below are moved to the device here.
        images      = batch['images']
        sensor_data = batch['sensor_data']
        prev_actions= batch['prev_actions'].to(device)
        action_ids  = batch['action_ids'].to(device)
        param_vecs  = batch['param_vectors'].to(device)

        optimizer.zero_grad()
        logits, params = model(images, sensor_data, prev_actions)
        loss_action = action_criterion(logits, action_ids)
        loss_param  = param_criterion(params, param_vecs)
        # Combined objective: the parameter loss counts half as much as the
        # action loss (same weighting as in validate()).
        loss = loss_action + 0.5 * loss_param
        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += loss.item()
        running_act_loss += loss_action.item()
        running_param_loss += loss_param.item()

        # Progress bar: per-batch values plus running means so far.
        pbar.set_postfix({'loss': f"{loss.item():.4f}", 'act': f"{loss_action.item():.4f}", 'par': f"{loss_param.item():.4f}", "avg_loss": f"{running_loss / (i+1):.4f}", "avg_act": f"{running_act_loss / (i+1):.4f}", "avg_param": f"{running_param_loss / (i+1):.4f}"})
    
    # Average loss for training
    # NOTE(review): these divisions raise ZeroDivisionError if train_loader
    # has no batches.
    avg_train_loss = running_loss / len(train_loader)
    avg_train_act_loss = running_act_loss / len(train_loader)
    avg_train_param_loss = running_param_loss / len(train_loader)

    print(f"Epoch {epoch+1} training avg loss: {avg_train_loss:.4f}, action loss: {avg_train_act_loss:.4f}, param loss: {avg_train_param_loss:.4f}")
    
    # Validation phase
    avg_val_loss = validate(model, val_loader, action_criterion, param_criterion, device)

    # The learning-rate schedule advances once per epoch.
    scheduler.step()

    print(f"Epoch {epoch+1} avg train loss: {avg_train_loss:.4f}, avg validation loss: {avg_val_loss:.4f}, lr: {scheduler.get_last_lr()[0]:.6f}")

    # Checkpoint saving: every 10th epoch, store model, optimizer and
    # scheduler state together with the epoch number.
    if (epoch+1) % 10 == 0:
        torch.save({
            'epoch'               : epoch+1,
            'model_state_dict'    : model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
        }, f'checkpoint_epoch_{epoch+1}.pth')

In [None]:
# Save the trained model through its save_singlefile method; the output path
# is the same bundle file the inference cell below loads.
model.save_singlefile(bundle_path='model_definitive.pth')

In [None]:
import os
import torch
import torchvision.transforms as T
from PIL import Image

# --- 1) Load the model from the single-file bundle ---
def load_model(bundle_path, device=None):
    """Load a DomoticaAI bundle and prepare it for inference.

    Args:
        bundle_path: path of the bundle file passed to
            ``DomoticaAI.load_singlefile``.
        device: target device; when falsy, ``'cuda'`` is chosen if available,
            otherwise ``'cpu'``.

    Returns:
        ``(model, device)`` with the model moved to ``device`` and set to
        eval mode.
    """
    if not device:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # NOTE(review): load_singlefile is called without a device here; confirm
    # that the model does not keep internal state tied to a different device.
    model = DomoticaAI.load_singlefile(bundle_path)
    placed = model.to(device)
    placed.eval()
    return model, device

# --- 2) Preprocess ONE image -> CHW tensor in [0,1] ---
to_tensor = T.ToTensor()


def load_one_image(path):
    """Read the image at ``path`` as RGB and return it as a CxHxW tensor.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if os.path.exists(path):
        rgb_image = Image.open(path).convert('RGB')
        # No Normalize step: per the original note, the YOLO detector
        # expects inputs in the 0..1 range.
        return to_tensor(rgb_image)
    raise FileNotFoundError(f"Immagine non trovata: {path}")

# --- helper: map a list of previous action names -> tensor of IDs (B=1, seq_len) ---
def prev_names_to_ids(prev_action_names, model, device, seq_len):
    """Encode a history of action names as a fixed-length id tensor.

    Names are looked up in ``model.prev_vocab`` (an empty mapping is used when
    the attribute is missing or falsy); unknown names map to the id of
    ``"<NONE>"``, which itself defaults to 0. Only the last ``seq_len``
    entries are kept, and shorter histories are left-padded with the
    ``"<NONE>"`` id.

    Args:
        prev_action_names: list of action names, oldest first; ``None`` or
            empty means no history.
        model: object that may expose a ``prev_vocab`` name -> id mapping.
        device: device on which the result tensor is created.
        seq_len: non-negative length of the returned sequence.

    Returns:
        ``torch.long`` tensor of shape ``(1, seq_len)``.

    Raises:
        ValueError: if ``seq_len`` is negative.
    """
    if seq_len < 0:
        raise ValueError(f"seq_len must be non-negative, got {seq_len}")

    tok2id = getattr(model, "prev_vocab", None) or {}
    none_id = tok2id.get("<NONE>", 0)
    ids = [tok2id.get(name, none_id) for name in (prev_action_names or [])]

    # Keep only the last seq_len elements. `ids[-0:]` is the WHOLE list, so
    # seq_len == 0 must be handled explicitly to honour the output shape.
    ids = ids[-seq_len:] if seq_len > 0 else []

    # Left-pad with <NONE> up to seq_len (a non-positive count adds nothing).
    ids = [none_id] * (seq_len - len(ids)) + ids
    return torch.tensor([ids], dtype=torch.long, device=device)

# --- 3) Build the inputs in the format passed to the model ---
def make_inputs(image_paths, model, device, prev_action_names=None, prev_seq_len=8, sensor_fill=0.0):
    """Build a batch-of-one input triple from image paths.

    Args:
        image_paths: one path per time step; the count must equal
            ``model.num_input_images``.
        model: provides ``num_input_images``, ``num_sensor`` and the
            vocabulary used to encode ``prev_action_names``.
        device: device for every tensor that is created.
        prev_action_names: optional history of action names.
        prev_seq_len: length of the encoded action history.
        sensor_fill: constant written into every sensor vector.

    Returns:
        ``(images, sensor_data, prev_action_seq)`` where ``images`` and
        ``sensor_data`` are one-element lists (batch size 1) holding a list
        with one tensor per time step.

    Raises:
        ValueError: if the number of paths differs from
            ``model.num_input_images``.
    """
    num_steps = model.num_input_images
    if num_steps != len(image_paths):
        raise ValueError(f"Servono esattamente {num_steps} immagini, ricevute {len(image_paths)}")

    # One (C,H,W) tensor per time step, loaded in the order given.
    frames = []
    for image_path in image_paths:
        frames.append(load_one_image(image_path).to(device))

    # One constant (num_sensor,) vector per time step.
    sensor_steps = []
    for _ in range(num_steps):
        sensor_steps.append(
            torch.full((model.num_sensor,), float(sensor_fill), device=device)
        )

    # Action names -> ids through the vocabulary carried by the model.
    prev_action_seq = prev_names_to_ids(prev_action_names, model, device, prev_seq_len)

    # The outer list is the batch dimension (size 1).
    return [frames], [sensor_steps], prev_action_seq

# --- 4) Simple inference: uses the model's predict() (with vocabularies) ---
def run_inference(bundle_path, image_paths, prev_action_names=None, threshold=0.5):
    """Load a bundle, run one prediction and print a readable summary.

    Args:
        bundle_path: path of the model bundle to load.
        image_paths: image paths, one per time step.
        prev_action_names: optional history of action names.
        threshold: forwarded to ``model.predict`` and echoed in the output.

    Returns:
        ``(action_id, action_name, params)``.
    """
    model, device = load_model(bundle_path)
    images, sensor_data, prev_action_seq = make_inputs(
        image_paths,
        model,
        device,
        prev_action_names=prev_action_names,
        prev_seq_len=8,
        sensor_fill=0.0,
    )

    outcome = model.predict(images, sensor_data, prev_action_seq, threshold=threshold)

    is_triple = isinstance(outcome, tuple) and len(outcome) == 3
    if not is_triple:
        # Backward compatibility: an older predict() returns only (id, dict),
        # so the name is decoded through the model when it offers a decoder.
        action_id, params = outcome
        decode = getattr(model, "idx_to_action", lambda i: str(i))
        action_name = decode(action_id)
    else:
        action_id, action_name, params = outcome

    print(f"Action ID: {action_id}")
    print(f"Action Name: {action_name}")

    # Parameters are listed by descending score; anything that is not a
    # non-empty dict is reported as "nothing above threshold".
    if not (isinstance(params, dict) and params):
        print(f"Nessun parametro sopra {threshold}")
    else:
        ranked = sorted(params.items(), key=lambda kv: kv[1], reverse=True)
        print(f"Params > {threshold}:")
        for k, v in ranked:
            print(f"  - {k}: {v:.3f}")

    return action_id, action_name, params

# --------- USAGE ----------
# Example run: load the saved bundle and classify one triplet of images.
if __name__ == "__main__":
    bundle_path = "model_definitive.pth"  # bundle file written by save_singlefile
    # NOTE(review): absolute, machine-specific paths; adjust before running elsewhere.
    image_paths = [
        "/Users/eduardobolognini/Desktop/domotica ai/analizzatore immagini 2/cluster_triplet/cluster_0/group_1001/28-06_16-59-02.jpg",
        "/Users/eduardobolognini/Desktop/domotica ai/analizzatore immagini 2/cluster_triplet/cluster_0/group_1001/28-06_17-00-03.jpg",
        "/Users/eduardobolognini/Desktop/domotica ai/analizzatore immagini 2/cluster_triplet/cluster_0/group_1001/28-06_17-01-06.jpg",
    ]

    # optional: plain-text action history (mapped to ids via the bundle's prev_vocab)
    prev_action_names = []

    run_inference(bundle_path, image_paths, prev_action_names=prev_action_names, threshold=0.5)