<a href="https://colab.research.google.com/github/Pragu3704/NatyaAI/blob/main/RAPID_Internship.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mounting Drive

In [None]:
# Mount Google Drive at /content/drive; every folder path used later in this
# notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

# Feature extraction

Joint angles : Video features


In [None]:
import os
import cv2
import mediapipe as mp
import numpy as np

def calculate_angle(a, b, c):
    """Return the angle at vertex ``b`` between the rays b->a and b->c, in degrees.

    Each argument must expose ``x`` and ``y`` attributes; only those two
    coordinates are read. The result is folded so that it never exceeds 180.
    """
    first = np.array([a.x, a.y])
    vertex = np.array([b.x, b.y])
    second = np.array([c.x, c.y])

    # Heading of each ray as seen from the vertex.
    heading_second = np.arctan2(second[1] - vertex[1], second[0] - vertex[0])
    heading_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])
    degrees = np.abs((heading_second - heading_first) * 180.0 / np.pi)

    # Of the two angles between the rays, report the smaller one.
    return 360.0 - degrees if degrees > 180.0 else degrees


def extract_joint_angles(video_path):
    """Extract twelve 2-D joint angles per frame from a video with MediaPipe Pose.

    Frames in which no pose is detected are skipped, so the number of rows can
    be smaller than the number of frames in the video.

    Args:
        video_path: Path of the video file handed to ``cv2.VideoCapture``.

    Returns:
        ``np.ndarray`` with one row per frame that had a detected pose and the
        columns, in order: left shoulder, left elbow, left wrist, right
        shoulder, right elbow, right wrist, left hip, left knee, left ankle,
        right hip, right knee, right ankle (degrees). When no pose is detected
        at all the result is an empty array.
    """
    mp_pose = mp.solutions.pose
    landmark_ids = mp_pose.PoseLandmark

    # (first point, vertex, second point) for every angle, in output column
    # order. The left elbow now uses the elbow as vertex, mirroring the right
    # side; it previously measured the angle at the shoulder by mistake.
    joint_triplets = [
        ('LEFT_HIP', 'LEFT_SHOULDER', 'LEFT_ELBOW'),
        ('LEFT_SHOULDER', 'LEFT_ELBOW', 'LEFT_WRIST'),
        ('LEFT_ELBOW', 'LEFT_WRIST', 'LEFT_INDEX'),
        ('RIGHT_HIP', 'RIGHT_SHOULDER', 'RIGHT_ELBOW'),
        ('RIGHT_SHOULDER', 'RIGHT_ELBOW', 'RIGHT_WRIST'),
        ('RIGHT_ELBOW', 'RIGHT_WRIST', 'RIGHT_INDEX'),
        ('LEFT_SHOULDER', 'LEFT_HIP', 'LEFT_KNEE'),
        ('LEFT_HIP', 'LEFT_KNEE', 'LEFT_ANKLE'),
        ('LEFT_KNEE', 'LEFT_ANKLE', 'LEFT_FOOT_INDEX'),
        ('RIGHT_SHOULDER', 'RIGHT_HIP', 'RIGHT_KNEE'),
        ('RIGHT_HIP', 'RIGHT_KNEE', 'RIGHT_ANKLE'),
        ('RIGHT_KNEE', 'RIGHT_ANKLE', 'RIGHT_FOOT_INDEX'),
    ]
    triplet_indices = [
        tuple(getattr(landmark_ids, name).value for name in triplet)
        for triplet in joint_triplets
    ]

    pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
    cap = cv2.VideoCapture(video_path)
    angles = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # MediaPipe is fed RGB frames; OpenCV decodes to BGR.
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(frame_rgb)
            if not results.pose_landmarks:
                continue

            landmarks = results.pose_landmarks.landmark
            angles.append([
                calculate_angle(landmarks[first], landmarks[vertex], landmarks[second])
                for first, vertex, second in triplet_indices
            ])
    finally:
        # Release the decoder and the pose graph even if a frame fails.
        cap.release()
        pose.close()

    return np.array(angles)

# Extract joint angles for every video in the dataset folder and store one
# ``<video name>_angles.npy`` file per video.
input_folder = '/content/drive/MyDrive/Dataset'
output_folder = '/content/drive/MyDrive/Output_Angles'

os.makedirs(output_folder, exist_ok=True)

for video_file in os.listdir(input_folder):
    video_path = os.path.join(input_folder, video_file)
    if not os.path.isfile(video_path):
        # Sub-directories cannot be opened as videos; skip them.
        continue

    angles = extract_joint_angles(video_path)

    output_path = os.path.join(output_folder, f"{os.path.splitext(video_file)[0]}_angles.npy")
    np.save(output_path, angles)
    print(f"Processed and saved: {video_file}")

print("All videos processed.")


Feature extraction : Audio



In [None]:
import os
import numpy as np
import librosa
from moviepy.editor import VideoFileClip

def extract_audio_features(audio_path):
    """Compute a dictionary of librosa audio descriptors for one audio file.

    The file is loaded with ``sr=None`` and every descriptor is computed from
    that signal (the tonnetz descriptor from its harmonic component).

    Args:
        audio_path: Path of the audio file passed to ``librosa.load``.

    Returns:
        dict mapping, in this order, ``tempo``, ``beat_frames``, ``mfcc``
        (13 coefficients), ``chroma``, ``spectral_contrast``, ``tonnetz``,
        ``mel_spectrogram``, ``rms``, ``zcr``, ``spectral_centroid``,
        ``spectral_bandwidth`` and ``spectral_rolloff`` to the values returned
        by the corresponding librosa functions.
    """
    signal, sample_rate = librosa.load(audio_path, sr=None)

    # Beat tracking yields both the tempo estimate and the beat positions.
    tempo, beat_frames = librosa.beat.beat_track(y=signal, sr=sample_rate)

    # Values are evaluated top to bottom, so the computation order (and the
    # key order later used when saving) is the one listed here.
    return {
        'tempo': tempo,
        'beat_frames': beat_frames,
        'mfcc': librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=13),
        'chroma': librosa.feature.chroma_stft(y=signal, sr=sample_rate),
        'spectral_contrast': librosa.feature.spectral_contrast(y=signal, sr=sample_rate),
        # Tonal centroid features are computed on the harmonic part only.
        'tonnetz': librosa.feature.tonnetz(y=librosa.effects.harmonic(signal), sr=sample_rate),
        'mel_spectrogram': librosa.feature.melspectrogram(y=signal, sr=sample_rate),
        'rms': librosa.feature.rms(y=signal),
        'zcr': librosa.feature.zero_crossing_rate(signal),
        'spectral_centroid': librosa.feature.spectral_centroid(y=signal, sr=sample_rate),
        'spectral_bandwidth': librosa.feature.spectral_bandwidth(y=signal, sr=sample_rate),
        'spectral_rolloff': librosa.feature.spectral_rolloff(y=signal, sr=sample_rate),
    }

def extract_audio_from_video(video_path, output_audio_path):
    """Write the audio track of a video to ``output_audio_path`` as 16-bit PCM.

    Args:
        video_path: Path of the source video.
        output_audio_path: Destination path of the audio file.

    Raises:
        ValueError: If the video has no audio track.
    """
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        video.audio.write_audiofile(output_audio_path, codec='pcm_s16le')
    finally:
        # Release the reader held by the clip; otherwise each processed video
        # leaves it open for the rest of the session.
        video.close()

