# Inizializzazione

In [None]:
!pip install transformers

In [9]:
!pip install datasets




[notice] A new release of pip is available: 24.0 -> 24.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [10]:
!pip install librosa




[notice] A new release of pip is available: 24.0 -> 24.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
!pip install soundfile




[notice] A new release of pip is available: 24.0 -> 24.1
[notice] To update, run: python.exe -m pip install --upgrade pip


# Import AST Pretrained and test

## Import dataset huggingface

In [None]:
from transformers import AutoFeatureExtractor, ASTForAudioClassification
from datasets import load_dataset
import torch

# Demo LibriSpeech validation split, ordered by "id" so that dataset[0] is always the same clip.
dataset = load_dataset(
    "hf-internal-testing/librispeech_asr_demo",
    "clean",
    split="validation",
    trust_remote_code=True,
).sort("id")
# Sampling rate declared by the dataset's "audio" feature; reused by every feature-extractor call below.
sampling_rate = dataset.features["audio"].sampling_rate

## Import AST huggingface

In [123]:
# Feature extractor published with the AudioSet-finetuned AST checkpoint.
# It is called on raw waveforms below; its "input_values" entry is what the models consume.
feature_extractor = AutoFeatureExtractor.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

In [124]:
# Pretrained AST with its audio-classification head, loaded from the same checkpoint as the feature extractor.
ast_huggingface = ASTForAudioClassification.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
# Display the module tree as the cell output.
ast_huggingface

