# Instalando o ambiente virtual

In [None]:
#!pip install virtualenv

In [None]:
#import os
#os.environ['PATH'] += os.pathsep + os.path.expanduser('~/.local/bin')


In [None]:
#!~/.local/bin/virtualenv pavf

In [None]:
#!pavf/bin/pip install ipykernel
#!pavf/bin/python -m ipykernel install --user --name=pavf

In [None]:
#!pavf/bin/pip install pandas numpy tqdm torchaudio scikit-learn librosa ipython transformers


# Treinamento

In [None]:
import pandas as pd
import numpy as np

from pathlib import Path
from tqdm import tqdm

import torchaudio
from sklearn.model_selection import train_test_split

import os
import sys

In [None]:
#!pavf/bin/pip install git+https://github.com/huggingface/datasets.git git+https://github.com/huggingface/transformers.git jiwer torchaudio librosa wandb

In [None]:
# %%capture

# !pip install git+https://github.com/huggingface/datasets.git
# !pip install git+https://github.com/huggingface/transformers.git
# !pip install jiwer
# !pip install torchaudio
# !pip install librosa

# Monitor the training process
# !pip install wandb

In [None]:
# Set the locale variables of the kernel process to C.UTF-8.
%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8
# Both Hugging Face cache variables are pointed at the local ./cache folder.
%env TRANSFORMERS_CACHE=./cache
%env HF_DATASETS_CACHE=./cache
# NOTE(review): presumably enabled to get synchronous CUDA error reports while
# debugging; confirm it is still wanted for full training runs.
%env CUDA_LAUNCH_BLOCKING=1

# Download and preprocess data

In [None]:
#!pavf/bin/pip uninstall numpy -y
#!pavf/bin/pip install numpy

In [None]:
#!UNZIP_DISABLE_ZIPBOMB_DETECTION=TRUE unzip -o ./IRMAS-TrainingData.zip

In [None]:
import os
import pandas as pd

# Root folder of the IRMAS training set: one sub-folder per instrument class.
dataset_path = './IRMAS/IRMAS-TrainingData'

# One record per audio clip: {'file_path': ..., 'label': ...}.
data = []

# os.listdir() returns entries in arbitrary order, so both listings are sorted
# to make the row order of `df` reproducible from one run/machine to the next.
for instrument_folder in sorted(os.listdir(dataset_path)):
    instrument_path = os.path.join(dataset_path, instrument_folder)
    if not os.path.isdir(instrument_path):
        continue  # Skip anything that is not a class folder

    # The label is the folder name up to the first '(' (whole name if none).
    instrument_name = instrument_folder.split('(')[0]

    # Collect every .wav file found directly inside the class folder.
    for file_name in sorted(os.listdir(instrument_path)):
        if file_name.endswith('.wav'):
            file_path = os.path.join(instrument_path, file_name)
            data.append({'file_path': file_path, 'label': instrument_name})

# Declaring the columns keeps the frame well-formed (and `df.label` usable)
# even when no .wav file was found, matching how `df_test` is built.
df = pd.DataFrame(data, columns=['file_path', 'label'])

# Preview the first rows
print(df.head())


In [None]:
import os
import pandas as pd

# Folders holding the three parts of the IRMAS testing set.
dataset_paths = [
    './IRMAS/IRMAS-TestingData-Part1/Part1',
    './IRMAS/IRMAS-TestingData-Part2/IRTestingData-Part2',
    './IRMAS/IRMAS-TestingData-Part3/Part3'
]

# One record per annotated clip: {'file_path': ..., 'label': 'a,b,...'}.
data = []

for dataset_path in dataset_paths:
    # Sorted so the row order of `df_test` does not depend on the filesystem.
    for filename in sorted(os.listdir(dataset_path)):
        if filename.endswith('.txt'):
            txt_path = os.path.join(dataset_path, filename)
            # Swap only the trailing extension; str.replace() would also
            # rewrite any '.txt' occurring earlier in the file name.
            wav_filename = filename[:-len('.txt')] + '.wav'
            wav_path = os.path.join(dataset_path, wav_filename)

            # The annotation file lists the labels separated by whitespace.
            # An explicit encoding avoids depending on the platform default.
            with open(txt_path, 'r', encoding='utf-8') as file:
                labels = file.read().split()

            # Store all labels of the clip as one comma-separated string
            label_str = ','.join(labels)
            data.append({'file_path': wav_path, 'label': label_str})

