# Creating landmark and mel-spectrogram data

In [12]:
import librosa
import librosa.display
import numpy as np
import soundfile as sf
import moviepy.editor as movp
import os
import warnings
import mediapipe as mp
import cv2
import pandas as pd
# mediapipe
mp_drawing = mp.solutions.drawing_utils #visualizing poses using visual indicators
mp_pose = mp.solutions.pose #pose estimation models (solutions)
# extracting audio from video (ffmpeg)
os.environ["IMAGEIO_FFMPEG_EXE"] = "/opt/homebrew/opt/ffmpeg/bin/ffmpeg"

def Euc_dist(pts, cm, img_h, img_w):
        distances_sq = (pts - cm) ** 2
        euc_distances = distances_sq.sum(axis = 1)
        diag_length = np.sqrt(img_h**2 + img_w**2)
        return euc_distances * diag_length

def landmark_data_gen(vid_dir, landmark_dir):
    # suppress warning messages
    warnings.filterwarnings('ignore')

    landmarks = [f'landmark_{i}' for i in range(0, 33)]
    features = ['x', 'y', 'distance_to_cm']
    comprehensive_landmarks_df_columns = pd.MultiIndex.from_product([landmarks, features], names=['landmark', 'feature'])
    comprehensive_landmarks_df = pd.DataFrame(columns = comprehensive_landmarks_df_columns)

    cap = cv2.VideoCapture(vid_dir)
    with mp_pose.Pose(min_detection_confidence=0.5, # using the pose estimation model
                    min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read() # frame is the actual image of each frame

            if not ret: # ending the loop when the video is done playing
                break
            
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False #not modifiable
            results = pose.process(image) #making the actual detection
            image.flags.writeable = True #now modifiable
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) #opencv wants image file in BGR format, we need to rerender the images
            image_h, image_w, _ = image.shape
            
            if results.pose_landmarks: #sometimes cannot capture any landmarks (mediapipe requires full body view for processing)
                landmarks = results.pose_landmarks.landmark #list of pose landmarks

                # convert landmark object into dataframe
                landmarks_df = pd.DataFrame(columns=['x', 'y'])
                for idx, landmark in enumerate(landmarks):
                    new_entry = np.array([landmark.x, landmark.y])
                    landmarks_df.loc[idx,:] = new_entry

                ######## For All landmarks #########
                # for all landmark points, keep track of their relative position w.r.t to center of mass (important for posture reconstruction)
                all_cm_x, all_cm_y = landmarks_df['x'].mean(), landmarks_df['y'].mean()
                landmarks_df['distance_to_cm'] = Euc_dist(np.array(landmarks_df.loc[:, ['x', 'y']]), 
                                                        np.array([all_cm_x, all_cm_y]),
                                                        image_h, image_w)

                ######## Storing all important features #########
                # x, y coordinates, distance of each landmark from the center of mass
                new_data = pd.DataFrame(columns=comprehensive_landmarks_df_columns)
                for landmark_idx in range(33):
                    landmark_name = f"landmark_{landmark_idx}"
                    for feature_idx in range(len(features)):
                        feature_name = features[feature_idx]
                        new_data[(landmark_name, feature_name)] = [landmarks_df.loc[:, feature_name].iloc[landmark_idx]]
                comprehensive_landmarks_df = pd.concat([comprehensive_landmarks_df, new_data], ignore_index=True)

                # finding and visualizing center of mass
                cm_x, cm_y = landmarks_df['x'].mean(), landmarks_df['y'].mean()
                cm_x_coord, cm_y_coord = int(cm_x * image_w), int(cm_y * image_h)
                cv2.circle(img = image, 
                        center = (cm_x_coord, cm_y_coord), 
                        radius=5, 
                        color = (0, 0, 255), 
                        thickness = -1)

            else:
                pass #when landmarks are not detected

            mp_drawing.draw_landmarks(image = image, 
                                    landmark_list = results.pose_landmarks, #coordinate of each landmark detected by the pose estimation model
                                    connections = mp_pose.POSE_CONNECTIONS, #the connections of each pose landmark
                                    landmark_drawing_spec = mp_drawing.DrawingSpec(color = (245, 117, 66), thickness=2, circle_radius=2),
                                    connection_drawing_spec = mp_drawing.DrawingSpec(color = (245, 117, 66), thickness=2, circle_radius=2))

            cv2.imshow('Webcam Feed', image)
            
            if cv2.waitKey(10) & 0xFF == ord('q'): #0xFF is checking which key is being pressed
                break

        np.save(landmark_dir, comprehensive_landmarks_df)
        cap.release() #release the use of webcame
        cv2.destroyAllWindows() #close all cv2 associated windows
        cv2.waitKey(1) #need this line!! else the mediapipe window maynot close properly for running on local vscode (basically giving the kernel enough time to close the windows)

    # Re-enable warnings
    warnings.filterwarnings('default')