def process_videos(input_folder, output_audio_folder, output_features_folder):
    """Extract audio and audio features for every MP4 file in a folder.

    For each ``<name>.mp4`` (extension matched case-insensitively) in
    ``input_folder`` this writes ``<name>.wav`` to ``output_audio_folder`` and
    ``<name>_features.npz`` to ``output_features_folder``. Both output folders
    are created when missing.

    Args:
        input_folder: Folder that contains the source videos.
        output_audio_folder: Folder that receives the extracted WAV files.
        output_features_folder: Folder that receives the feature archives.
    """
    os.makedirs(output_audio_folder, exist_ok=True)
    os.makedirs(output_features_folder, exist_ok=True)

    for video_file in os.listdir(input_folder):
        # Accept ".MP4" as well as ".mp4"; other files are ignored.
        if not video_file.lower().endswith('.mp4'):
            continue

        stem = os.path.splitext(video_file)[0]
        video_path = os.path.join(input_folder, video_file)
        audio_path = os.path.join(output_audio_folder, stem + '.wav')
        feature_path = os.path.join(output_features_folder, stem + '_features.npz')

        # Extract audio from video
        extract_audio_from_video(video_path, audio_path)

        # Extract audio features
        audio_features = extract_audio_features(audio_path)

        # Save the features
        np.savez(feature_path, **audio_features)
        print(f"Processed and saved features for: {video_file}")

# Source video folder plus the two destination folders: one for the extracted
# WAV files and one for the per-video feature archives.
input_folder = '/content/drive/MyDrive/Dataset'
output_audio_folder = '/content/drive/MyDrive/Output_audio'
output_features_folder = '/content/drive/MyDrive/Output_features'

# Run audio extraction and feature extraction for every MP4 in the input folder.
process_videos(input_folder, output_audio_folder, output_features_folder)
print("All videos processed, audio extracted, and features saved.")

# Interpolation





Video

In [None]:
#Interpolate data
import numpy as np
from scipy.interpolate import interp1d
import os

def resample_joint_angles(joint_angles, target_frames=900):
    """Resample a joint-angle sequence along its first axis to ``target_frames``.

    Shorter sequences are interpolated (cubic when at least four frames are
    available, linear otherwise; a single frame is repeated). Longer sequences
    are split into ``target_frames`` contiguous groups of near-equal size that
    together cover the whole sequence, and each group is averaged. When the
    length is an exact multiple of ``target_frames`` this equals plain block
    averaging.

    Args:
        joint_angles: Array whose first axis is the frame index.
        target_frames: Number of frames in the result; must be at least 1.

    Returns:
        Array with ``target_frames`` rows and the remaining axes unchanged.
        The input itself is returned when it already has the requested length.

    Raises:
        ValueError: If ``joint_angles`` has no frames or ``target_frames`` < 1.
    """
    joint_angles = np.asarray(joint_angles)
    num_original_frames = len(joint_angles)

    if target_frames < 1:
        raise ValueError(f"target_frames must be at least 1, got {target_frames}")
    if num_original_frames == 0:
        raise ValueError("Cannot resample an empty joint-angle sequence")

    if num_original_frames == target_frames:
        return joint_angles

    if num_original_frames == 1:
        # Nothing to interpolate between: hold the only frame.
        return np.repeat(joint_angles, target_frames, axis=0)

    if num_original_frames < target_frames:
        # Cubic interpolation needs at least four samples.
        kind = 'cubic' if num_original_frames >= 4 else 'linear'
        original_indices = np.linspace(0, 1, num=num_original_frames)
        target_indices = np.linspace(0, 1, num=target_frames)
        return interp1d(original_indices, joint_angles, axis=0, kind=kind)(target_indices)

    # Downsample: average contiguous groups spanning the *entire* sequence so
    # that the tail of the motion is not dropped.
    groups = np.array_split(joint_angles, target_frames, axis=0)
    return np.stack([group.mean(axis=0) for group in groups])

# Folder paths
processed_folder = '/content/drive/MyDrive/Output_Angles'
resampled_folder = '/content/drive/MyDrive/Interpolated_Angles'
target_frames = 900  # Target number of frames

os.makedirs(resampled_folder, exist_ok=True)

# Resample all processed joint angles
for filename in os.listdir(processed_folder):
    if not filename.endswith('_angles.npy'):
        continue

    file_path = os.path.join(processed_folder, filename)
    joint_angles = np.load(file_path)

    # A video in which no pose was detected yields an empty array that cannot
    # be resampled; skip it instead of aborting the whole run.
    if joint_angles.size == 0:
        print(f"Skipping empty file: {filename}")
        continue

    # Resample joint angles to the target number of frames
    resampled_angles = resample_joint_angles(joint_angles, target_frames)

    # Save the resampled joint angles under the same file name
    output_path = os.path.join(resampled_folder, filename)
    np.save(output_path, resampled_angles)
    print(f"Resampled and saved: {filename} (Original frames: {len(joint_angles)}, New frames: {len(resampled_angles)})")

print("All joint angles resampled.")


Audio

In [None]:
import numpy as np
from scipy.interpolate import interp1d
import os

# Folder holding the raw audio feature archives and the folder that receives
# their frame-adjusted copies.
audio_folder = '/content/drive/MyDrive/Output_features'
output_folder = '/content/drive/MyDrive/Interpolated_Audio'
desired_frames = 900  # same frame count the joint angles are resampled to

def _resample_audio_feature(key, data, desired_frames):
    """Return one stored audio feature adjusted to ``desired_frames`` frames."""
    if key == 'tempo':
        # No interpolation needed for tempo
        return data

    if key == 'beat_frames':
        # Beat positions are cropped or zero-padded, never interpolated.
        # NOTE(review): the stored values look like frame indices of the
        # original timeline and are not rescaled here - confirm that is intended.
        num_beats = data.shape[0]
        if num_beats > desired_frames:
            return data[:desired_frames]
        return np.pad(data, (0, desired_frames - num_beats), 'constant')

    if data.ndim == 2 and data.shape[1] > 0:
        num_frames = data.shape[1]
        if num_frames == desired_frames:
            return data
        if num_frames == 1:
            # A single frame cannot be interpolated; hold its value.
            return np.repeat(data, desired_frames, axis=1)

        # Cubic interpolation needs at least four frames.
        kind = 'cubic' if num_frames >= 4 else 'linear'
        x = np.linspace(0, 1, num_frames)
        new_x = np.linspace(0, 1, desired_frames)
        interpolator = interp1d(x, data, kind=kind, axis=1, fill_value='extrapolate')
        return interpolator(new_x)

    # Anything else is passed through unchanged, with a notice.
    print(f"Unexpected shape for {key}: {data.shape}")
    return data


def adjust_audio_features(audio_folder, output_folder, desired_frames):
    """Bring every feature archive in ``audio_folder`` to a common frame count.

    Each ``.npz`` file is rewritten under the same name in ``output_folder``
    (created when missing). Per key: ``tempo`` is copied, ``beat_frames`` is
    cropped or zero-padded to ``desired_frames`` entries, and 2-D features are
    interpolated along their second axis to ``desired_frames`` columns. Arrays
    of any other shape are copied unchanged after printing a notice.

    Args:
        audio_folder: Folder containing the source ``.npz`` archives.
        output_folder: Folder that receives the adjusted archives.
        desired_frames: Number of frames every adjusted feature should have.
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(audio_folder):
        if not filename.endswith('.npz'):
            continue

        file_path = os.path.join(audio_folder, filename)
        output_file_path = os.path.join(output_folder, filename)

        with np.load(file_path) as npz_data:
            adjusted_data = {
                key: _resample_audio_feature(key, npz_data[key], desired_frames)
                for key in npz_data
            }

        # Save the adjusted data
        np.savez(output_file_path, **adjusted_data)
        print(f'Saved adjusted file: {output_file_path}')

# Rewrite every audio feature archive with desired_frames frames per feature.
adjust_audio_features(audio_folder, output_folder, desired_frames)


# Normalization

Sine-Cosine Normalization : video

In [None]:
import numpy as np
import os

def normalize_video_data(input_folder, output_folder):
    """Encode every joint-angle file as (sine, cosine) pairs.

    Each ``*_angles.npy`` file in ``input_folder`` is read as angles in
    degrees, and an array with one extra trailing axis of size 2 holding
    ``sin`` and ``cos`` of each angle is saved under the same file name in
    ``output_folder``. Empty files are skipped; a failure on one file is
    reported and does not stop the remaining files.

    Args:
        input_folder: Folder containing the joint-angle files.
        output_folder: Folder that receives the encoded files; created when
            it does not exist.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    angle_files = (name for name in os.listdir(input_folder) if name.endswith('_angles.npy'))
    for name in angle_files:
        source_path = os.path.join(input_folder, name)
        try:
            degrees = np.load(source_path)
            if degrees.size == 0:
                print(f"Skipping empty file: {name}")
                continue

            radians = np.deg2rad(degrees)
            # The new last axis stores [sin, cos] for each angle.
            encoded = np.stack((np.sin(radians), np.cos(radians)), axis=-1)

            np.save(os.path.join(output_folder, name), encoded)
            print(f"Normalized and saved: {name}")
        except Exception as e:
            print(f"Error processing file {name}: {e}")

