In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Definizione librerie utili**

In [None]:
import cv2
import numpy as np
import math
import os
from pathlib import Path
from sklearn.decomposition import MiniBatchDictionaryLearning

# Path principali delle cartelle in cui sono contenuti i file

In [None]:
path_principale = "/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Gruppi/Gruppo 22/Progetto BABELE/"
path_train = "Train/"
path_test = "Test/"
path_validation = "Validation/"

# **Salvataggio lista di librerie e versioni installate**


In [None]:
!pip freeze > "/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Gruppi/Gruppo 22/Progetto BABELE/requirements_estrazioneSVD.txt"

# **Generazione dei file in cui saranno contenute le path dei video**

In [None]:
import os
import glob

# cartella contenente la path del dataset
dataset_path = "/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Datasets/Dataset_2_BABELE/BABELE_Dataset/Dataset_split_subject_independent/Lips_video_10_seconds_division_train_test_val"

# cartella in cui salvare i file
folder_save = path_principale+"FILE_PATH"

# Loop over all subfolders in folder_read
for subfolder in os.listdir(dataset_path):
    # Create a file with the subfolder's name in folder_save
    file_path = os.path.join(folder_save, subfolder + "_babele_lips.txt")
    
    # Open the file for writing
    with open(file_path, "w") as f:
        # Get a list of all files in the subfolder
        files = glob.glob(os.path.join(dataset_path, subfolder, "*"))
        
        # Write each file's path to the file
        for file in files:
            f.write(file + "\n")


# **Preprocessing dei video tramite codifica sparsa**






## **Impostazione parametri per la codifica sparsa e inizializzazione modello**


In [None]:
# Imposta i parametri per la codifica sparsa
# N.B La spiegazione dei parametri settati è riportata nella prossima cella 
n_components = 1
alpha = 1
batch_size = 3
n_iter = 1000
shuffle = True
random_state = None
positive_dict = False
transform_algorithm = 'omp'
transform_alpha = None
transform_n_nonzero_coefs = None

# Inizializza il modello di apprendimento del dizionario
dict_learning = MiniBatchDictionaryLearning(n_components=n_components,alpha=alpha,
                                            batch_size=batch_size, n_iter=n_iter, shuffle=shuffle,random_state=random_state,
                                            positive_dict=positive_dict, transform_algorithm=transform_algorithm,
                                            transform_alpha=transform_alpha,transform_n_nonzero_coefs=transform_n_nonzero_coefs)



* n_components: Specifica il numero di componenti nel dizionario sparso. In questo caso, è impostato a 1, il che significa che si desidera un singolo componente nel dizionario.
* alpha: Parametro di regolarizzazione per il modello di apprendimento del dizionario. Controlla il livello di sparsità delle rappresentazioni sparse. Un valore più alto di alpha produce rappresentazioni più sparse.
* batch_size: La dimensione del batch utilizzata per l'apprendimento del dizionario. Indica il numero di campioni da utilizzare in ogni iterazione dell'algoritmo di apprendimento.
* n_iter: Il numero di iterazioni per l'apprendimento del dizionario. Specifica quante volte ripetere l'algoritmo di apprendimento.
* shuffle: Indica se i campioni devono essere mescolati prima di ogni iterazione. Se impostato su True, i campioni vengono mescolati casualmente per ogni iterazione.
* random_state: Il seed utilizzato dal generatore di numeri casuali. Se viene fornito un valore, garantisce la riproducibilità dell'apprendimento del dizionario.
* positive_dict: Specifica se il dizionario appreso deve essere vincolato ad avere solo valori non negativi.
* transform_algorithm: L'algoritmo utilizzato per calcolare la codifica sparsa dei campioni. In questo caso, è impostato su 'omp', che sta per Orthogonal Matching Pursuit.
* transform_alpha: Parametro di regolarizzazione per l'algoritmo di codifica sparsa. Controlla la sparsità delle rappresentazioni sparse. Se non viene fornito alcun valore, viene utilizzato il valore di alpha definito precedentemente.
* transform_n_nonzero_coefs: Il numero massimo di coefficienti non nulli nella rappresentazione sparsa di ogni campione. Se non viene fornito alcun valore, non viene impostato alcun limite.



## **Lista contenente le path dei file dei video**

In [None]:
files_path = ["/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Gruppi/Gruppo 22/Progetto BABELE/FILE_PATH/Test_babele_lips.txt",
              "/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Gruppi/Gruppo 22/Progetto BABELE/FILE_PATH/Train_babele_lips.txt",
              "/content/drive/Shareddrives/Progetti FVAB 22 23 - VSR & BABELE/Gruppi/Gruppo 22/Progetto BABELE/FILE_PATH/Validation_babele_lips.txt"]

## **Generazione dataset**

In [None]:
from tqdm import tqdm
import pandas as pd