def audio_data_gen(vid_dir, audio_dir, mel_dir):

    video = movp.VideoFileClip(vid_dir)

    # Extract the audio from the video
    audio_path = audio_dir
    video.audio.write_audiofile(audio_path)

    # create spectrogram based on audio
    y, sr = librosa.load(audio_path)
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=2048, hop_length=512).T # [mel_bins, n_segments] --> [n_segments, mel_bins]
    np.save(mel_dir, mel_spectrogram) # saving the numpy array

def data_generation(main_dir):
    vid_folder = os.path.join(main_dir, 'video')

    for vid_file in os.listdir(vid_folder):
        if vid_file != ".DS_Store":
            vid_dir = os.path.join(vid_folder, vid_file)
            vid_name = vid_file.split('.')[0]

            # create landmark data
            landmark_dir = os.path.join(main_dir, 'landmark', f'{vid_name}_landmark.npy')
            landmark_data_gen(vid_dir, landmark_dir)

            # create mel_spectrogram data
            audio_dir = os.path.join(main_dir, 'audio', f'{vid_name}_audio.wav')
            mel_dir = os.path.join(main_dir, 'mel_spectrogram', f'{vid_name}_mel_spectrogram.npy')
            audio_data_gen(vid_dir, audio_dir, mel_dir)

In [None]:
data_generation('/Users/liuchenshu/Documents/Research/Human Posture/mediadata')

# Creating DataLoader

## First Option: Unify the length for each batch (not across the entire dataset)

In [16]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from torch.nn.utils.rnn import pad_sequence
import os

class posture2melDataset(Dataset):
    def __init__(self, landmark_dir, mel_dir):
        self.landmark_dir = landmark_dir
        self.mel_dir = mel_dir
    
    def __len__(self): # return number of samples
        return len(self.landmark_dir)
    
    def __getitem__(self, idx):
        landmark_data = np.load(self.landmark_dir[idx])
        mel_data = np.load(self.mel_dir[idx])
        landmark_tensor = torch.tensor(landmark_data, dtype = torch.float32)
        mel_tensor = torch.tensor(mel_data, dtype = torch.float32)
        return landmark_tensor, mel_tensor

# for unifying sequence length in batch
def collate_fn(batch):
    landmark_data = [item[0] for item in batch]
    mel_data = [item[1] for item in batch]
    # make sure the sequences are padded according to the longest sequence in the batch
    landmark_data_padded = pad_sequence(landmark_data, batch_first=True) # [batch_size, max_seq_length, n_features]
    mel_data_padded = pad_sequence(mel_data, batch_first=True)

    # for ignoring padding when feeding into transformer (i.d.ing where the paddings are)
    landmark_mask = (landmark_data_padded == 0)
    mel_mask = (mel_data_padded == 0)

    return landmark_data_padded, mel_data_padded, landmark_mask, mel_mask

main_dir = '/Users/liuchenshu/Documents/Research/Human Posture/mediadata'  
landmark_paths = [os.path.join(main_dir, 'landmark', landmark_name) for landmark_name in os.listdir(os.path.join(main_dir, 'landmark')) if landmark_name != '.DS_Store']
print(landmark_paths)
mel_paths = [os.path.join(main_dir, 'mel_spectrogram', landmark_name) for landmark_name in os.listdir(os.path.join(main_dir, 'mel_spectrogram')) if landmark_name != '.DS_Store']
print(mel_paths)

dataset = posture2melDataset(landmark_paths, mel_paths)
data_loader = DataLoader(dataset, batch_size = 4, shuffle = True, collate_fn=collate_fn) #collate_fn make sure the window length is same in each batch

