In [None]:
from google.colab import drive
drive.mount('/content/drive')

# **Transcription and MP3 to MP4**

In [None]:
!pip install SpeechRecognition pydub
!apt-get install ffmpeg -y

In [None]:
import speech_recognition as sr
from pydub import AudioSegment
import os

# Directorio donde están los videos
# Change with the path of the videos (Train, Test, Val)
video_dir = "/content/drive/MyDrive/ElderReact_Data/ElderReact_train"
output_dir = "/content/drive/MyDrive/EderReact/ElderReact_train"

# Crear el directorio de salida si no existe
os.makedirs(output_dir
            , exist_ok=True)

# Inicializar el reconocedor de voz
r = sr.Recognizer()

# Procesar cada archivo de video en el directorio
for video_file in os.listdir(video_dir):
    if video_file.endswith(".mp4"):  # Verificar que sea un archivo .mp4
        video_path = os.path.join(video_dir, video_file)
        audio_output = os.path.join(output_dir, f"{os.path.splitext(video_file)[0]}.wav")
        text_output = os.path.join(output_dir, f"{os.path.splitext(video_file)[0]}.txt")

        # Convertir video a audio
        video = AudioSegment.from_file(video_path, format="mp4")
        audio = video.set_channels(1).set_frame_rate(16000).set_sample_width(2)
        audio.export(audio_output, format="wav")

        # Transcribir el audio
        with sr.AudioFile(audio_output) as source:
            audio_text = r.record(source)

        try:
            # Reconocimiento de texto
            text = r.recognize_google(audio_text, language='en-US')

            # Guardar la transcripción en un archivo de texto
            with open(text_output, "w") as file:
                file.write(text)

            print(f"Transcripción guardada en: {text_output}")
        except sr.UnknownValueError:
            print(f"No se pudo transcribir el audio de: {video_file}")
        except sr.RequestError as e:
            print(f"Error con el servicio de reconocimiento de voz: {e}")

print("¡Transcripción completada para todos los videos!")

# Filter and save in individual files

In [None]:
#change with the path of each file (Train, Test, Val)
ruta = pathlib.Path('/content/drive/MyDrive/EderReact/ElderReact_train')
paths = []
for archivo in ruta.glob('*.wav'):
  paths.append(archivo.name)
  #print(archivo.name)

In [None]:
ruta = pathlib.Path('/content/drive/MyDrive/EderReact/ElderReact_train')
paths_txt = []
for archivo in ruta.glob('*.txt'):
  paths_txt.append(archivo.name)
  #print(archivo.name)

In [None]:
# Crear dataframes individuales para cada tipo de archivo
df_wav = pd.DataFrame({'name': [archivo.split('.')[0] for archivo in paths], 'wav': paths})
df_txt = pd.DataFrame({'name': [archivo.split('.')[0] for archivo in paths_txt], 'txt': paths_txt})


# Unir los dataframes basados en la columna 'name'
train_df = pd.merge(df_wav, df_txt, on='name', how='outer')
train_df = pd.merge(train_df, df_tg, on='name', how='outer')


train_df
train_df.to_csv('train.csv', index=False)

# Map labels

In [None]:
import csv
#1->filename, 2->Anger, 3->Disgust, 4->Fear, 5->Happiness, 6->Sadness, 7->Surprise, 8->Gender, 9->Valence).
with open('/content/train_labels.txt', 'r') as in_file:
    stripped = (line.strip() for line in in_file)
    lines = (line.split(" ") for line in stripped if line)
    with open('log.csv', 'w') as out_file:
        writer = csv.writer(out_file)
        writer.writerow(('filename', 'Anger','Disgust','Fear','Happiness','Sadness','Surprise','Gender','Valence'))
        writer.writerows(lines)

In [None]:
# Ruta train
path_train = '/content/train.csv'
df_files = pd.read_csv(path_train)
# Ruta labels
path_labels = '/content/log.csv'
df_labels = pd.read_csv(path_labels)
#Eliminar extensiones
df_labels['name'] = df_labels['filename'].str.split('.').str[0]