# Encode the resampled joint angles as (sin, cos) pairs and store them in a
# separate folder.
input_video_folder = '/content/drive/MyDrive/Interpolated_Angles'
normalized_video_folder = '/content/drive/MyDrive/Normalized_Video_Angles'
normalize_video_data(input_video_folder, normalized_video_folder)
print("All video data normalized.")


Min-Max normalization : Audio


In [None]:
import numpy as np
import os

def calculate_global_audio_min_max(input_folder, target_columns):
    """Collect per-column minima and maxima of every audio feature across files.

    Every ``*_features.npz`` archive in ``input_folder`` is read. Each feature
    is made at least 2-D, trimmed or zero-padded to ``target_columns`` columns,
    and reduced over its first axis. The element-wise minimum and maximum over
    all files are accumulated per feature key.

    Args:
        input_folder: Folder containing the feature archives.
        target_columns: Column count every feature is trimmed or padded to.

    Returns:
        dict mapping each feature key to ``[min_vals, max_vals]``, two arrays
        with ``target_columns`` columns. A file that fails is reported and
        skipped from the point of failure.
    """
    global_min_max = {}
    print("Calculating global min and max values...")

    for file in os.listdir(input_folder):
        if not file.endswith('_features.npz'):
            continue

        file_path = os.path.join(input_folder, file)
        try:
            # The context manager closes the archive once its keys are read.
            with np.load(file_path) as data:
                for key in data.keys():
                    # Scalars and 1-D arrays become a single row so that the
                    # column-wise logic below applies to every feature.
                    feature = np.atleast_2d(data[key])

                    # Bring the feature to the common column count
                    feature = trim_or_pad_feature(feature, target_columns)

                    # Column-wise extrema over the feature's rows
                    min_vals = feature.min(axis=0, keepdims=True)
                    max_vals = feature.max(axis=0, keepdims=True)

                    if key not in global_min_max:
                        global_min_max[key] = [min_vals, max_vals]
                    else:
                        global_min_max[key][0] = np.minimum(global_min_max[key][0], min_vals)
                        global_min_max[key][1] = np.maximum(global_min_max[key][1], max_vals)

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

    print("Global min and max values calculated.")
    return global_min_max

def normalize_audio_data(input_folder, output_folder, global_min_max, target_columns):
    """Min-max normalise every audio feature archive with global extrema.

    Each feature of each ``*_features.npz`` archive is made at least 2-D,
    trimmed or zero-padded to ``target_columns`` columns, scaled with the
    matching entry of ``global_min_max`` and clipped to [0, 1]. Columns whose
    global minimum equals the global maximum are mapped to 0 instead of
    producing NaN/inf from a division by zero.

    Args:
        input_folder: Folder containing the feature archives.
        output_folder: Folder that receives the normalised archives (created
            when missing); file names are kept.
        global_min_max: Mapping from feature key to ``[min_vals, max_vals]``.
        target_columns: Column count every feature is trimmed or padded to.
    """
    os.makedirs(output_folder, exist_ok=True)

    print("Normalizing audio data...")

    for file in os.listdir(input_folder):
        if not file.endswith('_features.npz'):
            continue

        file_path = os.path.join(input_folder, file)
        try:
            normalized_data = {}
            # The context manager closes the archive once its keys are read.
            with np.load(file_path) as data:
                for key in data.keys():
                    # Scalars and 1-D arrays become a single row.
                    feature = np.atleast_2d(data[key])

                    # Trim or pad feature to have the target number of columns
                    feature = trim_or_pad_feature(feature, target_columns)

                    min_vals = global_min_max[key][0]
                    max_vals = global_min_max[key][1]

                    # Avoid 0/0 for constant columns: divide by 1 there, which
                    # leaves the (zero) numerator as the result.
                    span = max_vals - min_vals
                    safe_span = np.where(span == 0, 1, span)

                    normalized_feature = (feature - min_vals) / safe_span
                    normalized_data[key] = np.clip(normalized_feature, 0, 1)

            output_path = os.path.join(output_folder, file)
            np.savez(output_path, **normalized_data)
            print(f"Normalized and saved: {file}")

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

    print("All audio data normalized.")

def trim_or_pad_feature(feature, target_columns):
    """Return ``feature`` with exactly ``target_columns`` columns.

    Extra columns are cut from the right; missing columns are appended on the
    right as zeros. A feature that already has the requested width is
    returned as is.

    Args:
        feature: 2-D array whose second axis is adjusted.
        target_columns: Required size of the second axis.
    """
    shortfall = target_columns - feature.shape[1]

    if shortfall < 0:
        # Too wide: keep the leading columns only.
        return feature[:, :target_columns]
    if shortfall > 0:
        # Too narrow: zero-fill on the right, rows untouched.
        return np.pad(feature, ((0, 0), (0, shortfall)), mode='constant')
    return feature

# Normalise the frame-adjusted audio features: first gather the global
# min/max over all archives, then scale every archive with them.
input_audio_folder = '/content/drive/MyDrive/Interpolated_Audio'
normalized_audio_folder = '/content/drive/MyDrive/Normalized_Features'
target_columns = 900  # same frame count used by the interpolation cells

global_audio_min_max = calculate_global_audio_min_max(input_audio_folder, target_columns)
normalize_audio_data(input_audio_folder, normalized_audio_folder, global_audio_min_max, target_columns)


# Synchronization of Data


In [None]:
import os
import numpy as np