for path_file in files_path:

  with open(path_file, mode='r') as file:

    video_data = []  # Lista di tutti i dati dei video
    
    linee = file.readlines()

    csv_filename = os.path.basename(path_file)
    csv_filename_without_extension = csv_filename.split(".")[0] #per la creazione del dataset voglio un nome senza .txt
    estensione = "_SVD.csv"

    if "Validation" in path_file:
      csv_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/"+path_validation + csv_filename_without_extension + estensione
    elif "Train" in path_file:
      csv_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/"+path_train + csv_filename_without_extension + estensione
    elif "Test" in path_file:
       csv_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/"+path_test + csv_filename_without_extension + estensione
    else:
      print("Qualcosa è andato storto!")
      break
   
    for video_path in tqdm(linee, desc="Elaborazione video " + csv_filename_without_extension, unit="video"):
        video_path = video_path.strip() # Rimuovi il carattere di a capo dalla stringa

        # Apre il video in input
        cap = cv2.VideoCapture(str(video_path))

        filename = os.path.basename(video_path) #Nome del file video
        filename_without_extension = filename.split(".")[0] #per la creazione del dataset voglio un nome senza .avi
        components = filename.split("_")[:-1] #splitto l'array in un altro per "_" e elimino ".avi"
       
        target = int(components[0]) #Estrazione classe (lingua)

        frame_data = {}
        frame_data['video-frame'] = filename_without_extension # Alla fine collego al nome del file il frame analizzato
        colonna = 0

        # Inizializza la lista dei dati del video
        video_data_video = []

        # Loop attraverso i frame del video
        while True:
            # Leggi il frame successivo
            ret, frame = cap.read()
          
            # Se non ci sono più frame, esci dal loop
            if not ret:
                break
            
            # Converti l'immagine in scala di grigi
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            
            # Appiattisci l'immagine in un array 1D
            flat_frame = gray_frame.flatten()
            
            # Apprendi il dizionario sparsificato dal frame corrente
            dict_learning.partial_fit([flat_frame])

        # Recupera il dizionario appreso
        dictionary = dict_learning.components_[0]
        array = np.array(dictionary)
        dizionario = {i: v for i, v in enumerate(array)}
        frame_data.update(dizionario)


        frame_data['target']= target
        
        # Aggiungi il dizionario alla lista dei dati del video
        video_data_video.append(frame_data)

        # Aggiungi i dati del video alla lista di tutti i dati
        video_data.extend(video_data_video)

        # Chiudi il video
        cap.release()

    # Crea il dataframe dai dati di tutti i video e visualizzalo
    df = pd.DataFrame(video_data)
  
    # df_PCA = riduzione_PCA(df) 
    # Visualizza il DataFrame 
    display(df) 

    print(100*"-")
    print("Generazione del file csv ", csv_filename_without_extension, estensione)
    print(100*"-")
    df.to_csv(csv_path, index=False)

    file.close()

# **Applicazione PCA per  ridurre i dataset**

## **Calcolo numero di componenti ottimale**
Si cerca di mantenere una percentuale significativa di varianza, ad esempio il 95%, ma questo può variare in base alle esigenze specifiche del problema.

In [None]:
from sklearn.decomposition import PCA

def numero_componenti(df):
  # Seleziona solo le colonne numeriche per la PCA
  df_numeric = df.select_dtypes(include=[np.number])

  # Crea l'oggetto PCA e fitalo ai dati numerici
  pca = PCA()
  pca.fit(df_numeric)

  # Calcola la varianza cumulativa spiegata dalle componenti principali
  variance_cumulative = np.cumsum(pca.explained_variance_ratio_)

  # Calcola il numero di componenti principali per mantenere il 95% di varianza
  n_components = np.argmax(variance_cumulative >= 0.95) + 1

  print(f'Il numero di componenti principali del dataset per mantenere il 95% di varianza è {n_components}')

  return  n_components

## **Funzione che applica la PCA**

In [None]:
from sklearn.decomposition import PCA

def riduzione_PCA(df,components=0):
  df_numeric = df.drop(['video-frame'], axis=1)
  df_numeric = df_numeric.drop(['target'], axis=1)

  if components == 0:
    n_components = numero_componenti(df_numeric)
  else:
    n_components = components

  pca = PCA(n_components=n_components)
  df_pca = pca.fit_transform(df_numeric)

  df_result = pd.concat([df['video-frame'],  pd.DataFrame(df_pca), df['target']], axis=1)

  return df_result


## **Caricamento dataset  originali**

In [None]:
import pandas as pd

#Recupero dei vari csv
df_train = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Train/Train_babele_lips_SVD.csv")
df_test = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Test/Test_babele_lips_SVD.csv")
df_validation = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Validation/Validation_babele_lips_SVD.csv")


## **Calcolo numero delle componenti per  la varianza al  95% per ogni dataset**

In [None]:
display(riduzione_PCA(df_train)) # varianza calcolata 8 