df_combined = pd.merge(df_files, df_labels, on='name', how='inner')

df_combined.to_csv('train_labels.csv', index=False)

# **Some visualizations**

In [None]:
test = pd.read_csv('/content/drive/MyDrive/EderReact/test_labels.csv')
test

In [None]:
# graficar las emociones de anger,disgust,fear,happiness,sadness,surprise
import matplotlib.pyplot as plt
import seaborn as sns

emociones = ['Anger', 'Disgust', 'Fear', 'Happiness', 'Sadness', 'Surprise']
conteo = test[emociones].sum()

plt.figure(figsize=(10, 6))
sns.barplot(x=conteo.index, y=conteo.values)
plt.xlabel('Emociones')
plt.ylabel('Conteo')

In [None]:
test.isnull().sum()

# Nosie visual and reduction

In [None]:
!pip install noisereduce

In [None]:
from os.path import dirname, join as pjoin
from scipy.io import wavfile
import scipy.io

In [None]:
file_path = '/content/drive/MyDrive/EderReact/ElderReact_train'

In [None]:
ruta = pathlib.Path(file_path)
train = []
for archivo in ruta.glob('*.wav'):
  train.append(archivo)
  #print(archivo.name)

In [None]:
train[0]

In [None]:
import noisereduce as nr
import numpy as np
# load data
rate, data = wavfile.read(train[0])
orig_shape = data.shape
# perform noise reduction
reduced_noise = nr.reduce_noise(y=data,
                                sr=rate)

In [None]:
reduced_noise

In [None]:
plt.figure(figsize=(12, 6))

plt.subplot(2, 1, 1)
plt.plot(data, color='gray')
plt.title("Audio Original")
plt.xlabel("Muestras")
plt.ylabel("Amplitud")

plt.subplot(2, 1, 2)
plt.plot(reduced_noise, color='purple')
plt.title("Audio con Reducción de Ruido")
plt.xlabel("Muestras")
plt.ylabel("Amplitud")

plt.tight_layout()
plt.show()

In [None]:
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

# Cargar el audio
file_path = train[0]  # Cambia esto por tu archivo
audio, sr = librosa.load(file_path, sr=None)

# Tomar una parte de silencio (primeros 0.5 segundos)
silence_part = audio[:int(sr * 0.5)]

# Graficar el ruido en el tiempo
plt.figure(figsize=(10, 4))
plt.plot(silence_part, color='purple')
plt.xlabel("Tiempo (muestras)")
plt.ylabel("Amplitud")
plt.title("Forma de Onda del Ruido")
plt.show()

In [None]:
import librosa
import noisereduce as nr
import IPython.display as ipd

file_path = train[0]
audio, sr = librosa.load(file_path, sr=None)

# Reducir ruido
reduced_audio = nr.reduce_noise(y=audio, sr=sr)

#  audio original
print("🔊 Audio Original:")
ipd.display(ipd.Audio(audio, rate=sr))

# audio con reducción de ruido
print("🔊 Audio con Ruido Reducido:")
ipd.display(ipd.Audio(reduced_audio, rate=sr))

# Average Noise

In [None]:
import os
import numpy as np
import librosa
import matplotlib.pyplot as plt

# Ruta donde están los archivos de audio
audio_folder = "/content/drive/MyDrive/EderReact/ElderReact_train"

# Lista de archivos en la carpeta
audio_files = [f for f in os.listdir(audio_folder) if f.endswith(".wav")]

# Lista para almacenar los niveles de ruido
noise_levels = []

for file in audio_files:
    file_path = os.path.join(audio_folder, file)

    # Cargar el audio
    audio, sr = librosa.load(file_path, sr=None)

    # Tomar una parte silenciosa (los primeros 0.5 segundos)
    silence_part = audio[:int(sr * 0.5)]

    # Calcular el nivel de ruido (energía media del silencio)
    noise_energy = np.mean(np.abs(silence_part))

    # Guardar nivel de ruido del archivo
    noise_levels.append(noise_energy)