def synchronize_data(video_folder, audio_folder, output_folder):
    """Pair every joint-angle file with its audio features in one archive.

    For each ``<name>_angles.npy`` in ``video_folder`` the matching
    ``<name>_features.npz`` is looked up in ``audio_folder``. When both are
    valid, ``<name>_synchronized.npz`` is written to ``output_folder`` with a
    ``video`` array and an ``audio`` dictionary of feature arrays. The
    dictionary is stored as a pickled object, so readers must load the archive
    with ``allow_pickle=True``. A summary of synchronised and failed files is
    printed at the end.

    Accepted video shapes are ``(frames, 12)`` (plain angles) and
    ``(frames, 12, 2)`` (sine/cosine encoded angles). Every audio feature must
    be 2-D.

    Args:
        video_folder: Folder containing the joint-angle files.
        audio_folder: Folder containing the audio feature archives.
        output_folder: Folder that receives the combined archives (created
            when missing).
    """
    os.makedirs(output_folder, exist_ok=True)

    video_files = [file for file in os.listdir(video_folder) if file.endswith('_angles.npy')]
    total_files = len(video_files)
    synchronized_files = 0
    unsynchronized_files = []

    for position, file in enumerate(video_files, start=1):
        video_path = os.path.join(video_folder, file)
        audio_path = os.path.join(audio_folder, file.replace('_angles.npy', '_features.npz'))

        if not os.path.exists(audio_path):
            unsynchronized_files.append(file)
            print(f"Could not find matching audio for: {file} ({position}/{total_files})")
            continue

        try:
            video_data = np.load(video_path)

            # 12 joints, optionally followed by the (sin, cos) pair axis.
            shape = video_data.shape
            plain_angles = len(shape) == 2 and shape[1] == 12
            encoded_angles = len(shape) == 3 and shape[1] == 12 and shape[2] == 2
            if not (plain_angles or encoded_angles):
                raise ValueError(f"Unexpected shape for video data in {file}: {video_data.shape}")

            # Read every feature into memory, then close the archive.
            with np.load(audio_path) as audio_data:
                audio_features = {key: audio_data[key] for key in audio_data.keys()}

            for key, feature in audio_features.items():
                if len(feature.shape) != 2:
                    raise ValueError(f"Unexpected shape for audio feature {key} in {file}: {feature.shape}")

            output_path = os.path.join(output_folder, file.replace('_angles.npy', '_synchronized.npz'))
            np.savez(output_path, video=video_data, audio=audio_features)
            synchronized_files += 1
            print(f"Synchronized and saved: {file} ({position}/{total_files})")
        except Exception as e:
            unsynchronized_files.append(file)
            print(f"Error processing {file}: {e}")

    print(f"\nTotal files found: {total_files}")
    print(f"Total files synchronized: {synchronized_files}")
    print(f"Files that could not be synchronized: {len(unsynchronized_files)}")
    if unsynchronized_files:
        print("\nList of files that could not be synchronized:")
        for file in unsynchronized_files:
            print(file)

# Folders
# NOTE(review): the sine/cosine cell above writes to 'Normalized_Video_Angles',
# while this cell reads 'NEW_Normalized_Video_Angles' - confirm which folder
# holds the data that should be synchronised.
video_folder = '/content/drive/MyDrive/NEW_Normalized_Video_Angles'
audio_folder = '/content/drive/MyDrive/Normalized_Features'
output_folder = '/content/drive/MyDrive/NEW_Synchronized_Data'

# Pair each joint-angle file with its audio features and save the result.
synchronize_data(video_folder, audio_folder, output_folder)
print("All data synchronized.")

# Some important libraries


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
from sklearn.metrics import mean_absolute_error, r2_score
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import CosineAnnealingLR
import random
from tqdm import tqdm

# Data Pre-Processing


In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

class SynchronizedAudioDanceDataset(Dataset):
    """Dataset of synchronised (audio features, joint angles) samples.

    Every ``.npz`` file in ``root_dir`` is one sample. It must contain a
    ``video`` array and an ``audio`` entry holding a pickled dictionary of
    feature arrays.

    Args:
        root_dir: Folder containing the synchronised archives.
        target_frames: Sequences longer than this are interpolated down to
            this many frames; shorter ones are returned unchanged.
    """

    # Audio features handed to the model, in the order they are read.
    AUDIO_KEYS = ('tempo', 'mel_spectrogram', 'mfcc', 'beat_frames', 'chroma')

    def __init__(self, root_dir, target_frames=900):
        self.root_dir = root_dir
        self.target_frames = target_frames
        # Sorted so that sample indices are reproducible between runs.
        self.file_list = sorted(f for f in os.listdir(root_dir) if f.endswith('.npz'))

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``(audio_features, video_features)`` for sample ``idx``.

        ``audio_features`` maps each available key of ``AUDIO_KEYS`` to a
        float tensor (NaN replaced by 0, values clamped to [-1e6, 1e6]).
        A key that is missing or fails to convert is reported and left out.
        ``video_features`` is the ``video`` array as a float tensor.
        """
        file_path = os.path.join(self.root_dir, self.file_list[idx])

        # NOTE: allow_pickle=True is required because 'audio' is stored as a
        # pickled dict; unpickling can execute code, so only load archives
        # produced by this pipeline.
        with np.load(file_path, allow_pickle=True) as sync_data:
            audio_data = sync_data['audio'].item()
            video_data = sync_data['video']

        audio_features = {}
        for key in self.AUDIO_KEYS:
            try:
                tensor = torch.tensor(audio_data[key]).float()
                tensor = torch.nan_to_num(tensor)
                tensor = torch.clamp(tensor, -1e6, 1e6)
                if tensor.shape[-1] > self.target_frames:
                    # interpolate() expects (batch, channels, length).
                    tensor = nn.functional.interpolate(
                        tensor.unsqueeze(0),
                        size=self.target_frames,
                        mode='linear',
                        align_corners=False,
                    ).squeeze(0)
                audio_features[key] = tensor
            except Exception as e:
                print(f"Error processing {key}: {e}")
                # Only report the shape when the key exists; looking it up
                # unconditionally would raise again inside the handler.
                if key in audio_data:
                    print(f"Shape of {key}: {np.array(audio_data[key]).shape}")

        video_features = torch.tensor(video_data).float()
        if video_features.shape[0] > self.target_frames:
            # Shrink the frame axis only; the (12, 2) joint layout is kept.
            video_features = nn.functional.interpolate(
                video_features.unsqueeze(0).unsqueeze(0),
                size=(self.target_frames, 12, 2),
                mode='trilinear',
                align_corners=False,
            ).squeeze(0).squeeze(0)

        return audio_features, video_features

class AugmentedSynchronizedAudioDanceDataset(SynchronizedAudioDanceDataset):
    """Dataset variant that randomly augments the samples it returns.

    With probability ``augment_prob`` a sample is passed through one or two
    randomly chosen augmentations. All augmentations take and return
    ``(audio_features, video_features)`` where ``audio_features`` is the dict
    of per-sample tensors and ``video_features`` the per-sample video tensor
    produced by the parent class (first axis = frames).

    Args:
        root_dir: Folder containing the synchronised archives.
        augment_prob: Probability of augmenting a sample.
    """

    def __init__(self, root_dir, augment_prob=0.7):
        super().__init__(root_dir)
        self.augment_prob = augment_prob

    def __getitem__(self, idx):
        audio_features, video_features = super().__getitem__(idx)

        if random.random() < self.augment_prob:
            audio_features, video_features = self.augment(audio_features, video_features)

        return audio_features, video_features

    def augment(self, audio_features, video_features):
        """Apply one or two distinct, randomly selected augmentations."""
        augmentations = [
            self.frequency_masking,
            self.add_noise,
            self.random_invert,
            self.random_scale,
            self.time_warp,
        ]

        num_augmentations = random.randint(1, 2)
        chosen_augmentations = random.sample(augmentations, num_augmentations)

        for aug_func in chosen_augmentations:
            audio_features, video_features = aug_func(audio_features, video_features)

        return audio_features, video_features

    def time_warp(self, audio_features, video_features, max_time_warp=50):
        """Stretch or compress the first part of the sequence in time.

        A split frame is drawn at least ``max_time_warp`` frames away from
        both ends. Everything before it is resampled by a random factor in
        [0.8, 1.25], the rest is kept, and the result is cut or zero-padded
        back to the original length. The same warp is applied to the video
        and to every 2-D audio feature whose second axis has the video's
        frame count; ``tempo`` and ``beat_frames`` are left untouched.
        Sequences too short to place the split are returned unchanged.
        """
        num_frames = video_features.shape[0]
        margin = max(int(max_time_warp), 1)
        if num_frames <= 2 * margin:
            return audio_features, video_features

        split = random.randint(margin, num_frames - margin)
        scale = random.uniform(0.8, 1.25)
        new_split = max(1, int(split * scale))

        # Flatten the per-frame layout to (channels, frames) for warping,
        # then restore the original layout.
        flat_video = video_features.reshape(num_frames, -1).t()
        warped_video = self._warp_time_axis(flat_video, split, new_split, num_frames)
        video_features = warped_video.t().reshape(video_features.shape)

        for key in audio_features:
            feature = audio_features[key]
            if key in ['tempo', 'beat_frames']:
                continue
            if feature.dim() != 2 or feature.shape[1] != num_frames:
                # Not frame-aligned with the video; warping would desynchronise it.
                continue
            audio_features[key] = self._warp_time_axis(feature, split, new_split, num_frames)

        return audio_features, video_features

    @staticmethod
    def _warp_time_axis(feature, split, new_split, length):
        """Resample ``feature[:, :split]`` to ``new_split`` frames and refit to ``length``."""
        # interpolate() expects (batch, channels, length).
        head = nn.functional.interpolate(
            feature[:, :split].unsqueeze(0),
            size=new_split,
            mode='linear',
            align_corners=False,
        ).squeeze(0)
        warped = torch.cat([head, feature[:, split:]], dim=1)

        if warped.shape[1] >= length:
            return warped[:, :length]
        return nn.functional.pad(warped, (0, length - warped.shape[1]))

    def frequency_masking(self, audio_features, video_features, max_mask_size=20):
        """Zero a random band of rows in each spectral feature that is present."""
        for key in ['mfcc', 'chroma', 'spectral_contrast', 'tonnetz', 'mel_spectrogram']:
            if key in audio_features:
                feature_size = audio_features[key].shape[0]
                if feature_size > 1:
                    # Never mask every row of a feature.
                    mask_size = min(random.randint(1, max_mask_size), feature_size - 1)
                    start = random.randint(0, feature_size - mask_size)
                    audio_features[key][start:start+mask_size, :] = 0

        return audio_features, video_features

    def add_noise(self, audio_features, video_features, noise_level=0.005):
        """Add Gaussian noise scaled by ``noise_level`` to all audio and video tensors."""
        for key in audio_features:
            noise = torch.randn_like(audio_features[key]) * noise_level
            audio_features[key] += noise

        video_noise = torch.randn_like(video_features) * noise_level
        video_features += video_noise

        return audio_features, video_features

    def random_invert(self, audio_features, video_features):
        """With probability 0.5, negate the video and all audio features except tempo/beats."""
        # NOTE(review): negating normalised features and sin/cos pairs changes
        # their value range - confirm this augmentation is intended.
        if random.random() < 0.5:
            for key in audio_features:
                if key not in ['tempo', 'beat_frames']:
                    audio_features[key] = -audio_features[key]

            video_features = -video_features

        return audio_features, video_features

    def random_scale(self, audio_features, video_features, max_scale=0.2):
        """Multiply video and audio (except tempo/beats) by a factor in [1 - max_scale, 1 + max_scale]."""
        scale_factor = 1 + random.uniform(-max_scale, max_scale)

        for key in audio_features:
            if key not in ['tempo', 'beat_frames']:
                audio_features[key] *= scale_factor

        video_features *= scale_factor

        return audio_features, video_features

# Build the train/test split. Augmentation is applied to training samples only.
import copy
from torch.utils.data import Subset

# The loaders below need the batch size before the Hyperparameters cell runs;
# keep this value identical to the one defined there.
batch_size = 4

dataset = AugmentedSynchronizedAudioDanceDataset('/content/drive/MyDrive/Synchronised_Data')
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Evaluate on clean data: reuse the same test indices on a copy of the dataset
# whose augmentation probability is zero (the copy shares the file list).
evaluation_view = copy.copy(dataset)
evaluation_view.augment_prob = 0.0
test_dataset = Subset(evaluation_view, test_dataset.indices)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# The Training








Model Architecture

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first sequence.

    Args:
        d_model: Size of the last (feature) axis of the inputs.
        max_len: Longest sequence length for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=900):
        super().__init__()
        encoding = torch.zeros(1, max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        encoding[0, :, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns.
        encoding[0, :, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # A non-persistent buffer follows the module across devices but is
        # kept out of the state_dict, so saved checkpoints are unaffected.
        self.register_buffer('encoding', encoding, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, sequence, d_model).
        """
        return x + self.encoding[:, :x.size(1), :].to(x.device)