# Create a DataFrame from the collected data
df_test = pd.DataFrame(data, columns=['file_path', 'label'])

In [None]:
#pip install --upgrade pip

In [None]:
#!pavf/bin/pip install numpy==1.23.5 librosa==0.9.2 torchaudio==0.12.1 soxr==0.3.4


In [None]:
import torchaudio
import librosa
import IPython.display as ipd
import numpy as np

# Pick one random row of the training frame.
idx = np.random.randint(0, len(df))
sample = df.iloc[idx]
path, label = sample["file_path"], sample["label"]

# Report which row was drawn and its label (followed by one blank line).
print(f"ID Location: {idx}\n      Label: {label}\n")

# Load the clip, keep its first channel, and resample it to 16 kHz.
waveform, sr = torchaudio.load(path)
first_channel = waveform[0].numpy().squeeze()
speech = librosa.resample(np.asarray(first_channel), orig_sr=sr, target_sr=16000)

# Last expression of the cell: renders an audio player for the clip.
ipd.Audio(data=np.asarray(speech), autoplay=True, rate=16000)

In [None]:
# Distinct labels found in the training frame, as a plain Python list.
label_list = df["label"].unique().tolist()
label_list

Modifica a coluna de caminho do arquivo de áudio a partir do diretório atual

In [None]:
#!pavf/bin/pip install matplotlib

In [None]:
# Histogram of how many training clips carry each label.
df["label"].hist()

In [None]:
#%%capture
#!pip install torchmetrics
#!pip install torch-audiomentations

In [None]:
import os
import random

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import torchaudio

# Do not change these values: they fix the PyTorch and Python `random` seeds.
torch.manual_seed(0)
random.seed(0)

In [None]:
# NOTE(review): presumably the sample rate in Hz; the later cells read the rate
# from the processor (`target_sampling_rate`) instead of this constant — confirm
# whether it is still needed.
TARGET_SAMPLE_RATE = 16000 # TODO

In [None]:
# Show the label and file path of the first training row.
df[["label","file_path"]].iloc[0]


In [None]:
import torch

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")


In [None]:
# Names of the dataset columns holding the audio path and the label string.
input_column, output_column = "file_path", "label"

# Sort in place so that class indices are assigned deterministically.
label_list.sort()
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

In [None]:
#!pip install torchinfo

In [None]:
from transformers import AutoConfig, Wav2Vec2Processor

In [None]:
# Identifier of the pretrained checkpoint passed to the `from_pretrained` calls below.
model_name_or_path = "ALM/wav2vec2-large-audioset"
# Stored on the config and read by the model's pooling step ("mean", "sum" or "max").
pooling_mode = "mean"

In [None]:

# Two-way mapping between class names and their integer ids, following the
# (sorted) order of `label_list`.
label2id = {name: index for index, name in enumerate(label_list)}
id2label = dict(enumerate(label_list))

# Load the checkpoint's configuration and specialise it for this task.
config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    problem_type="multi_label_classification",
    label2id=label2id,
    id2label=id2label,
    finetuning_task="wav2vec2_clf",
)
# Custom attribute read by Wav2Vec2ForSpeechClassification.__init__.
config.pooling_mode = pooling_mode

In [None]:
# Load the processor that ships with the checkpoint; its feature extractor
# dictates the sampling rate that every clip is resampled to later on.
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path, config=config)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

# DATASET

In [None]:
from sklearn.model_selection import train_test_split
# Save the dataframes to tab-separated CSV files
save_path = "./instruments/data/"

# DataFrame.to_csv does not create missing parent folders, so make sure the
# target directory exists first (a no-op when it is already there).
os.makedirs(save_path, exist_ok=True)

df.to_csv(f"{save_path}/train.csv", sep="\t", encoding="utf-8", index=False)
df_test.to_csv(f"{save_path}/test.csv", sep="\t", encoding="utf-8", index=False)

In [None]:
from datasets import load_dataset

# Split name -> tab-separated file written by the export cell.
data_files = dict(
    train=f"{save_path}/train.csv",
    validation=f"{save_path}/test.csv",
)