['/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/A good time on vacation _Alexander A ！_landmark.npy', '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/Love you _Alexander A !_landmark.npy', "/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/Let's have a party！ #dance _Alexander A !_landmark.npy", '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/Shining in the darkness of the night_landmark.npy', '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/Let me tell you a secret…_landmark.npy', '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/Break through yourself ！_landmark.npy', '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/The Walker Out of the Night！ Alexander A !_landmark.npy', '/Users/liuchenshu/Documents/Research/Human Posture/mediadata/landmark/scorching summer day _Alexander A ！_landmark.npy', '/Users/liuchenshu/Documents/Research/H

In [None]:
for landmark, mel_spectrogram, landmark_mask, mel_mask in data_loader:
    print(landmark.shape)
    print(mel_spectrogram.shape)
    break

## Second Option: Unify length according to longest sequence across the entire dataset

In [69]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
import os

class PaddedDataset(Dataset): # unifying across the entire dataset
    def __init__(self, input_file_dir, output_file_dir):
        # Load the input and output datasets
        self.input_file_dir = input_file_dir
        self.output_file_dir = output_file_dir
        self.inputs = []
        for input_dir in self.input_file_dir:
            self.inputs.append(np.load(input_dir))
        self.outputs = []
        for output_dir in self.output_file_dir:
            self.outputs.append(np.load(output_dir))

        # Calculate the maximum length across the whole dataset for inputs and outputs (NUCLEAR option!!!)
        self.max_input_len = max(inp.shape[0] for inp in self.inputs)
        self.max_output_len = max(out.shape[0] for out in self.outputs)

    def __len__(self):
        return len(self.input_file_dir)

    def __getitem__(self, idx):
        input_seq = torch.tensor(self.inputs[idx], dtype=torch.float32)
        output_seq = torch.tensor(self.outputs[idx], dtype=torch.float32)

        # Pad the input sequence to max_input_len
        padded_input = torch.cat([input_seq, torch.zeros(self.max_input_len - input_seq.size(0), input_seq.size(1))], dim=0)

        # Create input mask
        input_mask = (padded_input.sum(dim=1) != 0)  # Shape: (max_input_len, )

        # Pad the output sequence to max_output_len
        padded_output = torch.cat([output_seq, torch.zeros(self.max_output_len - output_seq.size(0), output_seq.size(1))], dim=0)

        # Create output mask
        output_mask = (padded_output.sum(dim=1) != 0)  # Shape: (max_output_len, )

        return {
            'input': padded_input,
            'target': padded_output,
            'input_mask': input_mask,
            'target_mask': output_mask
        }

main_dir = '/Users/liuchenshu/Documents/Research/Human Posture/mediadata'  
landmark_paths = [os.path.join(main_dir, 'landmark', landmark_name) for landmark_name in os.listdir(os.path.join(main_dir, 'landmark')) if landmark_name != '.DS_Store']
mel_paths = [os.path.join(main_dir, 'mel_spectrogram', landmark_name) for landmark_name in os.listdir(os.path.join(main_dir, 'mel_spectrogram')) if landmark_name != '.DS_Store']
dataset = PaddedDataset(landmark_paths, mel_paths)

# Create the DataLoader
dataloader = DataLoader(dataset, batch_size=1)

# Example of iterating over the DataLoader
for idx, batch in enumerate(dataloader):
    print(f"batch {idx}")
    inputs = batch['input']  # Shape: (batch_size, max_input_len, input_dim)
    targets = batch['target']  # Shape: (batch_size, max_output_len, output_dim)
    input_masks = batch['input_mask']  # Shape: (batch_size, max_input_len, 1)
    target_masks = batch['target_mask']  # Shape: (batch_size, max_output_len, 1)
    print(inputs.shape, targets.shape, input_masks.shape, target_masks.shape)


batch 0
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 1
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 2
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 3
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 4
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 5
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 6
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 7
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 8
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 128]) torch.Size([1, 1867]) torch.Size([1, 1352])
batch 9
torch.Size([1, 1867, 99]) torch.Size([1, 1352, 

# Transformer + GAN Mel-spectrogram reconstruction

In [70]:
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        # Create constant 'pe' matrix with values dependent on position and i
        pe = torch.zeros(max_len, hidden_dim)  # Initialize a positional encoding matrix of shape (max_len, hidden_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # A column vector [0, 1, ..., max_len-1]
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        
        # Apply sine to even indices in the encoding (2i)
        pe[:, 0::2] = torch.sin(position * div_term)
        
        # Apply cosine to odd indices in the encoding (2i+1)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        # Add a batch dimension at the beginning of the positional encoding matrix
        pe = pe.unsqueeze(0)
        
        # Store the positional encodings in the buffer (they are constants, not learnable parameters)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add positional encoding to the input embeddings (x)
        # x is expected to be of shape (batch_size, sequence_length, hidden_dim)
        x = x + self.pe[:, :x.size(1), :]  # Add positional encodings to the input embeddings
        return x

In [71]:
import torch
import torch.nn as nn

class TransformerEncoder(nn.Module):
    def __init__(self, hidden_dim, nhead, num_layers, n_features, max_len = 2000):
        super(TransformerEncoder, self).__init__()
        self.embedding = nn.Linear(n_features, hidden_dim)  # Embedding layer
        self.pos_encoder = PositionalEncoding(hidden_dim, max_len)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=nhead), 
            num_layers=num_layers
        )

    def forward(self, src, src_key_padding_mask=None):
        src_emb = self.embedding(src)  # Shape: (batch_size, n_frame1, hidden_dim)
        src_emb = self.pos_encoder(src_emb) # adding the position encoding to the semantic encoding
        src_emb = src_emb.transpose(0, 1)  # Transpose to (n_frame1, batch_size, hidden_dim)
        memory = self.transformer_encoder(src_emb, src_key_padding_mask = src_key_padding_mask)
        return memory.transpose(0, 1)  # Transpose back to (batch_size, n_frame1, hidden_dim)