class DanceGenerator(nn.Module):
    """Map a sequence of audio features to a sequence of joint (sin, cos) pairs.

    Pipeline: linear projection of the audio channels -> dropout -> positional
    encoding -> LSTM -> Transformer encoder (memory) -> Transformer decoder
    driven by position-encoded zeros -> layer norm -> linear head.

    Args:
        audio_dim: Number of audio channels per time step.
        video_dim: Number of joints; the head emits two values per joint.
        hidden_dim: Width of the projection, LSTM and Transformer layers.
        num_heads: Attention heads per Transformer layer.
        num_layers: Number of layers in both the encoder and the decoder.
        target_seq_len: Number of output frames generated by the decoder.
        lstm_layers: Number of stacked LSTM layers.
        lstm_dropout: Dropout between stacked LSTM layers.
    """

    def __init__(self, audio_dim, video_dim, hidden_dim, num_heads, num_layers, target_seq_len, lstm_layers=2, lstm_dropout=0.1):
        super().__init__()
        # Projects each time step from audio_dim to hidden_dim.
        self.audio_encoder = nn.Linear(audio_dim, hidden_dim)
        # Built with the default max_len of PositionalEncoding.
        self.positional_encoding = PositionalEncoding(hidden_dim)

        # LSTM layers
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers=lstm_layers, batch_first=True, dropout=lstm_dropout)

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=num_heads, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Two outputs per joint; forward() reshapes them into a trailing axis of size 2.
        self.fc = nn.Linear(hidden_dim, video_dim * 2)
        self.target_seq_len = target_seq_len
        self.dropout = nn.Dropout(0.1)
        self.layer_norm = nn.LayerNorm(hidden_dim)

    def forward(self, audio_features):
        """Generate joint outputs from audio features.

        Args:
            audio_features: Tensor indexed as (batch, audio_dim, time).

        Returns:
            Tensor of shape (batch, target_seq_len, video_dim, 2).
        """
        # Move channels last so the linear layer acts on each time step.
        audio_features = audio_features.permute(0, 2, 1)
        encoded_audio = self.audio_encoder(audio_features)
        encoded_audio = self.dropout(encoded_audio)
        encoded_audio = self.positional_encoding(encoded_audio)

        # LSTM processing
        lstm_output, _ = self.lstm(encoded_audio)

        # Encoder
        memory = self.transformer_encoder(lstm_output)

        # Initialize target sequence for decoder
        # The decoder queries are all zeros plus positional encodings, so each
        # output frame is distinguished only by its position.
        batch_size = audio_features.size(0)
        target = torch.zeros(batch_size, self.target_seq_len, encoded_audio.size(-1)).to(audio_features.device)
        target = self.positional_encoding(target)

        # Decoder
        output = self.transformer_decoder(target, memory)
        output = self.layer_norm(output)

        # Final output layer
        output = self.fc(output)
        # Split the last axis into (joint, 2).
        output = output.view(output.size(0), output.size(1), -1, 2)

        return output

Hyperparameters

In [None]:
# Channel counts of the five audio features that the training loop
# concatenates: mel_spectrogram (128), mfcc (13), chroma (12), tempo (1)
# and beat_frames (1).
audio_dim = 128 + 13 + 12 + 1 + 1
video_dim = 12  # number of joint angles per frame
hidden_dim = 256
num_heads = 4
num_layers = 6
batch_size = 4
num_epochs = 50
learning_rate = 0.01
accuracy_threshold = 0.18
target_seq_len = 900  # frames per sequence, matching the resampling cells