# Load both files with the generic "csv" builder and unpack the two splits.
dataset = load_dataset("csv", data_files=data_files, delimiter="\t")
train_dataset, eval_dataset = dataset["train"], dataset["validation"]

In [None]:
def speech_file_to_array_fn(path):
    """Load an audio file and return it as a resampled, mono NumPy array.

    Args:
        path: Location of the audio file, as accepted by ``torchaudio.load``.

    Returns:
        The waveform resampled to the module-level ``target_sampling_rate``
        and averaged over its channel dimension (dim 0).
    """
    waveform, source_rate = torchaudio.load(path)
    to_target_rate = torchaudio.transforms.Resample(source_rate, target_sampling_rate)
    # Average the channels after resampling, then drop singleton dimensions.
    mono = to_target_rate(waveform).mean(dim=0)
    return mono.squeeze().numpy()

# Convert a label (or list of labels) into a multi-hot target tensor
def label_to_id(label, label_list):
    """Encode one or more class names as a multi-hot vector.

    Args:
        label: A single class name (str) or an iterable of class names.
        label_list: Ordered list of all known class names; position = class id.

    Returns:
        A ``torch.long`` tensor of length ``len(label_list)`` holding 1 at the
        index of every given name found in ``label_list`` and 0 elsewhere.
        Names that are not in ``label_list`` are ignored.
    """
    names = [label] if isinstance(label, str) else label
    target_tensor = torch.zeros(len(label_list), dtype=torch.long)
    for name in names:
        if name not in label_list:
            continue
        target_tensor[label_list.index(name)] = 1
    return target_tensor

# Preprocessing function
def preprocess_function(examples):
    """Turn a batch of (path, label string) rows into model inputs and targets.

    Args:
        examples: Batch mapping column names to lists; reads the columns named
            by the module-level ``input_column`` and ``output_column``.

    Returns:
        The processor's output for the loaded waveforms, with an extra
        ``"labels"`` entry holding one multi-hot tensor per example.
    """
    speech_list = []
    for audio_path in examples[input_column]:
        speech_list.append(speech_file_to_array_fn(audio_path))

    # Each label cell is a comma-separated string of class names.
    target_list = []
    for label_string in examples[output_column]:
        target_list.append(label_to_id(label_string.split(','), label_list))

    result = processor(speech_list, sampling_rate=target_sampling_rate)
    result["labels"] = target_list
    return result

In [None]:
# Apply the preprocessing function to both splits, 1000 rows per batch
map_options = {"batched": True, "batch_size": 1000}
train_dataset = train_dataset.map(preprocess_function, **map_options)
eval_dataset = eval_dataset.map(preprocess_function, **map_options)

# Spot-check one processed training example: multi-hot target vs. raw label.
idx = 1001
train_example = train_dataset[idx]
print(f"Training labels: {train_example['labels']} - {train_example['label']}")


In [None]:
# This row comes from the validation split, so the message says so.
print(f"Validation labels: {eval_dataset[idx]['labels']} - {eval_dataset[idx]['label']}")

In [None]:
# This row comes from the validation split, so the message says so.
print(f"Validation input_values: {eval_dataset[idx]['input_values']}")


In [None]:
# Dump the processed fields of the spot-checked training example.
for field in ("input_values", "attention_mask"):
    print(f"Training {field}: {train_dataset[idx][field]}")
print(f"Training labels: {train_dataset[idx]['labels']} - {train_dataset[idx]['label']}")

In [None]:
#!pavf/bin/pip uninstall -y datasets

#!pavf/bin/pip install datasets


In [None]:
#!pavf/bin/pip install torchinfo torchmetrics torch-audiomentations

# Model

