# CONFIGURATION:

In [None]:
from jaad_data import JAAD
import torch
from PIL import Image
from torchvision import transforms
from torchvision import models
import matplotlib.pyplot as plt
import network
import openpose
from openpose import model
from openpose import util
from openpose.body import Body
import copy
from tqdm import tqdm
import pickle
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2
import torch.nn.functional as F
from torch.cuda.amp import autocast, GradScaler
from torchmetrics import Accuracy



In [None]:
# Dataset roots and model checkpoints (relative to the working directory).
JAAD_PATH = '../JAAD'
# Checkpoint loaded into the DeepLabV3+ segmentation network.
DEEPLAB_PATH = '../best_deeplabv3plus_resnet101_cityscapes_os16.pth'
SUBSET_PATH = '../subset'

# Pickle caches for the segmentation masks (subset / full dataset, train / test).
RESULTS_MASK_SUB = '../masks_results_sub.pkl'
RESULTS_MASK_BIG = '../masks_results_big.pkl'
RESULTS_MASK_BIG_TEST = '../masks_results_big_test.pkl'
RESULTS_MASK_SUB_TEST = '../masks_results_sub_test.pkl'

# Pickle caches for the extracted poses (subset / full dataset, train / test).
RESULTS_POSE_BIG = '../pose_results_big.pkl'
RESULTS_POSE_SUB = '../pose_results_sub.pkl'
RESULTS_POSE_BIG_TEST = '../pose_results_big_test.pkl'
RESULTS_POSE_SUB_TEST = '../pose_results_sub_test.pkl'

# Weights file handed to the OpenPose `Body` estimator.
POSE_PATH = '../body_pose_model.pth'   


In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)
# Release memory cached by PyTorch's CUDA allocator (e.g. from an earlier run).
torch.cuda.empty_cache()


In [None]:
# CONFIG:
# RUN: when True the masks and poses are recomputed and written to the pickle
#      caches; when False they are loaded from those caches instead.
# BIG: when True the full-dataset paths are selected, otherwise the subset ones.
RUN = False
BIG =True

In [None]:
# Pick the cache files and the dataset root that match the selected dataset
# size (full JAAD when BIG is set, the subset otherwise).
(
    MASK_CMD,
    POSE_CMD,
    MASK_CMD_TEST,
    POSE_CMD_TEST,
    DT_CMD,
) = (
    (
        RESULTS_MASK_BIG,
        RESULTS_POSE_BIG,
        RESULTS_MASK_BIG_TEST,
        RESULTS_POSE_BIG_TEST,
        JAAD_PATH,
    )
    if BIG
    else (
        RESULTS_MASK_SUB,
        RESULTS_POSE_SUB,
        RESULTS_MASK_SUB_TEST,
        RESULTS_POSE_SUB_TEST,
        SUBSET_PATH,
    )
)



# DATASET

In [None]:
# stampa una sequenza di informazioni del dataset

#jaad_dt.generate_database() # in particolare, questo stampa un sacco di informazioni come borse vestiti semafori ecc
#jaad_dt.get_data_stats() # mentre questo stampa informazioni sui numeri di frame, video, pedoni, bbox ecc
#print('Train sequences:', len(seq_train['image'][0][0]))  # stampa la lunghezza della sequenza di training selezionata

In [None]:
# Open the JAAD interface on the selected dataset root.
jaad_dt = JAAD(data_path=DT_CMD)

# Options forwarded unchanged to the sequence generator for both splits.
data_opts = dict(fstride=15, sample_type='beh')

# Generate the train split first, then the test split.
seq_train, seq_test = (
    jaad_dt.generate_data_trajectory_sequence(split, **data_opts)
    for split in ('train', 'test')
)

In [None]:
# print((seq_train['intent'][1]))
# print((seq_train['image'][1]))
# print(len(seq_train['bbox']))

In [None]:
# Preprocessing applied to the full frames before semantic segmentation
# (global context); passed as `preprocess` to process_video_frames.