class TransformerDecoder(nn.Module):
    def __init__(self, hidden_dim, nhead, num_layers, mel_dim, max_len = 2000):
        super(TransformerDecoder, self).__init__()
        self.embedding = nn.Linear(mel_dim, hidden_dim)  # Embedding layer
        self.pos_encoder = PositionalEncoding(hidden_dim, max_len)
        t

    def forward(self, tgt, memory, tgt_key_padding_mask = None, memory_key_padding_mask = None):
        tgt_emb = self.embedding(tgt)  # Shape: (batch_size, n_frame2, hidden_dim)
        tgt_emb = self.pos_encoder(tgt_emb)
        tgt_emb = tgt_emb.transpose(0, 1)  # Transpose to (n_frame2, batch_size, hidden_dim)
        print(f"Memory shape from within decoder function: {memory.shape}")  # Should be (n_frame1, batch_size, hidden_dim)
        print(f"Tgt Embedding shape from within decoder function: {tgt_emb.shape}")  # Should be (n_frame2, batch_size, hidden_dim)
        output = self.transformer_decoder(tgt_emb, memory, 
                                          tgt_key_padding_mask = tgt_key_padding_mask, 
                                          memory_key_padding_mask=memory_key_padding_mask)
        return output.transpose(0, 1)  # Transpose back to (batch_size, n_frame2, hidden_dim)

# class TransformerModel(nn.Module):
#     def __init__(self, input_dim, mel_dim, hidden_dim, nhead, num_encoder_layers, num_decoder_layers):
#         super(TransformerModel, self).__init__()
#         self.encoder = TransformerEncoder(hidden_dim, nhead, num_encoder_layers)
#         self.decoder = TransformerDecoder(hidden_dim, nhead, num_decoder_layers)
#         self.fc_out = nn.Linear(hidden_dim, mel_dim)  # Final output layer

#     def forward(self, src, tgt): # tgt is the groundtruth output
#         # Forward through encoder and decoder
#         memory = self.encoder(src)
#         output = self.decoder(tgt, memory)
        
#         # Output layer to generate mel spectrograms
#         mel_output = self.fc_out(output)  # Shape: (batch_size, n_frame2, mel_dim)
#         return mel_output

In [82]:
import torch.optim as optim

device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
device = "cpu"
# Define model parameters
hidden_dim = 256  # Example dimension, adjust as needed
nhead = 8  # Number of attention heads
num_layers = 4  # Number of transformer layers
mel_dim = 128
n_features = 99

# Instantiate model
encoder = TransformerEncoder(hidden_dim, nhead, num_layers, n_features).to(device)
decoder = TransformerDecoder(hidden_dim, nhead, num_layers, mel_dim).to(device)
transformer_model = nn.Sequential(encoder, decoder).to(device)  # Or define a custom model class that combines them

# Define loss function and optimizer
criterion = nn.MSELoss()  # Assuming you're predicting continuous values (modify as needed)
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)



In [83]:
import torch

torch.mps.empty_cache()


In [84]:
num_epochs = 10  # Define number of epochs
transformer_model.train()  # Set model to training mode

for epoch in range(num_epochs):
    total_loss = 0  # Variable to accumulate loss

    for batch in dataloader:
        # Get padded inputs and targets
        input_seq = batch['input'].to(device)  # Shape: (batch_size, max_input_len, input_dim)
        target_seq = batch['target'].to(device)  # Shape: (batch_size, max_output_len, output_dim)
        input_mask = batch['input_mask'].to(device)  # Input mask
        target_mask = batch['target_mask'].to(device)  # Output mask
        print(f"input mask shape{input_mask.shape}")

        # Move tensors to the appropriate device (CPU/GPU)
        input_seq = input_seq.to(device)
        target_seq = target_seq.to(device)

        # Zero the gradients
        optimizer.zero_grad()

        with torch.no_grad():
            # Forward pass through the model
            memory = encoder(input_seq, src_key_padding_mask = input_mask)  # Encoder output
            print(f"memory shape {memory.shape}")
            print(f"target shape {target_seq.shape}")
            output = decoder(target_seq, memory, tgt_key_padding_mask = target_mask, memory_key_padding_mask = input_mask)  # Decoder output

        # Calculate loss
        loss = criterion(output, target_seq)  # Modify if needed based on your output processing
        total_loss += loss.item()

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Print average loss for the epoch
    avg_loss = total_loss / len(dataloader)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}')


input mask shapetorch.Size([1, 1867])
memory shape torch.Size([1, 1867, 256])
target shape torch.Size([1, 1352, 128])
Memory shape from within decoder function: torch.Size([1, 1867, 256])
Tgt Embedding shape from within decoder function: torch.Size([1352, 1, 256])


RuntimeError: shape '[1, 8, 32]' is invalid for input of size 477952