Libraries required for developing the project

In [2]:
!pip install laion-clap
!pip install transformers==4.30.2
!pip install soundfile
!pip install librosa
!pip install torchlibrosa
!pip install ftfy
!pip install braceexpand
!pip install webdataset
!pip install wget
!pip install wandb
!pip install llvmlite
!pip install scipy
!pip install scikit-learn
!pip install pandas
!pip install h5py
!pip install tqdm
!pip install regex
!pip install torch



In [None]:
import laion_clap
import torch
from huggingface_hub import hf_hub_download
import librosa
import os
import numpy as np

# Build the CLAP wrapper without feature fusion, using the HTSAT-base audio encoder.
model = laion_clap.CLAP_Module(enable_fusion=False, amodel= 'HTSAT-base')
# Fetch the pretrained checkpoint from the Hugging Face Hub repo "lukewys/laion_clap".
# NOTE: despite its name, `dataset_path` is the value handed to load_ckpt below,
# i.e. the checkpoint file, not a dataset.
dataset_path = hf_hub_download(repo_id="lukewys/laion_clap", filename="music_speech_audioset_epoch_15_esc_89.98.pt")
# Load the checkpoint weights into the CLAP module.
model.load_ckpt(dataset_path)
# quantization
def int16_to_float32(x):
    """Rescale int16-range samples to float32 by dividing by 32767."""
    scaled = x / 32767.0
    return scaled.astype(np.float32)


def float32_to_int16(x):
    """Clip samples to [-1, 1], scale by 32767 and cast to int16."""
    clipped = np.clip(x, a_min=-1., a_max=1.)
    scaled = clipped * 32767.
    return scaled.astype(np.int16)

def get_text_embed(batch):
    """Return the detached CLAP text embeddings for a list of strings.

    Args:
        batch: list of text strings to embed.

    Returns:
        The tensor produced by ``model.model.get_text_embedding``, detached
        from the autograd graph. For a single-element batch only the first
        row is returned, with a leading batch dimension of size one.
    """
    double_batch = False
    if len(batch) == 1:
        # A one-element batch is duplicated before tokenisation and the copy
        # is dropped again below (presumably the text branch does not accept
        # a batch of size one -- TODO confirm against laion_clap).
        batch = batch * 2
        double_batch = True

    # Tokenise and embed for EVERY batch size; previously these two lines
    # only ran in the single-element branch, leaving `embed` undefined
    # for any batch with more than one text.
    text_data = model.tokenizer(batch)
    embed = model.model.get_text_embedding(text_data)

    if double_batch:
        embed = embed[0].unsqueeze(0)

    return embed.detach()

#text_data = ['pigeons are cooing in the background']
#text_input = get_text_embed(text_data)
#print(text_input.shape)
## Get audio embeddings from audio data
#
## Get text embedings from texts:
#text_data = ["I love the contrastive learning","I love the pretrain model"]
#text_embed = model.get_text_embedding(text_data)
#print(text_embed)
#print(text_embed.shape)
#audio_file = [
#    './audio1.mp3']
#audio_embed = model.get_audio_embedding_from_filelist(x = audio_file, use_tensor=True)
#print(audio_embed.shape)

# New section

In [None]:
!pip install pytube
!pip install pydub

In [None]:
from  pytube import YouTube
import os
from pydub import AudioSegment
import csv
import random
import math
import torch
import torchaudio

def cut_audio(input_file, output_file, start_time, end_time):
    """Resample an audio file to 32 kHz and export the [start_time:end_time] slice as mp3.

    Args:
        input_file: path of the audio file to read.
        output_file: path the trimmed mp3 is written to.
        start_time: slice start, used directly as the AudioSegment slice index.
        end_time: slice end, used directly as the AudioSegment slice index.
    """
    segment = AudioSegment.from_file(input_file).set_frame_rate(32000)
    trimmed = segment[start_time:end_time]
    trimmed.export(output_file, format="mp3")

