In [1]:
import os
import moviepy.editor as mp
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch

def video_to_save_log_mel_spectrogram(video_path, output_folder, n_fft=2048, hop_length=512, n_mels=128):
    """Extract the audio track of a video and return its log-Mel spectrogram.

    The audio is written to a temporary WAV file inside ``output_folder``, loaded
    back at its native sample rate (``sr=None``), and the WAV file is removed
    again, even when writing or loading fails.

    :param video_path: path of the video file to read
    :param output_folder: directory used for the temporary WAV file; created if missing
    :param n_fft: FFT window size passed to ``librosa.feature.melspectrogram``
    :param hop_length: hop length passed to ``librosa.feature.melspectrogram``
    :param n_mels: number of Mel bands passed to ``librosa.feature.melspectrogram``
    :return: the spectrogram converted with ``librosa.power_to_db(..., ref=np.max)``,
        i.e. in dB relative to its own maximum
    :raises ValueError: if the video has no audio track
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    os.makedirs(output_folder, exist_ok=True)
    audio_temp_path = os.path.join(output_folder, f"{video_name}.wav")

    video_clip = mp.VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError(f"No audio track found in video: {video_path}")
        try:
            # Extract audio from the video and save it as a temporary WAV file
            audio_clip.write_audiofile(audio_temp_path, codec='pcm_s16le', fps=audio_clip.fps)
            # Load the audio file back, keeping its original sample rate
            y, sr = librosa.load(audio_temp_path, sr=None)
        finally:
            # The WAV file is only an intermediate artefact; never leave it behind.
            if os.path.exists(audio_temp_path):
                os.remove(audio_temp_path)
    finally:
        # Release the reader held by the clip.
        video_clip.close()

    # Compute the Mel spectrogram
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)

    # Convert to log scale
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
    print(log_mel_spectrogram.shape)
    # Optional: export the spectrogram as an image
    # save_spectrogram_as_image(log_mel_spectrogram, output_path=os.path.join(output_folder, f"{video_name}.png"))

    return log_mel_spectrogram


def save_spectrogram_as_image(spectrogram, output_path):
    """Render a spectrogram to an image file without axes, title or padding.

    :param spectrogram: array drawn with ``librosa.display.specshow`` (viridis colormap)
    :param output_path: destination path handed to ``savefig``; saved with a transparent background
    """
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        librosa.display.specshow(spectrogram, x_axis=None, y_axis=None, cmap='viridis', ax=ax)
        ax.axis('off')  # Turn off axis labels
        ax.set_title('')
        fig.savefig(output_path, transparent=True, bbox_inches='tight', pad_inches=0)
    finally:
        # Always close the figure, even if drawing or saving fails, so figures
        # do not accumulate and the plot is not displayed inline.
        plt.close(fig)

def process_videos_in_directory(input_folder, output_folder, return_all=False):
    """Compute log-Mel spectrogram tensors for the ``.mp4`` files under a folder.

    :param input_folder: directory searched recursively (``os.walk``) for ``.mp4`` files
        (extension matched case-insensitively)
    :param output_folder: forwarded to ``video_to_save_log_mel_spectrogram``
    :param return_all: when False (default) only the first video found is processed and
        its tensor is returned, which is what existing callers rely on; when True every
        video is processed and a list with one tensor per video (discovery order) is returned
    :return: a float32 tensor, or a list of float32 tensors when ``return_all`` is True
    :raises FileNotFoundError: if no ``.mp4`` file is found under ``input_folder``
    """
    videos_to_process = [
        os.path.join(root, file)
        for root, _, files in os.walk(input_folder)
        for file in files
        if file.lower().endswith(".mp4")
    ]
    if not videos_to_process:
        # Fail here with a clear message instead of handing None to the caller.
        raise FileNotFoundError(f"No .mp4 files found under: {input_folder}")

    if not return_all:
        # Single-video mode: restrict the work list up front instead of
        # returning from the middle of the loop.
        videos_to_process = videos_to_process[:1]

    spectrograms = [
        torch.tensor(video_to_save_log_mel_spectrogram(video_path, output_folder), dtype=torch.float32)
        for video_path in tqdm(videos_to_process, desc="Processing Videos")
    ]
    return spectrograms if return_all else spectrograms[0]

# Example usage
# NOTE: these are machine-specific absolute paths; adjust them before running elsewhere.
input_folder = r"C:\Users\DELL\Downloads\train"
output_folder = r"C:\Users\DELL\Downloads\trainspecto"

# process_videos_in_directory already returns a float32 tensor, so use as_tensor
# (no extra copy) rather than re-wrapping the tensor with torch.tensor.
# The name `input` shadows the builtin but is kept because later cells read it.
input = torch.as_tensor(process_videos_in_directory(input_folder, output_folder), dtype=torch.float32)


Processing Videos:   0%|          | 0/1 [00:00<?, ?it/s]

MoviePy - Writing audio in C:\Users\DELL\Downloads\trainspecto/v=u5SF4SlqNDQ__#00-00-00_00-01-00_label_G-0-0.wav


Processing Videos:   0%|          | 0/1 [00:01<?, ?it/s]

MoviePy - Done.


Processing Videos:   0%|          | 0/1 [00:04<?, ?it/s]

(128, 5175)





In [2]:
import numpy as np
# Swap the two axes so time frames come first, matching the
# (batch, time_frame_num, frequency_bins) layout documented on ASTModel.forward.
# `input` is a torch tensor, so use its own transpose instead of the NumPy function.
# NOTE: this cell is not idempotent; re-running it swaps the axes back.
input = input.transpose(0, 1)
print(input.shape)


torch.Size([5175, 128])


In [3]:
# Prepend a batch axis and keep at most the first 512 entries of the next axis,
# all in one indexing step (None is what np.newaxis stands for).
input = input[None, :512, :]
print(input.shape)


torch.Size([1, 512, 128])


In [14]:
# -*- coding: utf-8 -*-
# @Time    : 6/10/21 5:04 PM
# @Author  : Yuan Gong
# @Affiliation  : Massachusetts Institute of Technology
# @Email   : yuangong@mit.edu
# @File    : ast_models.py


import torch
import torch.nn as nn
from torch.cuda.amp import autocast
import os
import wget
# os.environ['TORCH_HOME'] = '../../pretrained_models'
import timm
from timm.models.layers import to_2tuple,trunc_normal_

# override the timm package to relax the input shape constraint.
class PatchEmbed(nn.Module):
    """Patch embedding layer used in place of timm's own ``PatchEmbed``.

    A strided convolution projects every patch to an ``embed_dim``-sized vector;
    ``forward`` applies it without checking the input against ``img_size``.

    :param img_size: image size, expanded to a pair with ``to_2tuple``
    :param patch_size: patch size, expanded to a pair; used as conv kernel size and stride
    :param in_chans: number of input channels of the convolution
    :param embed_dim: number of output channels, i.e. the embedding size per patch
    """
    def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=512):
        super().__init__()

        self.img_size = to_2tuple(img_size)
        self.patch_size = to_2tuple(patch_size)

        # Number of whole patches that fit along each axis of the nominal image size.
        patches_along_dim0 = self.img_size[0] // self.patch_size[0]
        patches_along_dim1 = self.img_size[1] // self.patch_size[1]
        self.num_patches = patches_along_dim1 * patches_along_dim0

        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=self.patch_size, stride=self.patch_size)

    def forward(self, x):
        """Project ``x`` with the convolution and return (batch, patches, embed_dim)."""
        feature_map = self.proj(x)
        # Merge the two spatial axes into one patch axis, then move channels last.
        patch_tokens = feature_map.flatten(2)
        return patch_tokens.transpose(1, 2)

class ASTModel(nn.Module):
    """
    Patch-level variant of the AST model.

    This variant does not prepend cls/distillation tokens and does not pool over patches: ``forward`` returns one
    ``output_dim``-sized vector per spectrogram patch. The positional embedding is always freshly initialised
    (truncated normal), and no AudioSet checkpoint is loaded.

    :param label_dim: not used by this variant; kept so existing call signatures keep working
    :param fstride: the stride of patch spliting on the frequency dimension, for 16*16 patchs, fstride=16 means no overlap, fstride=10 means overlap of 6
    :param tstride: the stride of patch spliting on the time dimension, for 16*16 patchs, tstride=16 means no overlap, tstride=10 means overlap of 6
    :param input_fdim: the number of frequency bins of the input spectrogram
    :param input_tdim: the number of time frames of the input spectrogram
    :param imagenet_pretrain: if True, the DeiT backbone is created with pretrained weights and the patch projection is initialised from them
    :param audioset_pretrain: only echoed in the verbose summary; no AudioSet weights are loaded
    :param model_size: not used by this variant; the backbone is always 'vit_deit_base_distilled_patch16_384'
    :param verbose: if True, print a short model summary
    :param output_dim: size of the vector produced for every patch by ``mlp_head`` (default 128)
    """

    # Side length of the square patches cut from the spectrogram.
    PATCH_SIZE = 16

    def __init__(self, label_dim=527, fstride=10, tstride=10, input_fdim=128, input_tdim=1024, imagenet_pretrain=True, audioset_pretrain=False, model_size='base384', verbose=True, output_dim=128):

        super(ASTModel, self).__init__()
        assert timm.__version__ == '0.4.5', 'Please use timm == 0.4.5, the code might not be compatible with newer versions.'

        if verbose:
            print('---------------AST Model Summary---------------')
            print('ImageNet pretraining: {:s}, AudioSet pretraining: {:s}'.format(str(imagenet_pretrain),str(audioset_pretrain)))
        # override timm input shape restriction
        timm.models.vision_transformer.PatchEmbed = PatchEmbed
        self.v = timm.create_model('vit_deit_base_distilled_patch16_384', pretrained=imagenet_pretrain)
        self.original_num_patches = self.v.patch_embed.num_patches
        self.oringal_hw = int(self.original_num_patches ** 0.5)
        self.original_embedding_dim = self.v.pos_embed.shape[2]
        # per-patch head: layer norm followed by a linear map to output_dim
        self.mlp_head = nn.Sequential(nn.LayerNorm(self.original_embedding_dim), nn.Linear(self.original_embedding_dim, output_dim))

        # number of patches along frequency and time for the configured input size
        f_dim, t_dim = self.get_shape(fstride, tstride, input_fdim, input_tdim)
        num_patches = f_dim * t_dim
        self.v.patch_embed.num_patches = num_patches
        if verbose:
            print('frequncey stride={:d}, time stride={:d}'.format(fstride, tstride))
            print('number of patches={:d}'.format(num_patches))

        # the linear projection layer: single input channel, possibly overlapping patches
        new_proj = torch.nn.Conv2d(1, self.original_embedding_dim, kernel_size=(self.PATCH_SIZE, self.PATCH_SIZE), stride=(fstride, tstride))
        if imagenet_pretrain:
            # collapse the backbone's input-channel axis by summation to obtain single-channel weights
            new_proj.weight = torch.nn.Parameter(torch.sum(self.v.patch_embed.proj.weight, dim=1).unsqueeze(1))
            new_proj.bias = self.v.patch_embed.proj.bias
        self.v.patch_embed.proj = new_proj

        # the positional embedding: one row per patch, no extra rows because forward() adds no extra tokens
        new_pos_embed = nn.Parameter(torch.zeros(1, self.v.patch_embed.num_patches, self.original_embedding_dim))
        self.v.pos_embed = new_pos_embed
        trunc_normal_(self.v.pos_embed, std=.02)

    def get_shape(self, fstride, tstride, input_fdim=128, input_tdim=1024):
        """
        Compute how many patches the patch projection yields along each axis.

        Uses the output-size formula of an unpadded, undilated convolution, ``(size - kernel) // stride + 1``,
        instead of building a layer and running it on dummy data.

        :param fstride: stride along the frequency axis
        :param tstride: stride along the time axis
        :param input_fdim: number of frequency bins
        :param input_tdim: number of time frames
        :return: tuple (f_dim, t_dim)
        :raises ValueError: if a stride is not positive or an input dimension is smaller than the patch size
        """
        kernel = self.PATCH_SIZE
        if fstride <= 0 or tstride <= 0:
            raise ValueError('fstride and tstride must be positive, got {} and {}'.format(fstride, tstride))
        if input_fdim < kernel or input_tdim < kernel:
            raise ValueError('input_fdim and input_tdim must be at least {}, got {} and {}'.format(kernel, input_fdim, input_tdim))
        f_dim = (input_fdim - kernel) // fstride + 1
        t_dim = (input_tdim - kernel) // tstride + 1
        return f_dim, t_dim

    @autocast()
    def forward(self, x):
        """
        :param x: the input spectrogram, expected shape: (batch_size, time_frame_num, frequency_bins), e.g., (12, 1024, 128)
        :return: per-patch outputs of shape (batch_size, num_patches, output_dim)
        """
        # (batch, time, freq) -> (batch, 1, time, freq) -> (batch, 1, freq, time)
        x = x.unsqueeze(1)
        x = x.transpose(2, 3)

        x = self.v.patch_embed(x)
        # no cls/distillation tokens are prepended, so pos_embed lines up with the patches one to one
        x = x + self.v.pos_embed
        x = self.v.pos_drop(x)
        for blk in self.v.blocks:
            x = blk(x)
        x = self.v.norm(x)

        # the head is applied to every patch; there is no pooling over patches
        x = self.mlp_head(x)
        return x

if __name__ == '__main__':
    # Smoke test: run the spectrogram tensor `input` prepared by the earlier cells
    # through the model. `input` is laid out as (batch, time frames, frequency bins).

    # Size the model from the data so it always matches the tensor it is given.
    input_tdim = input.shape[1]
    ast_mdl = ASTModel(input_fdim=input.shape[2], input_tdim=input_tdim)

    # Inference only: switch to eval mode and skip building the autograd graph.
    ast_mdl.eval()
    with torch.no_grad():
        test_output = ast_mdl(input)

    # The model returns one 128-dim vector per patch, i.e. shape
    # (batch, number of patches, 128), e.g. [1, 600, 128] for a (1, 512, 128) input.
    print(test_output.shape)

---------------AST Model Summary---------------
ImageNet pretraining: True, AudioSet pretraining: False
frequncey stride=10, time stride=10
number of patches=600
torch.Size([1, 600, 128])