# Crear gráfico de barras
plt.figure(figsize=(12, 6))
plt.bar(audio_files, noise_levels, color='purple')
plt.xlabel("Archivos de Audio")
plt.ylabel("Nivel de Ruido")
plt.title("Ruido Promedio por Archivo de Audio")
plt.show()

# Duration of audios

In [None]:
import seaborn as sns

plt.figure(figsize=(8, 5))
sns.boxplot(x=df_duraciones['Duración (s)'], color='mediumpurple')
plt.title('Distribución de Duraciones de Audios')
plt.xlabel('Duración (segundos)')
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(8, 5))
sns.histplot(df_duraciones['Duración (s)'], bins=20, kde=True, color='indigo')
plt.title('Histograma de Duraciones de Audios')
plt.xlabel('Duración (segundos)')
plt.ylabel('Cantidad de Audios')
plt.tight_layout()
plt.show()


# Text preprpocessing

map text

In [None]:
# archivo train
test_path = pathlib.Path('/content/drive/MyDrive/EderReact/ElderReact_train')
test_text = []
for archivo in test_path.glob('*.txt'):
  contenido = archivo.read_text()
  test_text.append({'file': archivo.name, 'text': contenido})
  #print(archivo.name)
  #print(archivo.read_text())

In [None]:
test_text = pd.DataFrame(test_text)
test_text

In [None]:
test_text_copy['text'] = test_text_copy["text"].str.lower()
test_text_copy.head()

In [None]:
!pip install contractions

In [None]:
import contractions
test_text_copy['text'] = test_text_copy['text'].apply(contractions.fix)
test_text_copy['text']

In [None]:
PUNCT_TO_REMOVE = string.punctuation
def remove_punctuation(text):
    """custom function to remove the punctuation"""
    return text.translate(str.maketrans('', '', PUNCT_TO_REMOVE))

test_text_copy["text"] = test_text_copy["text"].apply(lambda text: remove_punctuation(text))
test_text_copy.head()

In [None]:
!pip install nltk

In [None]:
import nltk

from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import wordnet

# Download necessary NLTK data
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger_eng') # Download the correct package here.
nltk.download('punkt_tab')
text_lemm = []
for text in test_text_copy['text']:
    # Tokenize the text
    words = nltk.word_tokenize(text)
    lemmatizer = WordNetLemmatizer()
    # Function to get the part of speech tag for lemmatization
    def get_wordnet_pos(word):
        tag = nltk.pos_tag([word])[0][1][0].upper()
        tag_dict = {"J": wordnet.ADJ, "N": wordnet.NOUN, "V": wordnet.VERB, "R": wordnet.ADV}
        return tag_dict.get(tag, wordnet.NOUN)

    # Apply lemmatization
    lemmatized_words = [lemmatizer.lemmatize(word, get_wordnet_pos(word)) for word in words]
    # Join the lemmatized words back into a single string
    lemmatized_text = ' '.join(lemmatized_words)
    text_lemm.append(lemmatized_text)
    print(lemmatized_text)

In [None]:
test_text_copy['texto_lematizado'] = text_lemm
test_text_copy

In [None]:
test_labels = pd.read_csv('/content/drive/MyDrive/EderReact/train_labels.csv')
test_labels

# **EMBEDDINGS**

Pre save all texts and their labels ina csv file

# Text embeddings

In [None]:
import torch
from transformers import BertTokenizer, BertModel
import pandas as pd

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')


df = pd.read_csv('C:/Users/carod/Documents/todos.csv')

# Lista para almacenar los embeddings
embeddings_list = []

# Iterar sobre cada texto del DataFrame
for idx, row in df.iterrows():
    text = row["text"]  # Obtener el texto
    name = row["file"]  # Obtener el nombre del archivo

    # Tokenización
    encoding = tokenizer(text, padding=True, truncation=True, return_tensors='pt', add_special_tokens=True)

    # Obtener los embeddings de la última capa
    with torch.no_grad():
        output = model(**encoding)

    # Extraer el embedding de la última capa (CLS token)
    embedding = output.last_hidden_state[:, 0, :].squeeze().numpy()  # (1, 768) → (768,)

    # Guardar en una lista
    embeddings_list.append([name] + embedding.tolist())