def get_mixture_audio(audio1,audio2):
    """Mix two audio files after scaling the second to the energy of the first.

    Args:
        audio1: path of the first source; it is added to the mix unscaled.
        audio2: path of the second source; it is scaled by
            ``sqrt(E1 / E2)`` where ``E`` is the squared L2 norm of a waveform.

    Returns:
        The tensor ``waveform_s1 + alpha * waveform_s2``.

    NOTE(review): the two waveforms are added element-wise, so they must be
    broadcast-compatible; the sample rates returned by ``torchaudio.load``
    are not compared -- callers are expected to pass clips with matching
    rate and length.
    """
    waveform_s1, _ = torchaudio.load(audio1)
    waveform_s2, _ = torchaudio.load(audio2)

    E1 = torch.square(torch.norm(waveform_s1, p=2))
    E2 = torch.square(torch.norm(waveform_s2, p=2))

    if E2 > 0:
        alpha = torch.sqrt(E1 / E2)
    else:
        # A silent second clip would give E1/E2 = inf (or NaN when both are
        # silent) and turn the whole mixture into NaN; it contributes
        # nothing to the mix, so use a zero gain instead.
        alpha = torch.zeros_like(E1)

    return waveform_s1 + alpha * waveform_s2
# Names of the entries present in ./download at the moment this cell runs
# (a one-off snapshot: it is not refreshed when new files are written).
list_download = os.listdir("./download")
def get_audio_clip(video_id, start, end, download=True):
    """Return the path of the cached clip for a YouTube video, downloading it if allowed.

    Args:
        video_id: YouTube video id; the clip is cached as ``./download/<video_id>.mp3``.
        start: clip start in seconds (multiplied by 1000 before being passed to ``cut_audio``).
        end: clip end in seconds (multiplied by 1000 before being passed to ``cut_audio``).
        download: when True a missing clip is fetched from YouTube and trimmed;
            when False only the local cache is consulted.

    Returns:
        ``./download/<video_id>.mp3``, or ``""`` when ``download`` is False and
        the clip is not cached.
    """
    path_dest = f"./download/{video_id}.mp3"
    # Look at the filesystem on every call instead of a listing captured once
    # at import time, so clips fetched earlier in the session are not
    # downloaded again.
    already_present = os.path.exists(path_dest)

    if not download:
        if not already_present:
            return ""
        print("clip " + video_id + " already downloaded")
        return path_dest

    if not already_present:
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        selected_video = YouTube(video_url)
        audio = selected_video.streams.filter(only_audio=True).first()
        downloaded_path = audio.download("./download", filename=f"{video_id}.mp3")
        # Trim in place to the requested interval.
        cut_audio(downloaded_path, downloaded_path, start * 1000, end * 1000)
        print("downloaded " + video_id)

    return path_dest



def download_all_dataset():
    """Download and trim every clip listed in ``new_balanced.csv``.

    Each row is read as ``video_id, start, end, ...``. Rows that cannot be
    parsed and clips that fail to download are reported and skipped so one
    bad entry does not abort the whole run.
    """
    with open("./drive/MyDrive/Neural-Networks/new_balanced.csv", mode='r', newline='') as file:
        for row in csv.reader(file):
            try:
                video_id = row[0]
                start = float(row[1])
                end = float(row[2])
            except (IndexError, ValueError):
                # Short or non-numeric row (e.g. a header line): nothing to download.
                print(f"skipping malformed row: {row!r}")
                continue

            try:
                get_audio_clip(video_id, start, end)
            except Exception as error:
                # Best effort: keep going, but say why. `Exception` (rather
                # than a bare except) lets Ctrl-C stop a long download run.
                print(f"skipping {video_id}: {error!r}")




# get a random row from the file "new_balanced.csv"

def get_random_pair(file_name):
    """Return two distinct rows picked at random from a CSV file.

    Args:
        file_name: path of the CSV file to read.

    Returns:
        A list with two rows, each a list of column strings.
    """
    with open(file_name, 'r') as handle:
        rows = [row for row in csv.reader(handle)]
    return random.sample(rows, 2)