ASTForAudioClassification(
  (audio_spectrogram_transformer): ASTModel(
    (embeddings): ASTEmbeddings(
      (patch_embeddings): ASTPatchEmbeddings(
        (projection): Conv2d(1, 768, kernel_size=(16, 16), stride=(10, 10))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): ASTEncoder(
      (layer): ModuleList(
        (0-11): 12 x ASTLayer(
          (attention): ASTSdpaAttention(
            (attention): ASTSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): ASTSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): ASTIntermediate(
       

## Test pretrained model

In [125]:
# audio file is decoded on the fly; build the model inputs from the first demo clip
inputs = feature_extractor(dataset[0]["audio"]["array"], sampling_rate=sampling_rate, return_tensors="pt")

# Inference only: the prediction does not need gradients
with torch.no_grad():
    logits = ast_huggingface(**inputs).logits

# Highest-scoring class index, mapped to its name through the checkpoint config
predicted_class_ids = torch.argmax(logits, dim=-1).item()
predicted_label = ast_huggingface.config.id2label[predicted_class_ids]
print(predicted_label)

# compute loss against class id 0: its name is read from id2label and mapped back through label2id
target_label = ast_huggingface.config.id2label[0]
inputs["labels"] = torch.tensor([ast_huggingface.config.label2id[target_label]])
# NOTE(review): this second forward pass runs outside torch.no_grad()
loss = ast_huggingface(**inputs).loss
round(loss.item(), 2)

Speech


0.17

# Prompt Tuning

## Retrieve Output size

In [126]:
from transformers import ASTModel
import torch

# Bare AST encoder (no classification head), used below to inspect the shape of the hidden states.
ast_model = ASTModel.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")
# Display the module tree as the cell output.
ast_model

ASTModel(
  (embeddings): ASTEmbeddings(
    (patch_embeddings): ASTPatchEmbeddings(
      (projection): Conv2d(1, 768, kernel_size=(16, 16), stride=(10, 10))
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder): ASTEncoder(
    (layer): ModuleList(
      (0-11): 12 x ASTLayer(
        (attention): ASTSdpaAttention(
          (attention): ASTSdpaSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
          (output): ASTSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
        )
        (intermediate): ASTIntermediate(
          (dense): Linear(in_features=768, out_features=3072, bias=True)
          (intermediate_act_fn): GELUAct

In [127]:
# audio file is decoded on the fly
inputs = feature_extractor(dataset[0]["audio"]["array"], sampling_rate=sampling_rate, return_tensors="pt")
# Inference only: run the bare encoder without tracking gradients
with torch.no_grad():
    outputs = ast_model(**inputs)

# Shape of the final hidden states, shown as the cell output.
last_hidden_states = outputs.last_hidden_state
list(last_hidden_states.shape)

[1, 1214, 768]

## Model and testing

In [107]:
from functools import reduce
from operator import mul
import math
import torch
import torch.nn as nn

class AST_PromptTuning(nn.Module):
    """Frozen pretrained AST encoder with a trainable head and optional learnable prompt tokens.

    The encoder weights are loaded from "MIT/ast-finetuned-audioset-10-10-0.4593" and frozen
    (``requires_grad = False``). Only the classifier and, when enabled, the prompt embeddings
    are trainable.

    Args:
        prompt_tokens: number of learnable prompt tokens inserted into the token sequence.
        prompt_dropout: dropout probability applied to the prompt tokens each time they are inserted.
        prompt_type: ``None`` (the Python object, not the string "none") trains only the head;
            ``'deep'`` uses a separate set of prompts for every encoder layer; any other value
            (e.g. ``'shallow'``) inserts prompts once, before the first encoder layer.
    """

    def __init__(self, prompt_tokens: int = 5, prompt_dropout: float = 0.0, prompt_type: str = 'deep'):
        super().__init__()

        # Pretrained AST backbone
        self.encoder = ASTModel.from_pretrained("MIT/ast-finetuned-audioset-10-10-0.4593")

        # Classification head: hidden_size -> 384 -> 15 output classes.
        # NOTE(review): there is no non-linearity between the two Linear layers.
        self.classifier = nn.Sequential(
            nn.Linear(self.encoder.config.hidden_size, 384),
            nn.Linear(384, 15)
        )

        # Freeze every backbone parameter
        for p in self.encoder.parameters():
            p.requires_grad = False

        self.prompt_type = prompt_type  # 'shallow', 'deep' or None

        if prompt_type is not None:
            self.prompt_tokens = prompt_tokens  # number of prompt tokens
            self.prompt_dropout = nn.Dropout(prompt_dropout)
            self.prompt_dim = self.encoder.config.hidden_size

            # Xavier-uniform-style bound for the random prompt initialisation.
            # NOTE(review): the factor 3 presumably mirrors an RGB-channel term of the recipe this
            # was adapted from; AST's patch projection has a single input channel -- confirm.
            patch_size = self.encoder.config.patch_size
            val = math.sqrt(6. / float(3 * reduce(mul, (patch_size, patch_size), 1) + self.prompt_dim))

            # Learnable prompts inserted before the first encoder layer: (1, prompt_tokens, prompt_dim)
            self.prompt_embeddings = nn.Parameter(torch.zeros(1, self.prompt_tokens, self.prompt_dim))
            nn.init.uniform_(self.prompt_embeddings.data, -val, val)

            if self.prompt_type == 'deep':
                self.total_d_layer = self.encoder.config.num_hidden_layers
                # One prompt set per remaining layer; the first layer already receives prompt_embeddings
                self.deep_prompt_embeddings = nn.Parameter(
                    torch.zeros(self.total_d_layer - 1, self.prompt_tokens, self.prompt_dim)
                )
                nn.init.uniform_(self.deep_prompt_embeddings.data, -val, val)

    def train(self, mode=True):
        """Switch train/eval mode while always keeping the frozen encoder in eval mode.

        Follows the ``nn.Module.train`` contract: the ``training`` flag is updated on this module
        and its children, and ``self`` is returned so that ``model.train()`` / ``model.eval()``
        can be chained.
        """
        super().train(mode)
        if mode:
            # The backbone is frozen, so it stays in eval mode even while training
            self.encoder.eval()
        return self

    def incorporate_prompt(self, x, prompt_embeddings, n_prompt: int = 0):
        """Place prompt tokens right after the first token of ``x``.

        Args:
            x: token sequence of shape (batch, n_tokens, hidden_dim).
            prompt_embeddings: prompts that can be expanded to (batch, n_new, hidden_dim).
            n_prompt: number of tokens following the first one to drop before inserting the
                new prompts; 0 inserts, ``self.prompt_tokens`` replaces the previous prompts.

        Returns:
            The concatenation: first token, (dropout-ed) prompts, remaining tokens.
        """
        B = x.shape[0]

        x = torch.cat((
            x[:, :1, :],
            self.prompt_dropout(prompt_embeddings.expand(B, -1, -1)),
            x[:, (1 + n_prompt):, :]
        ), dim=1)

        return x

    def forward_features(self, x):
        """Run the prompted encoder and return the layer-normalised hidden states."""
        # Patch/position embeddings of the backbone
        x = self.encoder.embeddings(x)

        # Insert the first-layer prompts
        x = self.incorporate_prompt(x, self.prompt_embeddings)

        if self.prompt_type == 'deep':
            # Deep mode: run the layers one by one, swapping in a fresh prompt set before each.
            # The layers come from this module's own encoder, not from a notebook-level variable.
            layers = self.encoder.encoder.layer
            x = layers[0](x)[0]
            for i in range(1, self.total_d_layer):
                x = self.incorporate_prompt(x, self.deep_prompt_embeddings[i - 1], self.prompt_tokens)
                x = layers[i](x)[0]
        else:
            # Shallow mode: prompts are inserted once, then the whole stack runs as usual
            x = self.encoder.encoder(x)["last_hidden_state"]

        x = self.encoder.layernorm(x)
        return x

    def forward(self, x):
        """Classify ``x`` from the hidden state of the first token; returns the classifier output."""
        if self.prompt_type is not None:
            x = self.forward_features(x)[:, 0, :]
        else:
            # Head-only tuning: plain encoder pass, take the first (classification) token
            x = self.encoder(x)["last_hidden_state"][:, 0, :]

        x = self.classifier(x)
        return x

In [108]:
def _count_parameters(module, trainable_only=False):
    """Number of scalar parameters in `module`, optionally only those with requires_grad set."""
    return sum(p.numel() for p in module.parameters() if p.requires_grad or not trainable_only)

# Head-only baseline: total size first, then the part that actually trains
ast_prompt = AST_PromptTuning(prompt_type=None)
print("AST params:", _count_parameters(ast_prompt))
print("Head fine-tuning:", _count_parameters(ast_prompt, trainable_only=True))

# Prompts inserted once, before the first encoder layer
ast_prompt_shallow = AST_PromptTuning(prompt_type='shallow')
print("Shallow prompt-tuning:", _count_parameters(ast_prompt_shallow, trainable_only=True))

# A separate prompt set for every encoder layer
ast_prompt_deep = AST_PromptTuning(prompt_type='deep')
print("Deep prompt-tuning:", _count_parameters(ast_prompt_deep, trainable_only=True))

AST params: 86488335
Head fine-tuning: 301071
Shallow prompt-tuning: 304911
Deep prompt-tuning: 347151


In [129]:
# audio file is decoded on the fly
inputs = feature_extractor(dataset[0]["audio"]["array"], sampling_rate=sampling_rate, return_tensors="pt")

# Forward pass through the head-only model (ast_prompt was built with prompt_type=None).
# Its classifier is freshly initialised, so this only checks that the pipeline runs.
with torch.no_grad():
    outputs = ast_prompt(inputs['input_values'])

# Index of the highest-scoring of the classifier's 15 outputs, shown as the cell output.
predicted_class_ids = torch.argmax(outputs, dim=-1).item()
predicted_class_ids

14

In [170]:
import torch.nn.functional as F

# Normalise the classifier outputs along the class dimension and display the result.
softmax = F.softmax(outputs, dim=1)
softmax

tensor([[0.0868, 0.0701, 0.0472, 0.0732, 0.0544, 0.0472, 0.0497, 0.0480, 0.0695,
         0.0493, 0.0593, 0.0988, 0.1038, 0.0229, 0.1197]])

# Implementation

## Utilities

In [304]:
import os
import librosa

def load_audio(audio_path, sr=16000):
    """Load an audio file with librosa.

    Args:
        audio_path: path of the audio file to read.
        sr: sampling rate passed to ``librosa.load``; defaults to 16000, the value this
            function previously hard-coded.

    Returns:
        The ``(audio, sample_rate)`` pair returned by ``librosa.load``.
    """
    audio, sample_rate = librosa.load(audio_path, sr=sr)

    return audio, sample_rate

## TUT17 Dataset

In [319]:
from torch.utils.data import Dataset
import random

# from folder to PyTorch Dataset
class TUT17(Dataset):
    """Train/val/test view over a TUT17 folder.

    Expected layout under ``root_dir``:
        audio/                 one audio file per example
        labels/evaluate.txt    tab-separated lines ``audio/<file name>\\t<label>``

    Args:
        root_dir: dataset root folder.
        split: ``'train'``, ``'val'`` or ``'test'``.
        seed: seed of the shuffle; the same seed must be used for all three splits so that
            they do not overlap.
        val_frac: fraction of the files assigned to the validation split.
        test_frac: fraction of the files assigned to the test split.

    Raises:
        ValueError: if ``split`` is not one of the three accepted names.
    """

    def __init__(self, root_dir, split = 'train', seed = 42, val_frac= 0.1, test_frac= 0.1):
        super().__init__()

        if split not in ('train', 'val', 'test'):
            raise ValueError('Invalid split value.')

        self.root_dir = root_dir

        # Sorted first because os.listdir order is arbitrary; shuffled with a private RNG so that
        # every instantiation yields the same order without reseeding the global `random` module.
        audio_names = sorted(os.listdir(os.path.join(root_dir, 'audio')))
        random.Random(seed).shuffle(audio_names)

        num_val = int(len(audio_names) * val_frac)
        num_test = int(len(audio_names) * test_frac)
        num_train = len(audio_names) - num_val - num_test

        # Only file names are kept here; audio is loaded lazily in __getitem__
        if split == 'train':
            self.data = audio_names[:num_train]
        elif split == 'val':
            self.data = audio_names[num_train:num_train + num_val]
        else:
            # Explicit start index: a negative-slice form would return everything when num_test == 0
            self.data = audio_names[num_train + num_val:]

        # Parse the label file once instead of rescanning it for every item
        self.labels = self._read_labels(os.path.join(root_dir, 'labels', 'evaluate.txt'))

    @staticmethod
    def _read_labels(label_path):
        """Return {first column: second column} for every tab-separated line of the label file."""
        labels = {}
        with open(label_path, "r") as f:
            for line in f:
                fields = line.rstrip('\r\n').split('\t')
                if len(fields) < 2:
                    continue  # blank or malformed line
                # Keep the first occurrence of a key
                labels.setdefault(fields[0], fields[1])
        return labels

    def __len__(self):
        """Number of audio files in this split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load one example.

        Returns:
            dict with 'audio' and 'sample_rate' (as returned by ``load_audio``) and 'label'
            (the class name read from the label file).

        Raises:
            KeyError: if the file has no entry in the label file.
        """
        file_name = self.data[idx]
        # Label-file keys use a forward slash regardless of the OS path separator
        audio_name = 'audio/' + file_name
        if audio_name not in self.labels:
            raise KeyError(f'No label found for {audio_name}')

        audio, sample_rate = load_audio(os.path.join(self.root_dir, 'audio', file_name))
        return {'audio': audio, 'sample_rate': sample_rate, 'label': self.labels[audio_name]}

In [321]:
# All three splits share the same root and the default seed, so they are cut from an identical shuffle.
TUT17_ROOT = "c:/Users/cerru/Desktop/TUT17"
train_dataset, val_dataset, test_dataset = (
    TUT17(root_dir=TUT17_ROOT, split=split_name) for split_name in ('train', 'val', 'test')
)

In [326]:
from torch.utils.data import DataLoader

# Define loaders
# NOTE(review): drop_last=True also discards the final partial batch of the val/test splits.
train_loader = DataLoader(train_dataset, batch_size=4, num_workers=0, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_dataset,   batch_size=4, num_workers=0, shuffle=False, drop_last=True)
test_loader  = DataLoader(test_dataset,  batch_size=4, num_workers=0, shuffle=False, drop_last=True)

# Split name -> loader mapping read by train(), which iterates loaders[split]
loaders = {"train": train_loader, "val": val_loader, "test": test_loader}

# Peek at one collated batch to check its structure
batch = next(iter(train_loader))
print(batch)

{'audio': tensor([[ 7.3242e-03,  1.1750e-02,  1.0064e-02,  ..., -2.6589e-03,
         -3.2216e-03,  0.0000e+00],
        [-1.6393e-04, -2.6656e-04, -1.3281e-04,  ...,  4.0121e-04,
         -8.6546e-06,  0.0000e+00],
        [-2.5062e-04, -9.2742e-04, -1.6309e-03,  ..., -5.0162e-03,
         -5.3743e-03,  0.0000e+00],
        [ 8.9035e-04,  1.2040e-03,  1.9683e-03,  ...,  1.0775e-03,
          3.8713e-04,  0.0000e+00]]), 'sample_rate': tensor([16000, 16000, 16000, 16000]), 'label': ['train', 'residential_area', 'residential_area', 'beach']}


## Train

In [None]:
def train(model, feature_extractor, criterion, epochs, dev, lr=0.001, load_checkpoint = False, save_every = 10, save_path = 'weights', label2id = None):
    """Train `model`, evaluating on the val and test splits after every epoch.

    Reads two names from the enclosing (notebook) namespace: `loaders`, a mapping with the
    keys "train", "val" and "test", and `weights`, which is forwarded to `criterion`.
    `plt` (matplotlib.pyplot) must also be in scope for the final plots.

    Args:
        model: module called with the feature extractor's "input_values" tensor.
        feature_extractor: callable invoked as
            ``feature_extractor(list_of_waveforms, sampling_rate=int, return_tensors="pt")``.
        criterion: called as ``criterion(output, target, weight=weights)``.
            NOTE(review): this assumes a functional loss such as ``F.cross_entropy`` -- confirm.
        epochs: number of epochs to run.
        dev: device the model and the batches are moved to.
        lr: Adam learning rate.
        load_checkpoint: when True, restore 'weights.pt' / 'optim.pt' from `save_path` if present.
        save_every: a checkpoint is written after the train split of every epoch whose index is
            a multiple of this value.
        save_path: checkpoint folder, created if missing.
        label2id: mapping from label name to class index; required only when the batches carry
            labels that are not tensors (e.g. a list of class-name strings).

    Returns:
        ``(history_loss, history_accuracy)``: dicts mapping each split name to its per-epoch
        values. On KeyboardInterrupt the histories collected so far are returned.
    """
    splits = ("train", "val", "test")

    # Created before the try block so the plotting code in `finally` always has data to read,
    # even when the setup below fails.
    history_loss = {split: [] for split in splits}
    history_accuracy = {split: [] for split in splits}

    try:
        os.makedirs(save_path, exist_ok=True)

        # Move model to the target device
        model = model.to(dev)

        # torch.optim is reachable through the torch import; no separate `optim` name is needed
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)

        # Optionally resume from a previous run
        if load_checkpoint:
            weights_file = os.path.join(save_path, 'weights.pt')
            optim_file = os.path.join(save_path, 'optim.pt')
            if os.path.isfile(weights_file):
                print('Loading weights...')
                model.load_state_dict(torch.load(weights_file, map_location=dev))
            if os.path.isfile(optim_file):
                print('Loading optimizer...')
                optimizer.load_state_dict(torch.load(optim_file, map_location=dev))
            print('Loading completed!')

        for epoch in range(epochs):
            sum_loss = {split: 0.0 for split in splits}
            sum_accuracy = {split: 0.0 for split in splits}

            for split in splits:
                is_train = split == 'train'
                if is_train:
                    model.train()
                else:
                    model.eval()

                # Context manager: the previous grad mode is restored when the block exits,
                # so gradients are not left globally disabled after the last split.
                with torch.set_grad_enabled(is_train):
                    for batch in loaders[split]:
                        # The feature extractor is given CPU arrays and an integer sampling rate.
                        # The rate of the first item is used for the whole batch.
                        waveforms = batch['audio']
                        if torch.is_tensor(waveforms):
                            waveforms = waveforms.detach().cpu().numpy()
                        sampling_rate = int(batch['sample_rate'][0])

                        # Labels: tensors are flattened to (batch,); anything else is mapped
                        # to class indices through label2id.
                        labels = batch['label']
                        if torch.is_tensor(labels):
                            target = labels.reshape(-1).long().to(dev)
                        else:
                            if label2id is None:
                                raise ValueError('label2id is required when batch labels are not tensors.')
                            target = torch.tensor([label2id[name] for name in labels], dtype=torch.long, device=dev)

                        # Reset gradients
                        optimizer.zero_grad()

                        # Compute output: the model consumes the "input_values" tensor only
                        features = feature_extractor(list(waveforms), sampling_rate=sampling_rate, return_tensors="pt")
                        ast_input = features['input_values'].to(dev)
                        output = model(ast_input)

                        # Compute loss
                        loss = criterion(output, target, weight=weights)
                        sum_loss[split] += loss.item()

                        # Parameters are updated on the train split only
                        if is_train:
                            loss.backward()
                            optimizer.step()

                        # Batch accuracy
                        pred = torch.argmax(output, 1)
                        sum_accuracy[split] += (pred == target).sum().item() / target.numel()

                # Checkpoint
                if epoch % save_every == 0 and is_train:
                    torch.save(model.state_dict(), os.path.join(save_path, 'weights.pt'))
                    torch.save(optimizer.state_dict(), os.path.join(save_path, 'optim.pt'))

            # Epoch averages over the number of batches (guarded against empty loaders)
            epoch_loss = {split: sum_loss[split] / max(len(loaders[split]), 1) for split in splits}
            epoch_accuracy = {split: sum_accuracy[split] / max(len(loaders[split]), 1) for split in splits}
            for split in splits:
                history_loss[split].append(epoch_loss[split])
                history_accuracy[split].append(epoch_accuracy[split])

            print(f"Epoch {epoch+1}:",
                  f"TrL={epoch_loss['train']:.4f},",
                  f"TrA={epoch_accuracy['train']:.4f},",
                  f"VL={epoch_loss['val']:.4f},",
                  f"VA={epoch_accuracy['val']:.4f},",
                  f"TeL={epoch_loss['test']:.4f},",
                  f"TeA={epoch_accuracy['test']:.4f},")
    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        # Plot loss
        plt.title("Loss")
        for split in splits:
            plt.plot(history_loss[split], label=split)
        plt.legend()
        plt.show()
        # Plot accuracy
        plt.title("Accuracy")
        for split in splits:
            plt.plot(history_accuracy[split], label=split)
        plt.legend()
        plt.show()

    return history_loss, history_accuracy

In [327]:
def conta_istanze_classi(path):
    """Count how many lines of a label file belong to each class.

    Each non-blank line must hold exactly two whitespace-separated fields: the audio name and
    the class name. Blank lines are skipped.

    Args:
        path: path of the label file.

    Returns:
        dict mapping each class name to its number of occurrences, in order of first appearance.

    Raises:
        ValueError: if a non-blank line does not contain exactly two fields.
    """
    class_count = {}

    with open(path, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                continue  # blank line, e.g. a trailing newline at the end of the file

            nome_audio, nome_classe = fields
            class_count[nome_classe] = class_count.get(nome_classe, 0) + 1

    return class_count

# Print the per-class counts of the label file to check class balance.
print(conta_istanze_classi("c:/Users/cerru/Desktop/TUT17/labels/evaluate.txt"))

{'bus': 108, 'residential_area': 108, 'car': 108, 'grocery_store': 108, 'train': 108, 'forest_path': 108, 'park': 108, 'library': 108, 'cafe/restaurant': 108, 'tram': 108, 'city_center': 108, 'office': 108, 'beach': 108, 'home': 108, 'metro_station': 108}