# New hyperparameters for LSTM
lstm_layers = 3
lstm_dropout = 0.1

# Model, base loss, optimiser and a cosine learning-rate schedule spanning
# all epochs.
model = DanceGenerator(audio_dim, video_dim, hidden_dim, num_heads, num_layers, target_seq_len,
                       lstm_layers=lstm_layers, lstm_dropout=lstm_dropout).to(device)
criterion = nn.MSELoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-6)

Loss and evaluation Functions

In [None]:
def check_invalid_values(tensor, name):
    """Print a warning for ``name`` when ``tensor`` contains NaN or infinite values."""
    contains_nan = bool(torch.isnan(tensor).any())
    contains_inf = bool(torch.isinf(tensor).any())

    if contains_nan:
        print(f"NaN values found in {name}")
    if contains_inf:
        print(f"Infinite values found in {name}")

def calculate_accuracy(predictions, targets, threshold):
    """Return the fraction of elements whose absolute error is at most ``threshold``."""
    within_tolerance = np.abs(predictions - targets) <= threshold
    return np.mean(within_tolerance)

def calculate_rhythm_matching(predictions, targets):
    """Return the mean absolute difference between the step-to-step changes
    (along axis 1) of ``predictions`` and ``targets``."""
    predicted_steps = np.diff(np.array(predictions), axis=1)
    target_steps = np.diff(np.array(targets), axis=1)
    return np.mean(np.abs(predicted_steps - target_steps))

def calculate_style_consistency(predictions, targets):
    """Return the mean ratio of predicted to target standard deviation.

    Standard deviations are taken along axis 1. Entries whose target standard
    deviation is zero are left out, because the ratio is undefined there and
    would turn the whole mean into inf/NaN.

    Returns:
        The mean ratio over the remaining entries, or NaN when every target
        standard deviation is zero.
    """
    predicted_spread = np.std(np.array(predictions), axis=1)
    target_spread = np.std(np.array(targets), axis=1)

    usable = target_spread > 0
    if not np.any(usable):
        return float('nan')
    return np.mean(predicted_spread[usable] / target_spread[usable])

def diversity_loss(output):
    """Negative mean absolute change between consecutive steps along dim 1.

    Larger step-to-step changes give a more negative value, so minimising this
    term rewards outputs that vary along dim 1.
    """
    later_steps = output[:, 1:]
    earlier_steps = output[:, :-1]
    mean_step_size = (later_steps - earlier_steps).abs().mean()
    return -mean_step_size

def temporal_loss(output, target, frame_window=30):
    """Penalise mismatched step-to-step changes and abrupt changes along dim 1.

    The first term is the mean absolute difference between the consecutive-step
    changes of ``output`` and those of ``target``. The second term accumulates
    the absolute second difference of ``output``: for every ``offset`` from 2
    to ``frame_window`` it is added to the penalty starting at position
    ``offset - 1``, so later positions receive it more times (up to
    ``frame_window - 1``).

    Returns the sum of the means of both terms.
    """
    output_steps = output[:, 1:] - output[:, :-1]
    target_steps = target[:, 1:] - target[:, :-1]
    step_error = torch.abs(output_steps - target_steps)

    curvature_penalty = torch.zeros_like(step_error)
    for offset in range(2, frame_window + 1):
        second_difference = (
            output[:, offset:] - 2 * output[:, offset - 1:-1] + output[:, offset - 2:-2]
        )
        curvature_penalty[:, offset - 1:] += torch.abs(second_difference)

    return torch.mean(step_error) + torch.mean(curvature_penalty)

# Training objective: reconstruction error plus weighted temporal and diversity terms.
def combined_loss(output, target):
    """Return ``criterion + 0.1 * temporal_loss + 0.05 * diversity_loss``.

    ``criterion`` is the module-level loss object (``nn.MSELoss()`` in this
    notebook); the temporal term uses its default ``frame_window``.
    """
    reconstruction_term = criterion(output, target)
    temporal_term = temporal_loss(output, target)
    diversity_term = diversity_loss(output)
    return reconstruction_term + 0.1 * temporal_term + 0.05 * diversity_term

Defining terms

In [None]:
# Per-epoch metric histories, one (train, test) pair of independent lists per
# metric; the training loop appends one value to each list every epoch.
train_losses, test_losses = [], []
train_mae, test_mae = [], []
train_r2, test_r2 = [], []
train_accuracy, test_accuracy = [], []
train_rhythm_matching, test_rhythm_matching = [], []
train_style_consistency, test_style_consistency = [], []


The training loop

In [None]:
# Train for `num_epochs` epochs. Each epoch runs one optimisation pass over
# `train_dataloader`, then one gradient-free pass over `test_dataloader`, and
# appends the loss and metrics of both passes to the module-level history lists.
for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0
    train_outputs = []
    train_targets = []

    for audio_features, video_features in tqdm(train_dataloader, desc=f"Training Epoch {epoch+1}", unit="batch"):
        optimizer.zero_grad()

        # Concatenate the five audio features along dim=1 in the order
        # tempo, mel, MFCC, chroma, beat_frames (1 + 128 + 13 + 12 + 1 channels),
        # each viewed with a trailing length of 900.
        combined_audio = torch.cat([
            audio_features['tempo'].view(-1, 1, 900),
            audio_features['mel_spectrogram'].view(-1, 128, 900),
            audio_features['mfcc'].view(-1, 13, 900),
            audio_features['chroma'].view(-1, 12, 900),
            audio_features['beat_frames'].view(-1, 1, 900),
        ], dim=1).to(device)

        output = model(combined_audio)
        # Only prints a warning for NaN/inf outputs; training continues regardless.
        check_invalid_values(output, 'output')

        loss = combined_loss(output, video_features.to(device))
        loss.backward()
        # Clip the global gradient norm to 1.0 before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_train_loss += loss.item()

        # Keep every batch's outputs and targets on the CPU for the epoch-level metrics below.
        train_outputs.append(output.cpu().detach().numpy())
        train_targets.append(video_features.numpy())

    # Average of the per-batch losses (not weighted by batch size).
    avg_train_loss = total_train_loss / len(train_dataloader)
    train_losses.append(avg_train_loss)

    train_outputs = np.concatenate(train_outputs, axis=0).reshape(-1, 900, 12, 2)
    train_targets = np.concatenate(train_targets, axis=0).reshape(-1, 900, 12, 2)

    # The sklearn-style metrics receive 2-D input: one row per value pair.
    train_outputs_flat = train_outputs.reshape(-1, 2)
    train_targets_flat = train_targets.reshape(-1, 2)

    train_mae.append(mean_absolute_error(train_targets_flat, train_outputs_flat))
    train_r2.append(r2_score(train_targets_flat, train_outputs_flat))
    train_accuracy.append(calculate_accuracy(train_outputs, train_targets, accuracy_threshold))
    train_rhythm_matching.append(calculate_rhythm_matching(train_outputs, train_targets))
    train_style_consistency.append(calculate_style_consistency(train_outputs, train_targets))

    # Evaluation pass: same feature assembly and loss, but in eval mode and without gradients.
    model.eval()
    total_test_loss = 0
    test_outputs = []
    test_targets = []

    with torch.no_grad():
        for audio_features, video_features in tqdm(test_dataloader, desc=f"Testing Epoch {epoch+1}", unit="batch"):
            combined_audio = torch.cat([
                audio_features['tempo'].view(-1,1,900),
                audio_features['mel_spectrogram'].view(-1, 128, 900),
                audio_features['mfcc'].view(-1, 13, 900),
                audio_features['chroma'].view(-1, 12, 900),
                audio_features['beat_frames'].view(-1, 1, 900),
            ], dim=1).to(device)

            output = model(combined_audio)
            check_invalid_values(output, 'output')

            loss = combined_loss(output, video_features.to(device))
            total_test_loss += loss.item()

            test_outputs.append(output.cpu().detach().numpy())
            test_targets.append(video_features.numpy())

    avg_test_loss = total_test_loss / len(test_dataloader)
    test_losses.append(avg_test_loss)

    test_outputs = np.concatenate(test_outputs, axis=0).reshape(-1, 900, 12, 2)
    test_targets = np.concatenate(test_targets, axis=0).reshape(-1, 900, 12, 2)

    test_outputs_flat = test_outputs.reshape(-1, 2)
    test_targets_flat = test_targets.reshape(-1, 2)

    test_mae.append(mean_absolute_error(test_targets_flat, test_outputs_flat))
    test_r2.append(r2_score(test_targets_flat, test_outputs_flat))
    test_accuracy.append(calculate_accuracy(test_outputs, test_targets, accuracy_threshold))
    test_rhythm_matching.append(calculate_rhythm_matching(test_outputs, test_targets))
    test_style_consistency.append(calculate_style_consistency(test_outputs, test_targets))

    # Per-epoch summary of the values just appended to the history lists.
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f}")
    print(f"Train MAE: {train_mae[-1]:.4f}, Test MAE: {test_mae[-1]:.4f}")
    print(f"Train R2: {train_r2[-1]:.4f}, Test R2: {test_r2[-1]:.4f}")
    print(f"Train Accuracy: {train_accuracy[-1]:.4f}, Test Accuracy: {test_accuracy[-1]:.4f}")
    print(f"Train Rhythm Matching: {train_rhythm_matching[-1]:.4f}, Test Rhythm Matching: {test_rhythm_matching[-1]:.4f}")
    print(f"Train Style Consistency: {train_style_consistency[-1]:.4f}, Test Style Consistency: {test_style_consistency[-1]:.4f}")

    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