# Crear un DataFrame con los embeddings
columns = ["name"] + [f"dim_{i+1}" for i in range(768)]
embeddings_df = pd.DataFrame(embeddings_list, columns=columns)

# Guardar en un CSV
embeddings_df.to_csv("embeddings.csv", index=False)

print("Embeddings guardados en 'embeddings.csv'")

# Audio Embeddings

In [None]:
import librosa
import numpy as np
import torch
import csv
from os import listdir
from transformers import Wav2Vec2FeatureExtractor, WavLMForXVector

# Ruta de los archivos de audio
ruta1 = 'C:/Users/carod/Documents/Solo_mitadTrain'
archivos = sorted([f for f in listdir(ruta1) if f.endswith('.wav')])

# Cargar el modelo y el feature extractor una sola vez
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained('microsoft/wavlm-base-plus-sv')
model = WavLMForXVector.from_pretrained('microsoft/wavlm-base-plus-sv')

# Nombre del archivo CSV
nombre_archivo = "vectores_train2.csv"

# Definir tamaño del lote
batch_size = 10
num_batches = len(archivos) // batch_size + (1 if len(archivos) % batch_size != 0 else 0)

# Escribir encabezado (nombre del archivo + 512 dimensiones)
with open(nombre_archivo, 'w', newline='') as archivo:
    escritor_csv = csv.writer(archivo)
    escritor_csv.writerow(["archivo"] + [f"dim_{i}" for i in range(512)])

# Procesar archivos en lotes
for batch in range(num_batches):
    start_idx = batch * batch_size
    end_idx = min((batch + 1) * batch_size, len(archivos))

    datos_concatenados = []
    nombres_archivos = []  # Lista para guardar los nombres

    print(f"Procesando lote {batch + 1}/{num_batches}...")

    for a in archivos[start_idx:end_idx]:
        try:
            r = f"{ruta1}/{a}"
            y, sr = librosa.load(r, sr=16000)

            # Forzar una longitud estándar (ejemplo: 5 segundos = 80000 muestras)
            max_length = 80000  # 5 segundos a 16kHz
            if len(y) < max_length:
                y = np.pad(y, (0, max_length - len(y)))  # Rellenar con ceros
            else:
                y = y[:max_length]  # Cortar si es más largo

            datos_concatenados.append(y)
            nombres_archivos.append(a)  # Guardar el nombre del archivo

        except Exception as e:
            print(f"⚠️ Error con archivo {a}: {e}")

    if not datos_concatenados:
        continue  # Si no hay datos válidos, saltar el lote

    # Convertir a tensores y obtener embeddings
    datos_convertidos = [np.array(datos) for datos in datos_concatenados]
    inputs = feature_extractor(datos_convertidos, padding=True, return_tensors="pt")

    with torch.no_grad():  # Desactivar gradientes para ahorrar memoria
        embeddings = model(**inputs).embeddings
        embeddings1 = torch.nn.functional.normalize(embeddings, dim=-1).cpu()

    vectores1 = embeddings1.tolist()

    # Guardar los vectores en el archivo CSV con el nombre del archivo
    with open(nombre_archivo, 'a', newline='') as archivo:
        escritor_csv = csv.writer(archivo)
        for nombre, vector in zip(nombres_archivos, vectores1):
            escritor_csv.writerow([nombre] + vector)

    # Liberar memoria
    del datos_concatenados, nombres_archivos, datos_convertidos, inputs, embeddings, embeddings1
    torch.cuda.empty_cache()

print("Procesamiento completado.")

# Characteristics from video using Mediapipe Facemesh

In [None]:
import pathlib

carpeta_train = "C:/Users/carod/Documents/ElderReact_dev"

train_videos = list(pathlib.Path(carpeta_train).glob('*.mp4'))

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import os

# Inicializar MediaPipe FaceMesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=True)