def get_training_element(downloaded=True):
    """Build one training example: a two-source mixture and a text query.

    Args:
        downloaded: when False, two random rows are read from
            ``new_balanced.csv``; when True, ``get_random_files`` is called on
            the project's download directory instead.

    Returns:
        A tuple ``(mixed, query)`` where ``mixed`` is the result of
        ``get_mixture_audio`` on the two 5-second clips and ``query`` is the
        last element of the first selected entry.

    NOTE(review): ``get_random_files`` returns file names, so in the default
    ``downloaded=True`` branch the ``[0]``/``[1]``/``[2]`` indexing below
    operates on the characters of a file name and ``float(...)`` will most
    likely raise -- confirm the intended input for this branch.
    NOTE(review): the 5-second clips are exported to the same paths returned
    by ``get_audio_clip``, i.e. they overwrite the cached downloads.
    """
    # we get a random pair of audios from the file
    if not downloaded:
      audios = get_random_pair("./drive/MyDrive/Neural-Networks/new_balanced.csv")
    else:
      audios = get_random_files("/content/Neural-Networks-Mastrandrea-Frangella/download")

    # split the metadata of the two audios
    audio1_metadata = audios[0]
    audio2_metadata = audios[1]

    #cast the initial audio time of each track
    start1 = float(audio1_metadata[1])
    start2 = float(audio2_metadata[1])

    #cast the final audio time of each track
    end1 = float(audio1_metadata[2])
    end2 = float(audio2_metadata[2])

    #download the two audio clips, cut them in the defined interval and save

    audio1 = get_audio_clip(audio1_metadata[0],start1,end1)
    audio2 = get_audio_clip(audio2_metadata[0],start2,end2)

    # keep the file paths: audio1/audio2 are rebound to AudioSegment objects below
    path_clip1 = audio1
    path_clip2 = audio2
    #load the downloaded files

    audio1 = AudioSegment.from_file(audio1)
    audio2 = AudioSegment.from_file(audio2)


    duration1 = audio1.duration_seconds
    duration2 = audio2.duration_seconds

    # now we have to sample 5 random seconds from each clip
    # NOTE(review): for a clip shorter than 5 s the upper bound is negative,
    # so the sampled start offset is negative as well.


    start_time1 = random.uniform(0,(duration1-5))
    start_time2 = random.uniform(0,(duration2-5))

    # we cut the two audios in a random sample of 5 second
    # (offsets are in seconds; the slice indices are multiplied by 1000)

    clipped_audio1 = audio1[start_time1*1000:(start_time1+5)*1000]
    clipped_audio2 = audio2[start_time2*1000:(start_time2+5)*1000]

    clipped_audio1.export(path_clip1, format="mp3")
    clipped_audio2.export(path_clip2, format="mp3")

    # we save the two clips and then we combine them

    mixed = get_mixture_audio(path_clip1,path_clip2)
#
    #torchaudio.save("./download/mixed.mp3",mixed,32000)
#
    #out = torch.stft(mixed,n_fft=1024,hop_length=320,return_complex=True)
#
    ##return the text to enter into CLAP
    #
    #
#
    # the query is the last field of the first selected entry
    query = audio1_metadata[-1]
#
    #print(query)
#
    #query = query.replace("[","")
    #query = query.replace("]","")
    #query = query.replace(",","")
    #query = query.replace("'","")
    #
    #
    #print(query)
    #
#
    #magnitude_spectrogram = torch.abs(out)
    #phase_spectrogram = torch.angle(out)

    #query = [query]

    return (mixed,query)




def sure_training_item(max_attempts=None):
    """Call ``get_training_element`` until it succeeds and return its result.

    Args:
        max_attempts: optional upper bound on the number of tries. ``None``
            (the default) retries indefinitely, as before.

    Returns:
        Whatever ``get_training_element()`` returns on the first successful call.

    Raises:
        RuntimeError: when ``max_attempts`` is set and every attempt failed;
            the last underlying error is chained as the cause.
    """
    attempts = 0
    while True:
        try:
            return get_training_element()
        except Exception as error:
            # `Exception` instead of a bare except, so KeyboardInterrupt can
            # still break out of an otherwise endless retry loop.
            attempts += 1
            if max_attempts is not None and attempts >= max_attempts:
                raise RuntimeError(
                    f"no training element after {attempts} attempts"
                ) from error