display(riduzione_PCA(df_test))  # varianza calcolata 20

display(riduzione_PCA(df_validation))  # varianza calcolata 4

## **Riduzione PCA a [4,8,20,40,60,80,120,160] componenti**
Nota: la scelta delle componenti si basa su di un range che va da 0 al min(n_samples, n_features), esempio: n_samples = 160 e n_features= 60000, abbiamo che il range in cui ci possiamo spostare è da 0 a 160

In [None]:
num_componenti = [4,8,20,40,60,80,120,160]

for componente in num_componenti:
  estensione = "_SVD_"+str(componente)+"C.csv"

  train_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/Train/Train_babele_lips" + estensione
  test_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/Test/Test_babele_lips" + estensione
  val_path = path_principale+"CSV_DATASET_CODIFICA_SPARSA/Validation/Validation_babele_lips" + estensione

  print(100*"-")
  dtr = riduzione_PCA(df_train,componente)
  print("Generazione del file csv ", "Train_babele_lips", estensione)
  dtr.to_csv(train_path, index=False)
  print(100*"-")

  print(100*"-")
  dte = riduzione_PCA(df_test,componente)
  print("Generazione del file csv ", "Test_babele_lips", estensione)
  dte.to_csv(test_path, index=False)
  print(100*"-")

  print(100*"-")
  dva = riduzione_PCA(df_validation,componente)
  print("Generazione del file csv ", "Validation_babele_lips", estensione)
  dva.to_csv(val_path, index=False)
  print(100*"-")

# Fusione dataset BABELE (features + CODIFICA SPARSA)

Dopo aver estratto i valori della CODIFICA SPARSA dal video (prendiamo la codifica sparsa da 20 componenti), questi vengono uniti al dataset di features di 60 frames per ogni video.

In [None]:
import pandas as pd

#Recupero dei vari csv
df_train_svd = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Train/Train_babele_lips_SVD_20C.csv")
df_test_svd = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Test/Test_babele_lips_SVD_20C.csv")
df_validation_svd = pd.read_csv(path_principale+"CSV_DATASET_CODIFICA_SPARSA/Validation/Validation_babele_lips_SVD_20C.csv")

print(50*"-"+"Train - CODIFICA_SPARSA"+"-"*50)
print(df_train_svd)
print(50*"-"+"Test - CODIFICA_SPARSA"+"-"*50)
print(df_test_svd)
print(50*"-"+"Validation - CODIFICA_SPARSA"+"-"*50)
print(df_validation_svd)


df_train_features = pd.read_csv(path_principale+"CSV_DATASET/Train/Train_babele_cityblock_60FPS.csv")
df_test_features = pd.read_csv(path_principale+"CSV_DATASET/Test/Test_babele_cityblock_60FPS.csv")
df_validation_features = pd.read_csv(path_principale+"CSV_DATASET/Validation/Validation_babele_cityblock_60FPS.csv")

print(50*"-"+"Train - features"+"-"*50)
print(df_train_features)
print(50*"-"+"Test - features"+"-"*50)
print(df_test_features)
print(50*"-"+"Validation - features"+"-"*50)
print(df_validation_features)

# Unione dei dataset di validation
# Rimuove la colonna 'video-frame'e 'target' dal dataframe SVD
df_train_features.drop(['video-frame'], axis=1, inplace=True)
df_test_features.drop(['video-frame'], axis=1, inplace=True)
df_validation_features.drop(['video-frame'], axis=1, inplace=True)

df_train_svd.drop(['target'], axis=1, inplace=True)
df_test_svd.drop(['target'], axis=1, inplace=True)
df_validation_svd.drop(['target'], axis=1, inplace=True)

# Unione dei dataset di train
df_train_merged = pd.concat([df_train_svd, df_train_features], axis=1)

# Unione dei dataset di test
df_test_merged = pd.concat([df_test_svd, df_test_features], axis=1)

# Unione dei dataset di validation
df_validation_merged = pd.concat([df_validation_svd, df_validation_features], axis=1)


print(50*"-"+"Train - features + codifica sparsa"+"-"*50)
display(df_train_merged)
print(50*"-"+"Test - features + codifica sparsa"+"-"*50)
display(df_test_merged)
print(50*"-"+"Validation - features + codifica sparsa"+"-"*50)
display(df_validation_merged)

print("Generazione del file csv -- Train_features+svd")
df_train_merged.to_csv(path_principale+"CSV_DATASET_MERGED/Train_features+svd.csv", index=False)
print("Generazione del file csv -- Test_features+svd")
df_test_merged.to_csv(path_principale+"CSV_DATASET_MERGED/Test_features+svd.csv", index=False)
print("Generazione del file csv -- Validation_features+svd")
df_validation_merged.to_csv(path_principale+"CSV_DATASET_MERGED/Validation_features+svd.csv", index=False)