Plotting of graphs

In [None]:
def _plot_train_test_curves(train_series, test_series, legend_name, y_label):
    """Show one figure with the train and test history of a metric over epochs."""
    plt.figure()
    plt.plot(train_series, label=f'Train {legend_name}')
    plt.plot(test_series, label=f'Test {legend_name}')
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()


# One figure per tracked metric, in the order they are recorded.
_plot_train_test_curves(train_losses, test_losses, 'Loss', 'Loss')
_plot_train_test_curves(train_mae, test_mae, 'MAE', 'Mean Absolute Error')
_plot_train_test_curves(train_r2, test_r2, 'R2', 'R2 Score')
_plot_train_test_curves(train_accuracy, test_accuracy, 'Accuracy', 'Accuracy')
_plot_train_test_curves(train_rhythm_matching, test_rhythm_matching,
                        'Rhythm Matching', 'Rhythm Matching')
_plot_train_test_curves(train_style_consistency, test_style_consistency,
                        'Style Consistency', 'Style Consistency')

Saving the Model

In [None]:
# Persist the trained model to Google Drive in two forms: the full model object,
# and only its state_dict (which can be loaded into a freshly constructed
# DanceGenerator via load_state_dict).
torch.save(model, '/content/drive/MyDrive/OutputModelsAndNPYs/LSTM.pth')
torch.save(model.state_dict(), '/content/drive/MyDrive/OutputModelsAndNPYs/LSTM_state_dict.pth')

# Simulation Of The Model

Functions for execution



In [None]:
def preprocess_audio(audio_path):
    """Load pre-extracted audio features from a NumPy archive as float tensors.

    Args:
        audio_path: Path to a file readable by ``np.load`` that exposes the
            arrays ``beat_frames``, ``mfcc``, ``chroma``, ``spectral_contrast``
            and ``mel_spectrogram``. A ``tempo`` array is loaded as well when
            the file provides one, because the training loop feeds tempo to
            the model as its first input channel.

    Returns:
        dict mapping each feature name to a float32 tensor in which NaNs are
        replaced by 0 and every value is clamped to ``[-1e6, 1e6]``.

    Raises:
        KeyError: if one of the five required arrays is missing.
    """
    feature_names = ['beat_frames', 'mfcc', 'chroma', 'spectral_contrast', 'mel_spectrogram']

    # NOTE: allow_pickle=True lets np.load unpickle arbitrary objects, which can
    # execute code; only use this on feature files you produced yourself.
    audio_data = np.load(audio_path, allow_pickle=True)
    try:
        # An .npz archive lists its arrays in `.files`; fall back to a plain
        # membership test for other mapping-like results.
        if 'tempo' in getattr(audio_data, 'files', audio_data):
            feature_names.append('tempo')
        audio_features = {
            name: torch.tensor(audio_data[name]).float() for name in feature_names
        }
    finally:
        # An opened .npz archive holds a file handle until it is closed.
        if hasattr(audio_data, 'close'):
            audio_data.close()

    for key in audio_features:
        audio_features[key] = torch.nan_to_num(audio_features[key])
        audio_features[key] = torch.clamp(audio_features[key], -1e6, 1e6)
    return audio_features


# Function to generate dance moves
def generate_dance_moves(model, audio_features, target_seq_len):
    """Run ``model`` on the stacked audio features and return its output as a NumPy array.

    The features are concatenated along dim=1 in the same order the training
    loop uses: tempo (1 channel, when present), mel spectrogram (128), MFCC
    (13), chroma (12) and beat frames (1), each reshaped to a trailing length
    of 900. Without tempo the stack has one channel fewer than the
    ``audio_dim`` the model is constructed with.

    Args:
        model: Callable taking the stacked audio tensor; called under
            ``torch.no_grad()``.
        audio_features: dict of tensors as returned by ``preprocess_audio``.
        target_seq_len: Unused here; kept so existing calls keep working. The
            model uses its own ``target_seq_len`` for the decoder sequence.

    Returns:
        The model output moved to the CPU and converted to a NumPy array.
    """
    num_frames = 900
    channel_blocks = []

    if 'tempo' in audio_features:
        tempo = audio_features['tempo']
        if tempo.numel() == 1:
            # NOTE(review): a scalar tempo is repeated for every frame; this
            # assumes the training dataset tiles tempo the same way — confirm
            # against the dataset class.
            tempo = tempo.reshape(1).expand(num_frames)
        channel_blocks.append(tempo.reshape(-1, 1, num_frames))

    channel_blocks.extend([
        audio_features['mel_spectrogram'].view(-1, 128, num_frames),
        audio_features['mfcc'].view(-1, 13, num_frames),
        audio_features['chroma'].view(-1, 12, num_frames),
        audio_features['beat_frames'].view(-1, 1, num_frames),
    ])

    with torch.no_grad():
        combined_audio = torch.cat(channel_blocks, dim=1).to(device)
        output = model(combined_audio)
        return output.cpu().numpy()

Model execution

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = DanceGenerator(audio_dim, video_dim, hidden_dim, num_heads, num_layers, target_seq_len,
                       lstm_layers=lstm_layers, lstm_dropout=lstm_dropout).to(device)

# Restore the trained weights written by the "Saving the Model" cell. Without
# this step the freshly constructed model would generate from its initial,
# untrained parameters.
state_dict_path = '/content/drive/MyDrive/OutputModelsAndNPYs/LSTM_state_dict.pth'
model.load_state_dict(torch.load(state_dict_path, map_location=device))
model.eval()

# Preprocess the input audio file
input_audio_path = '/content/drive/MyDrive/Normalized_Features/P5_features.npz'  # Replace with your input audio features file path
audio_features = preprocess_audio(input_audio_path)

# Generate dance moves
generated_dance_moves = generate_dance_moves(model, audio_features, 900)

# Save the generated dance moves to a .npy file
output_file_path = '/content/drive/MyDrive/OutputModelsAndNPYs/LSTM_Dance.npy'
np.save(output_file_path, generated_dance_moves)
print(f"Generated dance moves saved to {output_file_path}")