()

In [None]:
!pwd

In [None]:
!zip -r /content/download.zip /content/download


In [None]:
def get_input(modality):
    """Build a batch of 10 two-source mixtures together with their queries.

    Twenty random files are drawn from ``./download`` via ``get_batch``; two
    independent samples of ten (``s1`` and ``s2``) are paired up, a random
    5-second excerpt is taken from each file, and each pair is mixed with
    ``get_mixture_audio``.

    Args:
        modality: ``'text'`` always returns the text labels as queries; any
            other value returns, with equal probability, either the text
            labels or the file paths of the ``s1`` clips.

    Returns:
        A tuple ``(mixed, queries)`` where ``mixed`` is a list of 10 mixture
        tensors.

    NOTE(review): labels are only collected for ``s1`` files whose name
    (minus the last four characters) is a key of ``text_dict``, so the label
    list can be shorter than ``mixed`` and lose its alignment -- confirm that
    every downloaded file has a label.
    NOTE(review): ``s1`` and ``s2`` are sampled independently, so a file may
    be paired with itself.
    """
    pair_count = 10
    clip_seconds = 5
    path_clip1 = "./tmp/audio1.mp3"
    path_clip2 = "./tmp/audio2.mp3"
    # The scratch directory must exist before pydub can export into it.
    os.makedirs("./tmp", exist_ok=True)

    batch_audio = get_batch()
    s1 = random.sample(batch_audio, pair_count)
    s2 = random.sample(batch_audio, pair_count)
    values = [text_dict[key[:-4]] for key in s1 if key[:-4] in text_dict]

    mixed = []
    for name1, name2 in zip(s1, s2):
        audio1 = AudioSegment.from_file(f'./download/{name1}').set_frame_rate(32000)
        audio2 = AudioSegment.from_file(f'./download/{name2}').set_frame_rate(32000)

        # Sample a random excerpt start; clamp the upper bound at 0 so a clip
        # shorter than the excerpt length never yields a negative offset.
        start_time1 = random.uniform(0, max(0.0, audio1.duration_seconds - clip_seconds))
        start_time2 = random.uniform(0, max(0.0, audio2.duration_seconds - clip_seconds))

        # AudioSegment slices are indexed in milliseconds.
        clipped_audio1 = audio1[start_time1 * 1000:(start_time1 + clip_seconds) * 1000]
        clipped_audio2 = audio2[start_time2 * 1000:(start_time2 + clip_seconds) * 1000]

        # The excerpts go through temporary files because get_mixture_audio
        # loads its inputs from disk.
        clipped_audio1.export(path_clip1, format="mp3")
        clipped_audio2.export(path_clip2, format="mp3")

        mixed.append(get_mixture_audio(path_clip1, path_clip2))

    if modality == 'text' or random.random() > 0.5:
        return mixed, values
    return mixed, ["./download/" + elem for elem in s1]



def batch():
    """Collect ten training items with ``sure_training_item`` and print them."""
    items = [sure_training_item() for _ in range(10)]
    print(items)


def get_random_files(directory, count=20):
    """Return ``count`` distinct entry names picked at random from ``directory``.

    Args:
        directory: path of the directory to list.
        count: number of names to draw (default 20).

    Returns:
        A list of ``count`` names as returned by ``os.listdir``.
    """
    return random.sample(os.listdir(directory), count)

def get_batch():
    """Return 20 random entry names from the ``./download`` directory."""
    return get_random_files('./download', 20)