In [None]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Output container returned by ``Wav2Vec2ForSpeechClassification.forward``.

    Every field defaults to ``None``; the model fills ``loss`` only when
    labels are passed to ``forward``.
    """

    # Training loss (None when no labels were given).
    loss: Optional[torch.FloatTensor] = None
    # Raw, un-normalised scores produced by the classification head.
    logits: torch.FloatTensor = None
    # Forwarded unchanged from the underlying wav2vec2 encoder output.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Forwarded unchanged from the underlying wav2vec2 encoder output.
    attentions: Optional[Tuple[torch.FloatTensor]] = None


In [None]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)


class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head placed on top of pooled wav2vec features.

    Pipeline: dropout -> dense (hidden -> hidden) -> tanh -> dropout ->
    out_proj (hidden -> num_labels).
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        """Map ``features`` to one logit per label; extra kwargs are ignored."""
        activated = torch.tanh(self.dense(self.dropout(features)))
        return self.out_proj(self.dropout(activated))


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """wav2vec 2.0 encoder followed by pooling and a classification head.

    The encoder output is reduced along dim 1 according to
    ``config.pooling_mode`` ("mean", "sum" or "max") and the pooled vector is
    fed to :class:`Wav2Vec2ClassificationHead`. The loss is selected from
    ``config.problem_type`` (regression, single-label or multi-label).
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the encoder's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        """Pool ``hidden_states`` along dim 1.

        Args:
            hidden_states: Encoder output to reduce.
            mode: One of "mean", "sum" or "max".

        Returns:
            The reduced tensor (dim 1 removed).

        Raises:
            ValueError: If ``mode`` is not a supported pooling method.
        """
        # NOTE(review): the reduction covers every position, including any
        # padded ones — confirm this is acceptable for padded batches.
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Encode, pool and classify a batch, optionally computing the loss.

        Args:
            input_values: Model inputs passed to the wav2vec2 encoder.
            attention_mask: Optional mask passed to the encoder.
            output_attentions: Passed through to the encoder.
            output_hidden_states: Passed through to the encoder.
            return_dict: Return a :class:`SpeechClassifierOutput` when true;
                defaults to ``config.use_return_dict``.
            labels: Optional targets; when given, a loss is computed.

        Returns:
            A :class:`SpeechClassifierOutput`, or a tuple
            ``(loss?, logits, *encoder_extras)`` when ``return_dict`` is false.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            # Infer the problem type once, from the label count and dtype,
            # when the config does not state it explicitly.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                # BCEWithLogitsLoss needs floating-point targets; multi-hot
                # labels built as integer tensors would otherwise raise.
                loss = loss_fct(logits, labels.to(logits.dtype))

        if not return_dict:
            # Skip the first two encoder outputs; keep the remaining extras.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import torch

import transformers
from transformers import Wav2Vec2Processor


@dataclass
class DataCollatorCTCWithPadding:
    """
    Collate pre-processed examples into a single padded batch of tensors.

    Args:
        processor (:class:`~transformers.Wav2Vec2Processor`):
            Processor whose ``pad`` method is used to pad ``input_values``.
        padding (:obj:`bool` or :obj:`str`, `optional`, defaults to :obj:`True`):
            Padding strategy forwarded to ``processor.pad``.
        max_length (:obj:`int`, `optional`):
            Forwarded to ``processor.pad``.
        max_length_labels (:obj:`int`, `optional`):
            Accepted for interface compatibility; not used by ``__call__``.
        pad_to_multiple_of (:obj:`int`, `optional`):
            Forwarded to ``processor.pad``.
        pad_to_multiple_of_labels (:obj:`int`, `optional`):
            Accepted for interface compatibility; not used by ``__call__``.
    """

    processor: Wav2Vec2Processor
    padding: Union[bool, str] = True
    max_length: Optional[int] = None
    max_length_labels: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad the ``input_values`` of ``features`` and attach a ``labels`` tensor."""
        audio_inputs = []
        for example in features:
            audio_inputs.append({"input_values": example["input_values"]})
        targets = []
        for example in features:
            targets.append(example["labels"])

        # A plain int label becomes a long tensor; anything else (e.g. a
        # multi-hot list) becomes a float tensor.
        if isinstance(targets[0], int):
            label_dtype = torch.long
        else:
            label_dtype = torch.float

        pad_options = dict(
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )
        batch = self.processor.pad(audio_inputs, **pad_options)
        batch["labels"] = torch.tensor(targets, dtype=label_dtype)

        return batch


In [None]:
# Collator handed to the Trainer below; padding=True is forwarded to processor.pad.
data_collator = DataCollatorCTCWithPadding(processor=processor, padding=True)