Functions for simulation

In [None]:
def calculate_joint_position(start_pos, angle, length, invert_y=False):
    """Return the integer pixel position ``length`` away from ``start_pos`` at ``angle`` degrees.

    The x offset is ``length * cos(angle)``. The y offset ``length * sin(angle)``
    is subtracted from the start y by default and added instead when
    ``invert_y`` is true. Both coordinates are truncated with ``int``.
    """
    theta = np.radians(angle)
    offset_x = length * np.cos(theta)
    offset_y = length * np.sin(theta)

    end_x = start_pos[0] + offset_x
    if invert_y:
        end_y = start_pos[1] + offset_y
    else:
        end_y = start_pos[1] - offset_y
    return (int(end_x), int(end_y))

def regenerate_pose_sin_cos(angle_data_file, output_video_path, frame_rate=30):
    """Render a stick-figure video from a saved sequence of encoded joint angles.

    Args:
        angle_data_file: Path to a ``.npy`` file. Only its first entry along
            axis 0 is rendered. The last axis holds value pairs that are
            decoded with ``arctan2(pair[1], pair[0])``, i.e. index 0 is treated
            as the cosine and index 1 as the sine of the angle. Each frame must
            decode to exactly 12 angles, in the order left arm
            (shoulder, elbow, wrist), right arm, left leg (hip, knee, ankle),
            right leg.
        output_video_path: Destination of the 1280x720 ``mp4v`` video.
        frame_rate: Frames per second of the written video.

    Raises:
        ValueError: if a frame does not unpack into exactly 12 angles.
    """
    angles_data = np.load(angle_data_file, allow_pickle=True)
    print(f"Angles data loaded: {angles_data.shape}")
    print(angles_data)
    angles_data = angles_data[0]
    # Convert sine-cosine pairs back to angles in degrees
    angles_deg = np.degrees(np.arctan2(angles_data[..., 1], angles_data[..., 0]))
    print(angles_deg)
    # Define lengths of the limbs
    upper_arm_length, lower_arm_length, hand_length = 60, 50, 20
    upper_leg_length, lower_leg_length, foot_length = 70, 60, 30
    torso_length, neck_length = 100, 20
    head_radius = 20
    hip_width, shoulder_width = 60, 80

    # Create a blank image for visualization
    height, width = 720, 1280
    out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (width, height))

    # Release the writer even if a frame fails, so the output file is closed
    # instead of being left open by an exception.
    try:
        total_frames = angles_deg.shape[0]
        print(f"Total frames: {total_frames}")

        for frame_idx in range(total_frames):
            if frame_idx % 100 == 0:
                print(f"Processing frame {frame_idx}/{total_frames}")

            frame = np.zeros((height, width, 3), dtype=np.uint8)
            center_x, center_y = width // 2, height // 2

            angles = angles_deg[frame_idx]

            # Unpack all angles
            left_shoulder_angle, left_elbow_angle, left_wrist_angle, \
            right_shoulder_angle, right_elbow_angle, right_wrist_angle, \
            left_hip_angle, left_knee_angle, left_ankle_angle, \
            right_hip_angle, right_knee_angle, right_ankle_angle = angles

            # Define the base positions
            base_pos = (center_x, center_y + 200)  # Move the base position lower

            # Calculate hip positions
            left_hip_pos = calculate_joint_position(base_pos, left_hip_angle, hip_width // 2)
            right_hip_pos = calculate_joint_position(base_pos, right_hip_angle, hip_width // 2)

            # Calculate torso position (midpoint between hips, raised by the torso length)
            torso_pos = ((left_hip_pos[0] + right_hip_pos[0]) // 2,
                         (left_hip_pos[1] + right_hip_pos[1]) // 2 - torso_length)

            # Calculate shoulder positions (fixed horizontal offsets from the torso point)
            left_shoulder_pos = (torso_pos[0] - shoulder_width // 2, torso_pos[1])
            right_shoulder_pos = (torso_pos[0] + shoulder_width // 2, torso_pos[1])

            # Calculate the head and neck positions
            neck_pos = (torso_pos[0], torso_pos[1] - neck_length)
            head_pos = (neck_pos[0], neck_pos[1] - head_radius)

            # Calculate arm positions; each segment starts where the previous one ends
            left_elbow_pos = calculate_joint_position(left_shoulder_pos, -left_shoulder_angle, upper_arm_length)
            left_wrist_pos = calculate_joint_position(left_elbow_pos, left_elbow_angle, lower_arm_length)
            left_hand_pos = calculate_joint_position(left_wrist_pos, left_wrist_angle, hand_length)

            right_elbow_pos = calculate_joint_position(right_shoulder_pos, -right_shoulder_angle, upper_arm_length)
            right_wrist_pos = calculate_joint_position(right_elbow_pos, -right_elbow_angle, lower_arm_length)
            right_hand_pos = calculate_joint_position(right_wrist_pos, -right_wrist_angle, hand_length)

            # Calculate leg positions
            left_knee_pos = calculate_joint_position(left_hip_pos, left_hip_angle, upper_leg_length)
            left_ankle_pos = calculate_joint_position(left_knee_pos, left_knee_angle, lower_leg_length)
            left_foot_pos = calculate_joint_position(left_ankle_pos, left_ankle_angle, foot_length)

            right_knee_pos = calculate_joint_position(right_hip_pos, -right_hip_angle, upper_leg_length)
            right_ankle_pos = calculate_joint_position(right_knee_pos, -right_knee_angle, lower_leg_length)
            right_foot_pos = calculate_joint_position(right_ankle_pos, -right_ankle_angle, foot_length)

            # Line segments of the stick figure as (start, end) point pairs
            joints = [
                (left_shoulder_pos, left_elbow_pos),
                (left_elbow_pos, left_wrist_pos),
                (left_wrist_pos, left_hand_pos),
                (right_shoulder_pos, right_elbow_pos),
                (right_elbow_pos, right_wrist_pos),
                (right_wrist_pos, right_hand_pos),
                (left_hip_pos, left_knee_pos),
                (left_knee_pos, left_ankle_pos),
                (left_ankle_pos, left_foot_pos),
                (right_hip_pos, right_knee_pos),
                (right_knee_pos, right_ankle_pos),
                (right_ankle_pos, right_foot_pos),
                (left_shoulder_pos, right_shoulder_pos),
                (left_hip_pos, right_hip_pos),
                (left_shoulder_pos, left_hip_pos),
                (right_shoulder_pos, right_hip_pos),
                (torso_pos, neck_pos),
                (neck_pos, head_pos),
            ]

            # Green bones with a filled red dot on both end points
            for start_pos, end_pos in joints:
                cv2.line(frame, start_pos, end_pos, (0, 255, 0), 2)
                cv2.circle(frame, start_pos, 5, (0, 0, 255), -1)
                cv2.circle(frame, end_pos, 5, (0, 0, 255), -1)

            # Draw the head as a circle
            cv2.circle(frame, head_pos, head_radius, (255, 0, 0), 2)

            # Display the frame number
            cv2.putText(frame, f'Frame: {frame_idx+1}/{total_frames}',
                        (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            out.write(frame)
    finally:
        out.release()

    print(f"Video generation complete. Output saved to {output_video_path}")

Simulation

In [None]:
# Render the generated angle sequence to 'okay.mp4' at 30 fps.
# NOTE(review): this reads a local Windows path, while the generation cell saves
# its output to Google Drive as 'LSTM_Dance.npy' — presumably the file was
# downloaded (and renamed) before running this cell locally; confirm the path.
regenerate_pose_sin_cos(r"C:\Users\User\Downloads\LSTM_dance.npy", 'okay.mp4', frame_rate=30)

# ***Thank you!***