In [None]:
# Map each video id (first CSV column) to its label text (fifth column).
text_dict = {}
# Characters removed from the raw label string: brackets, commas and quotes.
_label_strip_table = str.maketrans("", "", "[],'")
with open('./drive/MyDrive/Neural-Networks/new_balanced.csv', mode ='r')as file:
        csvFile = csv.reader(file)
        for lines in csvFile:
            # Drop the first and last character of the field, then strip the
            # list punctuation in a single pass.
            label = lines[4][1:-1].translate(_label_strip_table)
            text_dict[lines[0]]=label


In [None]:
# Build one batch of mixtures with text queries and embed the queries with CLAP.
# NOTE: this rebinds the module-level name `batch`, which is also the name of
# a function defined in an earlier cell.
batch = get_input('text')
# batch[1] is the second element returned by get_input (the query list).
text_embeddings = model.get_text_embedding(batch[1])
print(batch)

In [None]:

pat = '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'
!git clone https://{pat}@github.com/LorenzoFrangella/Neural-Networks-Mastrandrea-Frangella

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def init_bn(bn):
    """Reset a batch-norm layer in place: weight filled with 1, bias with 0."""
    bn.weight.data.fill_(1.0)
    bn.bias.data.fill_(0.0)

def init_layer(layer):
    """Initialise a layer in place: Xavier-uniform weight, zero bias when one exists."""
    nn.init.xavier_uniform_(layer.weight)

    # Layers built with bias=False expose `bias` as None; skip those.
    bias = getattr(layer, "bias", None)
    if bias is not None:
        bias.data.fill_(0.0)