# Puntos de referencia
EYEBROW_RIGHT = [143, 156, 70, 63, 105, 66, 107]
EYEBROW_LEFT = [336, 296, 334, 293, 300, 383, 372]
FOREHEAD = [10]

UPPER_MOUTH = [78, 191, 80, 81, 82, 13, 312, 311, 310, 415, 308]
LOWER_MOUTH = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308]

UPPER_EYELID_RIGHT = [159, 160, 161]
LOWER_EYELID_RIGHT = [145, 144, 153]
UPPER_EYELID_LEFT = [386, 385, 384]
LOWER_EYELID_LEFT = [374, 380, 373]

CHEEKS = [93, 323]  # Puntos de las mejillas

# Función para calcular distancia euclidiana
def euclidean_distance(pt1, pt2):
    return np.linalg.norm(np.array(pt1) - np.array(pt2))

# Lista con las rutas de los videos
video_paths = train_videos

# Ruta del CSV donde se guardarán los resultados
output_csv = "C:/Users/carod/Documents/resultadosVal.csv-"

# Procesar cada video en la lista
for video_path in video_paths:
    video_name = os.path.basename(video_path)  # Extraer el nombre del archivo
    cap = cv2.VideoCapture(video_path)
    data = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        h, w, _ = frame.shape
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                landmarks = [(int(pt.x * w), int(pt.y * h)) for pt in face_landmarks.landmark]

                # Distancia de las cejas a la frente
                forehead_right_distances = [euclidean_distance(landmarks[p], landmarks[FOREHEAD[0]]) for p in EYEBROW_RIGHT]
                forehead_left_distances = [euclidean_distance(landmarks[p], landmarks[FOREHEAD[0]]) for p in EYEBROW_LEFT]

                avg_forehead_right = np.mean(forehead_right_distances)
                avg_forehead_left = np.mean(forehead_left_distances)

                # Distancia de las mejillas al párpado inferior
                cheek_right_distance = np.mean([euclidean_distance(landmarks[CHEEKS[0]], landmarks[p]) for p in LOWER_EYELID_RIGHT])
                cheek_left_distance = np.mean([euclidean_distance(landmarks[CHEEKS[1]], landmarks[p]) for p in LOWER_EYELID_LEFT])

                # Apertura de los ojos
                eye_right_opening = np.mean([euclidean_distance(landmarks[u], landmarks[l]) for u, l in zip(UPPER_EYELID_RIGHT, LOWER_EYELID_RIGHT)])
                eye_left_opening = np.mean([euclidean_distance(landmarks[u], landmarks[l]) for u, l in zip(UPPER_EYELID_LEFT, LOWER_EYELID_LEFT)])

                # Apertura de la boca
                mouth_opening = np.mean([euclidean_distance(landmarks[u], landmarks[l]) for u, l in zip(UPPER_MOUTH, LOWER_MOUTH)])

                data.append([
                    avg_forehead_right, avg_forehead_left, cheek_right_distance, cheek_left_distance,
                    eye_right_opening, eye_left_opening, mouth_opening
                ])

    cap.release()

    # Crear DataFrame con los datos del video
    df = pd.DataFrame(data, columns=[
        'forehead_right_distance', 'forehead_left_distance',
        'cheek_right_distance', 'cheek_left_distance',
        'eye_right_opening', 'eye_left_opening', 'mouth_opening'
    ])

    # Calcular la media de cada columna
    if not df.empty:  # Verificar que el DataFrame no esté vacío
        mean_vector = df.mean().to_frame().T  # Convertir a DataFrame de una sola fila
        mean_vector['video_name'] = video_name  # Agregar el nombre del video como columna

        # Guardar en CSV (si el archivo no existe, crea uno nuevo con encabezado, si existe, agrega datos)
        if not os.path.exists(output_csv):
            mean_vector.to_csv(output_csv, index=False)  # Escribir con encabezado
        else:
            mean_vector.to_csv(output_csv, mode='a', header=False, index=False)  # Agregar sin encabezado

    print(f"Procesamiento completado para {video_name}")

print("Todos los videos han sido procesados. Resultados guardados en", output_csv)