In [None]:
# NOTE(review): not referenced by any other cell in the visible notebook —
# confirm whether this flag is still needed.
is_regression = False

In [None]:
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score

from transformers import EvalPrediction


def compute_metrics(p):
    """
    Computes accuracy, F1-score, precision, and recall for multi-label classification.

    Each row of raw predictions is min-max normalised to [0, 1] and a label is
    predicted wherever the normalised score exceeds 0.5. Consequently the
    highest-scoring label of a row is always predicted, unless every score in
    the row is identical (then nothing is predicted for it).

    Args:
      p (EvalPrediction): An object containing predictions and labels.
        ``p.predictions`` may be the score array itself or a tuple whose first
        element is the score array.

    Returns:
      dict: A dictionary containing accuracy (fraction of matching
      sample/label cells), and macro-averaged F1-score, precision and recall.
    """

    # When predictions arrive as a tuple, the scores are its first element.
    raw_preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions

    # Convert predictions to numpy array for metric calculations
    preds = np.array(raw_preds)
    labels = np.array(p.label_ids)

    # Row-wise min-max normalisation; the epsilon avoids a division by zero
    # when all scores of a row are equal.
    min_vals = preds.min(axis=1, keepdims=True)
    max_vals = preds.max(axis=1, keepdims=True)
    normalized_preds = (preds - min_vals) / (max_vals - min_vals + 1e-10)

    thresh = 0.5
    preds = (normalized_preds > thresh).astype(float)

    # Element-wise accuracy over every (sample, label) cell
    accuracy = (preds == labels).mean().item()

    # All three scores use the same averaging and the same zero_division value,
    # so labels without predicted/true samples score 0 without emitting
    # UndefinedMetricWarning on every evaluation.
    f1 = f1_score(labels, preds, average='macro', zero_division=0.0)
    precision = precision_score(labels, preds, average='macro', zero_division=0.0)
    recall = recall_score(labels, preds, average='macro', zero_division=0.0)

    return {
        "accuracy": accuracy,
        "f1": f1,
        "precision": precision,
        "recall": recall,
    }


In [None]:
# Instantiate the classifier from the pretrained checkpoint, using the
# task-specific config built earlier (labels, problem type, pooling mode).
model = Wav2Vec2ForSpeechClassification.from_pretrained(
    model_name_or_path,
    config=config,
)


In [None]:
# Freeze the parameters of the encoder's feature extractor before training.
model.freeze_feature_extractor()

In [None]:
#!pavf/bin/pip install torch torchvision torchaudio


In [None]:
#!pavf/bin/pip uninstall -y accelerate

In [None]:
#!pavf/bin/pip install accelerate -U

In [None]:
import torch
# Print the value of torch.version.cuda for the installed PyTorch build.
print(torch.version.cuda)


In [None]:
# NOTE(review): presumably read by the distributed launcher behind the Trainer;
# these describe a single local process — confirm they are still required.
os.environ.update({
    'WORLD_SIZE': '1',
    'MASTER_ADDR': 'localhost',
    'MASTER_PORT': '12355',
})

In [None]:
import os
from transformers import TrainerCallback, TrainingArguments, Trainer

# Training configuration. Evaluation and checkpointing both happen once per
# epoch, so no step-based save/eval intervals are set (they would be ignored
# under the "epoch" strategies).
training_args = TrainingArguments(
    output_dir="./all_dataset",
    per_device_train_batch_size=100,
    per_device_eval_batch_size=100,
    gradient_accumulation_steps=2,
    evaluation_strategy="epoch",
    save_strategy="epoch",  # Must match the evaluation strategy for load_best_model_at_end
    num_train_epochs=10.0,
    # Mixed precision needs a GPU; enabling it unconditionally makes the
    # notebook fail on CPU-only machines.
    fp16=torch.cuda.is_available(),
    logging_steps=10,
    learning_rate=1e-4,
    save_total_limit=10,
    report_to="wandb",
    no_cuda=False,
    # Reload the checkpoint with the lowest evaluation loss when training ends.
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False
)

# Wire the model, collator, metrics and both dataset splits into the Trainer.
trainer = Trainer(
    model=model,
    data_collator=data_collator,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    tokenizer=processor.feature_extractor
)

trainer.train()