In [None]:
class FilmModule(nn.Module):
    """Additive conditioning layer: projects an embedding to one bias per channel.

    The embedding passes through ``Linear -> ReLU -> Linear -> ReLU`` and the
    result is added to the feature map, broadcast over its last two
    dimensions.

    Args:
        input_size: size of the conditioning embedding.
        output_size: number of bias values produced (one per feature channel).
    """

    def __init__(self,input_size,output_size):
        super(FilmModule, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.linear = nn.Sequential(
            nn.Linear(input_size, output_size * 2),
            nn.ReLU(inplace=True),
            nn.Linear(output_size * 2, output_size),
            nn.ReLU(inplace=True)
        )

    def forward(self,data,embedding_vector):
        """Return ``data`` shifted by the bias computed from ``embedding_vector``.

        Args:
            data: feature map whose third-from-last dimension has
                ``output_size`` entries.
            embedding_vector: tensor whose last dimension is ``input_size``.

        Returns:
            ``data + bias[..., None, None]``.
        """
        # (no debug printing here: forward runs once per layer per step)
        bias = self.linear(embedding_vector)
        # Two trailing singleton axes let the per-channel bias broadcast over
        # the last two dimensions of `data`.
        return data + bias[...,None,None]


# Smoke test: condition a random 32-channel feature map on a random 512-d embedding.
Film1_1 = FilmModule(512,32)

random_embedding = torch.rand(1,512)
# No batch axis here: the (1, 32, 1, 1) bias broadcasts against (32, 513, 501).
random_value = torch.rand(32,513,501)

print(Film1_1(random_value,random_embedding))



In [None]:
class EncoderBlock(nn.Module):
    """Residual encoder block with embedding conditioning and average-pool downsampling.

    Layout: ``BN -> FiLM -> LeakyReLU -> Conv3x3`` applied twice, plus a
    shortcut from the block input (a 1x1 convolution when the channel count
    changes, the identity otherwise), followed by average pooling.

    Args:
        input_channels: channels of the incoming feature map.
        output_channels: channels produced by the block.
        embedding_size: size of the conditioning embedding.
        momentum: momentum passed to the batch-norm layers.
        downsample: kernel size passed to ``F.avg_pool2d``.
    """

    def __init__(self,input_channels, output_channels, embedding_size, momentum,downsample):
        super(EncoderBlock, self).__init__()
        self.downsample = downsample
        self.Film1 = FilmModule(embedding_size,input_channels)
        self.Film2 = FilmModule(embedding_size,output_channels)

        self.bn1 = nn.BatchNorm2d(input_channels,momentum=momentum)

        self.conv1 = nn.Conv2d(
            in_channels=input_channels,
            out_channels=output_channels,
            kernel_size=(3,3),
            stride=(1,1),
            dilation=(1,1),
            padding=(1,1),
            bias=False
        )

        self.bn2 = nn.BatchNorm2d(output_channels,momentum=momentum)

        self.conv2 = nn.Conv2d(
            in_channels=output_channels,
            out_channels=output_channels,
            kernel_size=(3,3),
            stride=(1,1),
            dilation=(1,1),
            padding=(1,1),
            bias=False
        )

        # `has_residual_connection` records whether the shortcut needs a 1x1
        # projection to match the channel count; when it is False the
        # shortcut is the identity.
        if input_channels != output_channels:
            self.residual_convolution = nn.Conv2d(
                in_channels=input_channels,
                out_channels=output_channels,
                kernel_size=(1,1),
                stride=(1,1),
                padding=(0,0),
            )
            self.has_residual_connection = True
        else:
            self.has_residual_connection = False

        self.init_weights()

    def init_weights(self):
        """Initialise batch-norm and convolution parameters."""
        init_bn(self.bn1)
        init_bn(self.bn2)
        init_layer(self.conv1)
        init_layer(self.conv2)

        if self.has_residual_connection:
            init_layer(self.residual_convolution)

    def forward(self,input_tensor,embedding_vector):
        """Run the block.

        Args:
            input_tensor: feature map with ``input_channels`` channels.
            embedding_vector: conditioning embedding passed to both FiLM layers.

        Returns:
            A tuple ``(x, x_pool)``: the full-resolution output (for the skip
            connection to the decoder) and its average-pooled version.
        """
        x = self.bn1(input_tensor)
        x = self.Film1(x,embedding_vector)
        x = F.leaky_relu(x,negative_slope=0.01)
        x = self.conv1(x)

        x = self.bn2(x)
        x = self.Film2(x,embedding_vector)
        x = F.leaky_relu(x,negative_slope=0.01)
        x = self.conv2(x)

        if self.has_residual_connection:
            x = x + self.residual_convolution(input_tensor)
        else:
            # Same channel count: add the input unchanged so the block stays
            # residual instead of silently dropping the skip path.
            x = x + input_tensor

        x_pool = F.avg_pool2d(x,self.downsample)

        return x, x_pool
# Smoke test: 32 -> 64 channels, 512-d embedding, 2x2 average pooling.
Encoder1 = EncoderBlock(32,64,512,0.01,(2, 2))
# NOTE: `input` shadows the Python builtin of the same name for the rest of the session.
input = torch.rand(12,32,513,313)
embedding = torch.rand(1,512)
res = Encoder1(input,embedding)
# res[0] is the full-resolution output, res[1] the pooled one.
print(res[0].shape,res[1].shape)

In [None]:
class DecoderBlock(nn.Module):
    """Decoder block: transposed-conv upsampling, skip concatenation, residual conv stack.

    Layout:
        1. ``BN -> FiLM -> LeakyReLU -> ConvTranspose2d`` upsamples the input
           and maps ``input_size`` to ``output_size`` channels.
        2. The result is concatenated with the encoder skip tensor along the
           channel axis (``2 * output_size`` channels).
        3. Two ``BN -> FiLM -> LeakyReLU -> Conv3x3`` stages reduce it to
           ``output_size`` channels; a 1x1 convolution of the concatenated
           tensor is added as the residual shortcut.

    Args:
        input_size: channels of the incoming (low-resolution) feature map.
        output_size: channels produced by the block.
        embedding_size: size of the conditioning embedding.
        momentum: momentum passed to the batch-norm layers.
        upsample: kernel size and stride of the transposed convolution.
    """

    def __init__(self,input_size, output_size,embedding_size,momentum,upsample):
        super(DecoderBlock, self).__init__()
        self.upsample = upsample

        # --- stage 1: upsampling -------------------------------------------
        self.bn1 = nn.BatchNorm2d(input_size,momentum=momentum)
        self.Film1 = FilmModule(embedding_size,input_size)
        self.conv1 = torch.nn.ConvTranspose2d(
            in_channels=input_size,
            out_channels=output_size,
            kernel_size=self.upsample,
            stride=self.upsample,
            padding=(0,0),
            bias=False,
            dilation=(1,1)
        )

        # --- stage 2: residual conv stack on the concatenated tensor ------
        self.bn2 = nn.BatchNorm2d(output_size*2,momentum=momentum)
        self.Film2 = FilmModule(embedding_size,output_size*2)
        self.conv2 = nn.Conv2d(
            in_channels=output_size*2,
            out_channels=output_size,
            kernel_size=(3,3),
            stride=(1,1),
            dilation=(1,1),
            padding=(1,1),
            bias=False
        )

        self.bn3 = nn.BatchNorm2d(output_size,momentum=momentum)
        self.Film3 = FilmModule(embedding_size,output_size)
        # nn.Conv2d is the module class; `nn.conv2d` does not exist.
        self.conv3 = nn.Conv2d(
            in_channels=output_size,
            out_channels=output_size,
            kernel_size=(3,3),
            stride=(1,1),
            dilation=(1,1),
            padding=(1,1),
            bias=False
        )

        # The shortcut is taken from the concatenated tensor so that it has
        # the same spatial size as the conv-stack output; the block input
        # itself is at the pre-upsampling resolution and cannot be added.
        self.residual_convolution = nn.Conv2d(
            in_channels=output_size*2,
            out_channels=output_size,
            kernel_size=(1,1),
            stride=(1,1),
            padding=(0,0),
        )
        self.has_residual_connection = True

        self.init_weights()

    def init_weights(self):
        """Initialise batch-norm and convolution parameters."""
        init_bn(self.bn1)
        init_bn(self.bn2)
        init_bn(self.bn3)

        init_layer(self.conv1)
        init_layer(self.conv2)
        init_layer(self.conv3)

        if self.has_residual_connection:
            init_layer(self.residual_convolution)

    def forward(self,input_tensor,concat_tensor,embedding_vector):
        """Run the block.

        Args:
            input_tensor: feature map with ``input_size`` channels.
            concat_tensor: encoder skip tensor; it must have ``output_size``
                channels and the spatial size of the upsampled input so the
                channel-wise concatenation is valid.
            embedding_vector: conditioning embedding passed to the FiLM layers.

        Returns:
            Feature map with ``output_size`` channels at the upsampled resolution.
        """
        x = self.bn1(input_tensor)
        x = self.Film1(x,embedding_vector)
        x = F.leaky_relu(x,negative_slope=0.01)
        x = self.conv1(x)

        x = torch.cat((x,concat_tensor), dim=1)
        shortcut = self.residual_convolution(x)

        x = self.bn2(x)
        x = self.Film2(x,embedding_vector)
        x = F.leaky_relu(x,negative_slope=0.01)
        x = self.conv2(x)

        x = self.bn3(x)
        x = self.Film3(x,embedding_vector)
        x = F.leaky_relu(x,negative_slope=0.01)
        x = self.conv3(x)

        return x + shortcut





In [None]:
class ResUnet(nn.Module):

    def __init__(self, input_size, output_size):
        super(ResUnet, self).__init__()

        self.input_size = input_size;
        self.output_size = output_size;

        self.momentum = 0.01


        # instanziare la preconv che è una conv2d

        # definire la classe degli encoder block
        # definire la classe dei decoder block

        self.batch_norm0 = nn.BatchNorm2d(513,momentum=self.momentum)


        self.preconvolution = nn.Conv2d(
            input_channels=input_size,
            kernel_size=(1,1),
            stride=(1,1),
            padding=(0,0),
            bias=True
        )


        self.after_conv = nn.Conv2d(
            in_channels=32,
            out_channels=output_size * 3,
            kernel_size=(1, 1),
            stride=(1, 1),
            padding=(0, 0),
            bias=True,
        )


    def forward(self,input):