train_transforms = transforms.Compose([
    transforms.Resize((512, 512)),  # Resize the frames to 512x512
    transforms.ToTensor(),
    # Channel-wise normalization with the standard ImageNet mean/std values.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Transform for the cropped local-context images (given to JAADDataset as
# `transform`). No resize here: the dataset resizes the crops itself.
transform_lc = transforms.Compose([
    #transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # Channel-wise normalization with the standard ImageNet mean/std values.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# GLOBAL CONTEXT EXTRACTION:

In [None]:
if RUN:
    # Build the DeepLabV3+ (ResNet-101 backbone) network with 19 output classes.
    deeplab_model = network.modeling.__dict__['deeplabv3plus_resnet101'](num_classes=19)
    # map_location remaps the stored tensors onto the active device, so a
    # checkpoint saved on a GPU still loads when `device` fell back to 'cpu'.
    checkpoint = torch.load(DEEPLAB_PATH, map_location=device)
    deeplab_model.load_state_dict(checkpoint['model_state'])
    deeplab_model.to(device)
    # Inference only: switch off dropout / batch-norm updates.
    deeplab_model.eval()

In [None]:
# Transform applied to the predicted segmentation mask (after it has been turned
# into an RGB PIL image) inside get_segmentation_mask.
GC_trans = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the mask image to 224x224
    transforms.ToTensor(),
    # Channel-wise normalization with the standard ImageNet mean/std values.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
def get_segmentation_mask(image_path, model, preprocess):
    """Segment one frame and return its class map as a normalized tensor.

    Args:
        image_path: Path of the frame to open with PIL.
        model: Segmentation network; it may return either a tensor or a dict
            holding the logits under the ``'out'`` key.
        preprocess: Callable turning the RGB PIL image into a tensor.

    Returns:
        The tensor produced by ``GC_trans`` from the predicted class map, i.e.
        a 3-channel 224x224 normalized tensor.

    Raises:
        ValueError: If the model output is neither a dict nor a tensor.
    """
    frame = Image.open(image_path).convert("RGB")
    # Preprocess, move to the active device and add the batch dimension.
    batch = preprocess(frame).to(device).unsqueeze(0)

    with torch.no_grad():
        prediction = model(batch)

    # Keep only the logits of the single image in the batch.
    if isinstance(prediction, dict):
        logits = prediction['out'][0]
    elif isinstance(prediction, torch.Tensor):
        logits = prediction[0]
    else:
        raise ValueError(f"Unexpected output type: {type(prediction)}")

    # Per-pixel class id, as a float map with one leading channel axis.
    class_map = logits.argmax(0).unsqueeze(0).float()

    # NOTE(review): the class ids are handed to ToPILImage as floats; torchvision
    # presumably rescales float input by 255 before the uint8 cast, so ids
    # above 1 may overflow -- confirm this encoding is intended.
    mask_image = transforms.ToPILImage()(class_map).convert("RGB")
    return GC_trans(mask_image)

In [None]:
# def process_video_frames(seq_train, model, preprocess):
#     """funzione che prende in input la sequenza di training, il modello e la funzione di preprocessamento, restituisce una
#     lista di segmentation mask per ogni frame di ogni video della sequenza di training"""
    
#     all_masks = []
#     for video_frames in seq_train['image']:
#         video_masks = []

#         for frame_path in video_frames:
#             mask = get_segmentation_mask(frame_path, model, preprocess)
#             #visualize_mask(frame_path, mask)
#             video_masks.append(mask)
#         all_masks.append(video_masks)
#     return all_masks

In [None]:
def process_video_frames(seq_train, model, preprocess):
    """Compute the segmentation mask of every frame of every sequence.

    Args:
        seq_train: Mapping whose ``'image'`` entry is a list of sequences, each
            a list of frame paths.
        model: Segmentation network forwarded to ``get_segmentation_mask``.
        preprocess: Preprocessing callable forwarded to
            ``get_segmentation_mask``.

    Returns:
        A list with one entry per sequence; each entry is the list of masks of
        that sequence's frames, in order.
    """
    all_masks = []
    for video_frames in tqdm(seq_train['image'], desc="Processing videos"):
        frame_bar = tqdm(video_frames, desc="Processing frames", leave=False)
        all_masks.append(
            [get_segmentation_mask(path, model, preprocess) for path in frame_bar]
        )
    return all_masks


In [None]:
# Show a frame next to its semantic mask.
def visualize_mask(image_path, mask):
    """Plot the frame at ``image_path`` (resized to 256x256) beside ``mask``."""
    frame = Image.open(image_path).convert("RGB").resize((256, 256))

    # (picture, title, extra imshow arguments) for the left and right panels.
    panels = (
        (frame, "Original Image", {}),
        (mask, "Semantic Mask", {'cmap': 'jet'}),
    )

    plt.figure(figsize=(10, 5))
    for position, (picture, title, imshow_kwargs) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(picture, **imshow_kwargs)
        plt.title(title)
    plt.show()


In [None]:
if not RUN:
    # Reuse the masks cached by a previous run.
    # NOTE: pickle.load executes arbitrary code from the file; only load caches
    # produced by this notebook.
    for split_data, cache_path in ((seq_train, MASK_CMD), (seq_test, MASK_CMD_TEST)):
        with open(cache_path, 'rb') as f:
            split_data['masks'] = pickle.load(f)
else:
    # Segment every frame of both splits.
    all_video_masks = process_video_frames(seq_train, deeplab_model, train_transforms)
    all_video_masks_test = process_video_frames(seq_test, deeplab_model, train_transforms)
    seq_train['masks'], seq_test['masks'] = all_video_masks, all_video_masks_test

    # The segmentation network is no longer needed: drop it and free its memory.
    del deeplab_model
    torch.cuda.empty_cache()

    # Persist the masks so later runs can skip the segmentation step.
    for split_data, cache_path in ((seq_train, MASK_CMD), (seq_test, MASK_CMD_TEST)):
        with open(cache_path, 'wb') as f:
            pickle.dump(split_data['masks'], f)

# LOCAL CONTEXT:

In [None]:
def crop_image(img, bbox):
    """Crop a PIL image to the bounding box ``(x1, y1, x2, y2)``."""
    left, upper, right, lower = bbox
    box = (left, upper, right, lower)
    return img.crop(box)

In [None]:
def crop_image_cv2(img, bbox):
    """Crop an OpenCV/NumPy image to a bounding box.

    Args:
        img: Image array indexed as ``img[row, column, ...]``.
        bbox: ``(x1, y1, x2, y2)`` box; coordinates are truncated to integers.

    Returns:
        The view ``img[y1:y2, x1:x2]`` with the box clamped to the image, so a
        box that sticks out of the frame yields its visible part.
    """
    height, width = img.shape[:2]
    x1, y1, x2, y2 = (int(value) for value in bbox)

    # Clamp to the image: a negative coordinate would otherwise be read by
    # NumPy as an index counted from the end and give a wrong or empty crop.
    x1, x2 = (min(max(x, 0), width) for x in (x1, x2))
    y1, y2 = (min(max(y, 0), height) for y in (y1, y2))

    return img[y1:y2, x1:x2]

In [None]:
# Local-context preprocessing (train split): crop every frame around its
# bounding box, then enhance the crop with a Gaussian blur, an unsharp mask and
# a bilateral filter.
sharpness = 1.5  # Sharpness factor of the unsharp mask

all_images = []
for i, frame_paths in enumerate(tqdm(seq_train['image'], desc="Processing videos")):
    aux_list = []
    for j, img_path in enumerate(tqdm(frame_paths, desc="Processing frames", leave=False)):

        # cv2.imread signals an unreadable file by returning None instead of
        # raising, so fail here with the offending path.
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert to RGB

        # Local context: the region of the frame inside the bounding box.
        bbox = seq_train['bbox'][i][j]
        cropped_images = crop_image_cv2(img, bbox)

        # Light smoothing before sharpening.
        blurred_image = cv2.GaussianBlur(cropped_images, (5, 5), 0)

        # Unsharp mask: add back the difference to a strongly blurred copy.
        blurred_for_sharp = cv2.GaussianBlur(blurred_image, (0, 0), 5)
        sharpened_image = cv2.addWeighted(blurred_image, 1.0 + sharpness, blurred_for_sharp, -sharpness, 0)

        # Noise reduction using bilateral filter
        denoised_image = cv2.bilateralFilter(sharpened_image, 9, 75, 75)

        aux_list.append(denoised_image)
    all_images.append(aux_list)

# print(type(all_images))
# print(all_images[0][0])
# plt.figure(figsize=(8, 8))
# plt.imshow(all_images[0][0])
# plt.axis('off')
# plt.title("Original (Denormalized) Image")
# plt.show()

In [None]:
# Local-context preprocessing (test split): crop every frame around its
# bounding box, then enhance the crop with a Gaussian blur, an unsharp mask and
# a bilateral filter.
sharpness = 1.5  # Sharpness factor of the unsharp mask

all_images_test = []
for i, frame_paths in enumerate(tqdm(seq_test['image'], desc="Processing videos")):
    aux_list = []
    for j, img_path in enumerate(tqdm(frame_paths, desc="Processing frames", leave=False)):

        # cv2.imread signals an unreadable file by returning None instead of
        # raising, so fail here with the offending path.
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert to RGB

        # Local context: the region of the frame inside the bounding box.
        bbox = seq_test['bbox'][i][j]
        cropped_images = crop_image_cv2(img, bbox)

        # Light smoothing before sharpening.
        blurred_image = cv2.GaussianBlur(cropped_images, (5, 5), 0)

        # Unsharp mask: add back the difference to a strongly blurred copy.
        blurred_for_sharp = cv2.GaussianBlur(blurred_image, (0, 0), 5)
        sharpened_image = cv2.addWeighted(blurred_image, 1.0 + sharpness, blurred_for_sharp, -sharpness, 0)

        # Noise reduction using bilateral filter
        denoised_image = cv2.bilateralFilter(sharpened_image, 9, 75, 75)

        aux_list.append(denoised_image)
    all_images_test.append(aux_list)


# POSE KEYPOINTS: 

In [None]:
# """block of code to extract the pose from the images"""
# body_estimation = Body('../body_pose_model.pth')
# test_image = 'pic3.png'
# oriImg = cv2.imread(test_image)  # B,G,R order
# candidate, subset = body_estimation(oriImg)
# canvas = copy.deepcopy(oriImg)
# canvas = util.draw_bodypose(canvas, candidate, subset)
# plt.imshow(canvas[:, :, [2, 1, 0]])
# plt.axis('off')
# plt.show()

In [None]:
def extract_pose_sequence(frames, body_model):
    """Run the pose estimator on each frame and group the poses per person.

    Args:
        frames: Iterable of images accepted by ``body_model``.
        body_model: Callable returning ``(candidate, subset)`` for a frame;
            each ``subset`` row holds 18 keypoint indices into ``candidate``
            (``-1`` for a missing keypoint) and ends with the number of
            detected keypoints.

    Returns:
        A list of float32 tensors, one per person slot, each stacking that
        slot's 36-value pose (x, y per keypoint, ``-1`` when missing) over the
        frames. Slots are matched by detection order, and only as many slots as
        the frame with the fewest detections are kept (``zip`` truncation).
    """
    poses_by_frame = []
    for frame in frames:
        candidate, subset = body_model(frame)

        detections = []
        for person in subset:
            # Skip detections with fewer than 4 keypoints found.
            if not person[-1] >= 4:
                continue
            keypoints = []
            for joint in range(18):
                index = person[joint]
                if index == -1:
                    keypoints += [-1, -1]  # missing keypoint
                else:
                    px, py = candidate[int(index)][:2]
                    keypoints += [px, py]
            detections.append(keypoints)

        poses_by_frame.append(detections)

    # Transpose frame-major lists into one time series per person slot.
    tracks = [list(track) for track in zip(*poses_by_frame)]
    return [torch.tensor(track, dtype=torch.float32) for track in tracks]

In [None]:

if not RUN:
    # Reuse the poses cached by a previous run.
    # NOTE: pickle.load executes arbitrary code from the file; only load caches
    # produced by this notebook.
    with open(POSE_CMD, 'rb') as f:
        seq_train['poses'] = pickle.load(f)
    with open(POSE_CMD_TEST, 'rb') as f:
        seq_test['poses'] = pickle.load(f)
else:
    body_model = Body(POSE_PATH)

    # Extract the poses of every train sequence from its enhanced crops.
    all_poses = [
        extract_pose_sequence(pics, body_model)
        for pics in tqdm(all_images, desc="Extracting poses from image sequences")
    ]
    seq_train['poses'] = all_poses

    # Same for the test sequences.
    all_poses_test = [
        extract_pose_sequence(pics, body_model)
        for pics in tqdm(all_images_test, desc="Extracting poses from image sequences of test set")
    ]
    seq_test['poses'] = all_poses_test

    # The pose estimator is no longer needed: drop it and free its memory.
    del body_model
    torch.cuda.empty_cache()

    # Persist the poses so later runs can skip the extraction step.
    with open(POSE_CMD, 'wb') as f:
        pickle.dump(seq_train['poses'], f)
    with open(POSE_CMD_TEST, 'wb') as f:
        pickle.dump(seq_test['poses'], f)

In [None]:
# # Trasforma le pose in tensor con la forma desiderata [batch_size, num_frames, 36]
# def pad_and_stack(pose_sequences, num_frames):
#     padded_sequences = []
#     for person_poses in pose_sequences:
#         if person_poses.shape[0] < num_frames:
#             padding = torch.full((num_frames - person_poses.shape[0], 36), -1)
#             person_poses = torch.cat((person_poses, padding), dim=0)
#         padded_sequences.append(person_poses)
#     if padded_sequences:
#         return torch.stack(padded_sequences)
#     return torch.tensor([])

# num_frames = max((len(seq) for batch in all_poses for seq in batch), default=0)
# batch_size = len(all_poses)

# if num_frames > 0:
#     # Aggrega tutte le pose in un tensor
#     all_poses_tensor = []
#     for batch in all_poses:
#         batch_poses = pad_and_stack(batch, num_frames)
#         if batch_poses.size(0) > 0:
#             all_poses_tensor.append(batch_poses)

#     if all_poses_tensor:
#         all_poses_tensor = torch.stack(all_poses_tensor)
#         print(all_poses_tensor.shape)  # Dovrebbe essere [batch_size, num_frames, 36]

#         # Aggiungi le pose tensorizzate alla sequenza di allenamento
#         seq_train['poses'] = all_poses_tensor
#     else:
#         print("Nessuna pose valida trovata nelle sequenze di immagini.")
# else:
#     print("Nessuna sequenza di pose trovata.")

# MODEL:

In [None]:
class VisionBranchLocal(torch.nn.Module):
    """Local-context branch: per-frame CNN features -> GRU -> temporal attention.

    The cropped pedestrian images of one sequence go through the feature
    extractor frame by frame, are average-pooled, fed to a 2-layer GRU as a
    sequence and summarized by a softmax-weighted sum of the GRU outputs.
    """

    def __init__(self, vgg16):
        """Store the feature extractor (must expose ``.features``) and build the layers."""
        super().__init__()
        self.vgg16 = vgg16
        self.avgpool = torch.nn.AvgPool2d(kernel_size=14)  # 14x14 average pooling
        self.gru = torch.nn.GRU(input_size=512, hidden_size=256, num_layers=2, batch_first=True)
        self.fc = torch.nn.Linear(256, 2)    # not used by forward
        self.attn = torch.nn.Linear(256, 1)  # one attention score per time step
        self.tanh = torch.nn.Tanh()

    def forward(self, cropped_images):
        """Encode one sequence of crops.

        Args:
            cropped_images: 4-D tensor ``(seq_len, c, h, w)``.

        Returns:
            ``tanh`` of the attention-weighted sum of the GRU outputs.
            NOTE(review): presumably ``(1, 256)`` -- this relies on the pooled
            feature map being 1x1 (e.g. 224x224 crops); confirm.
        """
        num_frames, _, _, _ = cropped_images.size()

        # One pooled, flattened (channels, -1) feature per frame.
        per_frame = []
        for frame_idx in range(num_frames):
            feature_map = self.vgg16.features(cropped_images[frame_idx])
            pooled = self.avgpool(feature_map)
            per_frame.append(pooled.view(pooled.size(0), -1))

        # (channels, seq_len, rest) -> (rest, seq_len, channels) for the
        # batch-first GRU.
        sequence = torch.stack(per_frame, dim=1).permute(2, 1, 0)

        hidden_states, _ = self.gru(sequence)
        # Softmax over the time axis, then weighted sum of the hidden states.
        weights = torch.softmax(self.attn(hidden_states), dim=1)
        context = (weights * hidden_states).sum(dim=1)
        return self.tanh(context)


In [None]:
class VisionBranchGlobal(torch.nn.Module):
    """Global-context branch: per-mask CNN features -> GRU -> temporal attention.

    The semantic masks of one sequence go through the feature extractor one by
    one, are average-pooled, fed to a 2-layer GRU as a sequence and summarized
    by a softmax-weighted sum of the GRU outputs.
    """

    def __init__(self, vgg16):
        """Store the feature extractor (must expose ``.features``) and build the layers."""
        super().__init__()
        self.vgg16 = vgg16
        self.avgpool = torch.nn.AvgPool2d(kernel_size=14)  # 14x14 average pooling
        self.gru = torch.nn.GRU(input_size=512, hidden_size=256, num_layers=2, batch_first=True)
        self.fc = torch.nn.Linear(256, 2)    # not used by forward
        self.attn = torch.nn.Linear(256, 1)  # one attention score per time step
        self.tanh = torch.nn.Tanh()

    def forward(self, masks):
        """Encode one sequence of masks.

        Args:
            masks: Tensor whose first axis indexes the frames of the sequence.

        Returns:
            ``tanh`` of the attention-weighted sum of the GRU outputs.
            NOTE(review): presumably ``(1, 256)`` -- this relies on the pooled
            feature map being 1x1 (e.g. 224x224 masks); confirm.
        """
        # One pooled, flattened (channels, -1) feature per mask.
        per_frame = []
        for mask in masks:
            pooled = self.avgpool(self.vgg16.features(mask))
            per_frame.append(pooled.view(pooled.size(0), -1))

        # (channels, seq_len, rest) -> (rest, seq_len, channels) for the
        # batch-first GRU.
        sequence = torch.stack(per_frame, dim=1).permute(2, 1, 0)

        hidden_states, _ = self.gru(sequence)
        scores = self.attn(hidden_states)                # one score per time step
        weights = torch.softmax(scores, dim=1)           # normalized over time
        context = (weights * hidden_states).sum(dim=1)   # weighted temporal sum
        return self.tanh(context)


In [None]:
class NVisionBranch(torch.nn.Module):
    """Non-vision branch: poses -> GRU, joined with the boxes -> GRU -> attention.

    The pose sequence is encoded first; the bounding boxes are concatenated to
    that encoding and the result goes through a second GRU and a temporal
    attention block.
    """

    def __init__(self):
        super().__init__()
        self.gru = torch.nn.GRU(input_size=36, hidden_size=256, num_layers=2, batch_first=True)
        # 256 pose features + 4 bounding-box values per time step.
        self.gru2 = torch.nn.GRU(input_size=256+4, hidden_size=256, num_layers=2, batch_first=True)
        self.attn = torch.nn.Linear(256, 1)  # one attention score per time step
        self.tanh = torch.nn.Tanh()

    def forward(self, poses,bbox):
        """Encode a pose/box sequence.

        Args:
            poses: Batch-first pose sequence with 36 features per step.
            bbox: Batch-first box sequence with 4 features per step; it must
                match ``poses`` on every axis but the last.

        Returns:
            ``tanh`` of the attention-weighted sum over time, with 256 features.
        """
        pose_states, _ = self.gru(poses)
        pose_and_box = torch.cat((pose_states, bbox), dim=-1)
        hidden_states, _ = self.gru2(pose_and_box)

        # Softmax over the time axis, then weighted sum of the hidden states.
        weights = torch.softmax(self.attn(hidden_states), dim=1)
        context = (weights * hidden_states).sum(dim=1)
        return self.tanh(context)


In [None]:
# class Attention(torch.nn.Module):
#     def __init__(self, hidden_dim):
#         super(Attention, self).__init__()
#         self.ws = nn.Linear(hidden_dim, hidden_dim)
#         self.wc = nn.Linear(hidden_dim * 2, hidden_dim)
        
#     def forward(self, hidden_states, final_hidden):
#         # hidden_states: (T, hidden_dim)
#         # final_hidden: (hidden_dim)
        
#         # Expand final_hidden to match the dimensions of hidden_states
#         final_hidden_exp = final_hidden.unsqueeze(0).expand(hidden_states.size(0), -1)  # (T, hidden_dim)
        
#         # Compute attention scores
#         scores = torch.matmul(hidden_states, self.ws(final_hidden_exp).transpose(0, 1))  # (T, T)
#         scores = scores[-1]  # We only need the scores for the last hidden state
#         attn_weights = F.softmax(scores, dim=0)  # (T)
        
#         # Compute the context vector as the weighted sum of hidden states
#         context_vector = torch.sum(attn_weights.unsqueeze(-1) * hidden_states, dim=0)  # (hidden_dim)
        
#         # Combine the context vector with the final hidden state
#         combined = torch.cat((context_vector, final_hidden), dim=-1)  # (2 * hidden_dim)
#         attention_output = torch.tanh(self.wc(combined))  # (hidden_dim)
        
#         return attention_output
# class PedestrianIntentModel(nn.Module):
#     def __init__(self, vision_branch_local, vision_branch_global, non_vision_branch, hidden_dim):
#         super(PedestrianIntentModel, self).__init__()
#         self.vision_branch_local = vision_branch_local
#         self.vision_branch_global = vision_branch_global
#         self.non_vision_branch = non_vision_branch
        
#         self.attention = Attention(hidden_dim)
#         self.fc = nn.Linear(hidden_dim, 1)  # Output per frame
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, cropped_images, bboxes, masks, poses):
#         vision_out_local = self.vision_branch_local(cropped_images)  # (T, h_local)
#         vision_out_global = self.vision_branch_global(masks)  # (T, h_global)
#         non_vision_out = self.non_vision_branch(poses, bboxes)  # (T, h_non_vision)
        
#         # Concatenare gli output dei due vision branch
#         vision_out = torch.cat((vision_out_local, vision_out_global), dim=-1)  # (T, h_local + h_global)
#         # Concatenare con l'output del non-vision branch
#         final_fusion = torch.cat((vision_out, non_vision_out), dim=-1)  # (T, h_total)
        
#         # Usare l'ultima hidden state del final_fusion per l'attenzione
#         final_hidden = final_fusion[-1]  # (h_total)
        
#         # Applicare il modulo di attenzione
#         attention_output = self.attention(final_fusion, final_hidden)  # (hidden_dim)
        
#         # Applicare il fully connected layer per ogni frame
#         out = self.sigmoid(self.fc(attention_output))  # (1)
        
#         return out

In [None]:
class PedestrianIntentModel(torch.nn.Module):
    """Full model: fuse the three branches and predict crossing / not crossing.

    The outputs of the local-vision, global-vision and non-vision branches are
    concatenated, re-weighted by a feature-wise attention layer and mapped to a
    single sigmoid probability.
    """

    def __init__(self, vision_branch_local,vision_branch_global,non_vision_branch):
        super().__init__()
        self.vision_branch_local = vision_branch_local
        self.vision_branch_global = vision_branch_global
        self.non_vision_branch = non_vision_branch
        self.attn = torch.nn.Linear(768, 768)  # feature-wise attention scores

        self.fc = torch.nn.Linear(768, 1) # Output: crossing or not crossing
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, cropped_images, bboxes,masks,poses):
        """Return the crossing probability for one sequence.

        Args:
            cropped_images: Input of the local vision branch.
            bboxes: Bounding boxes, second input of the non-vision branch.
            masks: Input of the global vision branch.
            poses: Poses, first input of the non-vision branch.

        Returns:
            Sigmoid output of the final linear layer.
            NOTE(review): the fused features are summed over dim 0, which
            presumably is the batch axis, so this looks valid only for a batch
            of one -- confirm.
        """
        local_feat = self.vision_branch_local(cropped_images)
        global_feat = self.vision_branch_global(masks)
        non_vision_feat = self.non_vision_branch(poses, bboxes)

        # Vision features first (local, global), then the non-vision ones.
        fused = torch.cat((local_feat, global_feat, non_vision_feat), dim=-1)

        # Softmax across the feature axis gives one weight per fused feature.
        weights = torch.softmax(self.attn(fused), dim=-1)
        context = (weights * fused).sum(dim=0)

        torch.cuda.empty_cache()
        return self.sigmoid(self.fc(context))

In [None]:
class VGG16_FeatureExtractor(torch.nn.Module):
    """Truncated convolutional feature extractor taken from a VGG-style model.

    Args:
        base_model: Model exposing a ``features`` container. When omitted, the
            module-level ``vgg16`` is used, so ``VGG16_FeatureExtractor()``
            behaves as before.
        num_layers: Number of leading ``features`` modules to keep. The default
            of 24 keeps modules 0..23; in torchvision's VGG16 module 23 is the
            fourth max-pool (block4_pool).
    """

    def __init__(self, base_model=None, num_layers=24):
        super().__init__()
        if base_model is None:
            # Backward-compatible fallback to the notebook's global model.
            base_model = vgg16
        kept_layers = list(base_model.features.children())[:num_layers]
        self.features = torch.nn.Sequential(*kept_layers)

    def forward(self, x):
        """Apply the truncated feature stack to ``x``."""
        return self.features(x)

In [None]:
# Load the pretrained VGG16 model
vgg16 = models.vgg16(pretrained=True)

#cut the model at the 24th layer:
vgg16_fe = VGG16_FeatureExtractor()
vgg16_fe


# define the models of each branches
# NOTE(review): both vision branches receive the same vgg16_fe instance, so
# they share its weights -- confirm this is intended.
model_local = VisionBranchLocal(vgg16_fe).to(device)
model_global = VisionBranchGlobal(vgg16_fe).to(device)
model_non_vision = NVisionBranch().to(device)
model = PedestrianIntentModel(model_local,model_global,model_non_vision).to(device)
#model


# DATASET & DATALOADER

In [None]:
class JAADDataset(Dataset):
    """Dataset yielding, per sequence, the crops, boxes, masks, poses and labels.

    Args:
        seq_data: Mapping with the per-sequence lists ``'image'``, ``'bbox'``,
            ``'masks'``, ``'poses'`` and ``'intent'``.
        all_images: Per-sequence lists of cropped frames (NumPy/OpenCV images),
            aligned with ``seq_data``.
        transform: Optional callable applied to every resized crop.
    """

    def __init__(self, seq_data, all_images, transform=None):
        self.seq_data = seq_data
        self.all_images = all_images
        self.transform = transform

    def __len__(self):
        """Return the number of sequences."""
        return len(self.seq_data['image'])

    def __getitem__(self, idx):
        """Return ``(images, bboxes, masks, poses, intents)`` for sequence ``idx``.

        ``images`` is the list of crops resized to 224x224 and passed through
        ``transform``; without a transform the resized crops are returned
        as they are. ``bboxes`` and ``intents`` are float32 tensors; ``masks``
        and ``poses`` are returned as stored in ``seq_data``.
        """
        masks = self.seq_data['masks'][idx]
        poses = self.seq_data['poses'][idx]

        # Bring every crop to the fixed input size of the vision branch.
        cropped_images = [
            cv2.resize(img, (224, 224), interpolation=cv2.INTER_CUBIC)
            for img in self.all_images[idx]
        ]

        # `transform` defaults to None; previously that left `tensor_images`
        # unbound and __getitem__ raised UnboundLocalError.
        if self.transform is not None:
            tensor_images = [self.transform(img) for img in cropped_images]
        else:
            tensor_images = cropped_images

        bboxes = torch.tensor(self.seq_data['bbox'][idx], dtype=torch.float32)
        intents = torch.tensor(self.seq_data['intent'][idx], dtype=torch.float32)
        return tensor_images, bboxes, masks, poses, intents

In [None]:
# One sequence per batch (batch_size=1) for both splits; each dataset pairs a
# split's sequence data with its pre-computed cropped images.
train_dataset = JAADDataset(seq_train,all_images=all_images, transform=transform_lc)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_dataset = JAADDataset(seq_test,all_images=all_images_test, transform=transform_lc)
# NOTE(review): shuffling the test loader is presumably unnecessary for evaluation.
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True)

# TRAINING:

In [None]:
# Binary cross-entropy on the model's sigmoid output.
criterion = torch.nn.BCELoss()
# Adam over the parameters that are marked as trainable.
optimizer = torch.optim.Adam(
    (param for param in model.parameters() if param.requires_grad),
    lr=0.001,
)

In [None]:
def denormalize(tensor, mean, std):
    """Undo a channel-wise normalization in place and return ``tensor``.

    Each leading slice of ``tensor`` is multiplied by its ``std`` entry and
    shifted by its ``mean`` entry; the input tensor itself is modified.
    """
    for channel, mu, sigma in zip(tensor, mean, std):
        channel.mul_(sigma)
        channel.add_(mu)
    return tensor

In [None]:
# Channel-wise mean / std, the same values used by the Normalize transforms.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
#def show_pic(image):    
    #numpy_data = image.cpu().numpy()
    # Create a new figure
    #plt.figure(figsize=(8, 8))
    # Display the image
    #plt.imshow(numpy_data)  # Use 'gray' colormap for grayscale images
    # Remove axes ticks
    #plt.axis('off')
    # Add a title if needed
    #plt.title("Your Image Title")
    # Show the plot
    #plt.show()

In [None]:
def accuracy(net, loader, device):
    """Compute the binary accuracy of ``net`` over ``loader``.

    Args:
        net: Model called as ``net(images, bboxes, masks, poses)``.
        loader: DataLoader yielding ``(images, bboxes, masks, poses, intents)``
            with one sequence per batch.
        device: Device the inputs and the metric are moved to.

    Returns:
        The accumulated value of the torchmetrics ``Accuracy`` metric.

    NOTE(review): the poses coming from the loader are discarded and replaced by
    an all-zero placeholder, as in the training loop -- confirm this is still
    intended.
    """
    acc = Accuracy(task='binary', num_classes=2).to(device)

    # Evaluation never needs gradients; guard here so callers cannot forget.
    with torch.no_grad():
        for tensor_images, bboxes, masks, poses, intents in loader:
            # `tensor_images` is a list with one entry per frame.
            num_frames = len(tensor_images)
            # Zero placeholder pose of shape (1, num_frames, 36).
            poses = torch.zeros((1, num_frames, 36), dtype=torch.float32, device=device)

            # Stack the per-frame lists and drop the batch axis of size one.
            tensor_images = torch.stack(tensor_images, dim=1).squeeze(0).to(device)
            masks = torch.stack(masks, dim=1).squeeze(0).float().to(device)
            bboxes = bboxes.to(device)
            # Label of the first frame of the sequence.
            intents = intents.squeeze(0)[0].to(device)

            ypred = net(tensor_images, bboxes, masks, poses)
            acc(ypred, intents)

            # Drop the batch tensors before the next iteration to keep GPU
            # memory low (the former bare `.cpu()` calls discarded their result
            # and only added device-to-host copies).
            del tensor_images, bboxes, masks, poses, intents, ypred
            torch.cuda.empty_cache()
    return acc.compute()

In [None]:

num_epochs = 1

# free the memory
torch.cuda.empty_cache()

# Placeholder tensor for empty poses
# NOTE(review): the poses delivered by the loader are currently ignored and
# replaced by this all-zero placeholder for every frame -- confirm intended.
pose_placeholder = torch.zeros((36,), dtype=torch.float32) 

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    model.train()

    # Accumulate the loss so the epoch summary reports the mean over all
    # batches instead of the value of the last batch only.
    epoch_loss = 0.0
    num_batches = 0

    for tensor_images, bboxes, masks, poses, intents in train_loader:
        # `tensor_images` is a list with one entry per frame.
        num_frames = len(tensor_images)
        # Zero poses of shape (1, num_frames, 36).
        poses = pose_placeholder.repeat(1, num_frames, 1).to(device)

        # Stack the per-frame lists and drop the batch axis of size one.
        tensor_images = torch.stack(tensor_images, dim=1).squeeze(0).to(device)
        masks = torch.stack(masks, dim=1).squeeze(0).float().to(device)
        bboxes = bboxes.to(device)
        # Label of the first frame of the sequence.
        intents = intents.squeeze(0)[0].to(device)

        optimizer.zero_grad()

        outputs = model(tensor_images, bboxes, masks, poses)
        loss = criterion(outputs, intents)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

        # Drop the batch tensors before the next iteration to keep GPU memory low.
        del tensor_images, masks, bboxes, poses, intents, outputs
        torch.cuda.empty_cache()

    # An empty loader yields NaN instead of a NameError on `loss`.
    mean_loss = epoch_loss / num_batches if num_batches else float('nan')
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {mean_loss:.4f}')
    model.eval()
    with torch.no_grad():
        print(f'Accuracy at epoch {epoch+1}: {accuracy(model, test_loader, device)}')
    #print(f'Accuracy at epoch {epoch}: {accuracy(model, test_loader, device)}')



# las

In [None]:
# Debug output: length of the first entry of the first test sequence's image list.
# NOTE(review): these entries are used as file paths elsewhere, so this
# presumably prints the length of a path string -- confirm this is what was meant.
print(len(seq_test['image'][0][0]))