In [None]:
from transformers import AutoFeatureExtractor, ASTForAudioClassification, AutoModelForAudioClassification, TrainingArguments, Trainer
from datasets import load_dataset, Audio, load_metric
import torch
import numpy as np

In [None]:
# Single seed reused for the train/test split and for TrainingArguments.
SEED = 42

In [None]:
# GTZAN genre dataset on the Hugging Face Hub:
# https://huggingface.co/datasets/marsyas/gtzan
# NOTE(review): trust_remote_code=True presumably lets the dataset's own
# loading script execute locally — confirm this is acceptable.
df = load_dataset(
    "marsyas/gtzan",
    trust_remote_code=True,
)

In [None]:
# Hold out 20% of the original 'train' split as a test set (shuffled, seeded).
# Note: `df` is rebound to the split result, which again has a 'train' key, so
# executing this cell a second time would split the already-reduced train set.
df = df['train'].train_test_split(
    test_size=0.2,
    shuffle=True,
    seed=SEED,
)

In [None]:
# Helper that maps an integer genre id to its human-readable name
id2label_function = df['train'].features['genre'].int2str

# Quick sanity check: show the genre of the first training example
first_genre_id = df['train'][0]['genre']
print("genre: ", id2label_function(first_genre_id))

In [None]:
sampling_rate_check = None  # rate of the first sample seen; reference for all others
all_same = True

# Compare every sample of both splits against the first sampling rate seen.
# The scan only stops early once a mismatch is found; previously the `break`
# fired after a single comparison, so almost no samples were actually checked.
for set_name in ['train', 'test']:
    for sample in df[set_name]:
        sampling_rate = sample['audio']['sampling_rate']

        if sampling_rate_check is None:
            sampling_rate_check = sampling_rate
        elif sampling_rate != sampling_rate_check:
            all_same = False
            break  # one mismatch is enough to answer the question

    if not all_same:
        break  # also leave the outer loop over the splits

# Printing result
if all_same:
    print(f"All samples have the same sampling rate: {sampling_rate_check} Hz")
else:
    print("The samples in the dataframe have different sampling rates.")

## Feature extraction

In [None]:
# Hub id of the pretrained AST checkpoint that is fine-tuned below.
model_checkpoint = "MIT/ast-finetuned-audioset-10-10-0.4593"
# Alternative checkpoint, currently disabled:
#model_checkpoint = 'ntu-spml/distilhubert'

In [None]:
# Load the feature extractor that belongs to the chosen checkpoint.
feature_extractor = AutoFeatureExtractor.from_pretrained(model_checkpoint)

In [None]:
# Sampling rate the feature extractor is configured for
sampling_rate = feature_extractor.sampling_rate
print(f'AST sampling rate: {sampling_rate} Hz')

# Resample every clip to that rate. Using the extractor's own value (instead of
# a hard-coded 16000) keeps this cell correct if `model_checkpoint` is changed.
df = df.cast_column("audio", Audio(sampling_rate=sampling_rate))

The audio clips do not all contain exactly the same number of samples, so we pass the feature extractor a `max_length` equivalent to 30 seconds and truncate anything longer.

In [None]:
max_duration = 30.0  # seconds of audio kept per clip


def preprocess_function(examples):
    """Run the feature extractor over a batch of examples.

    Args:
        examples: batch whose 'audio' entry is a sequence of items that each
            provide the raw waveform under the 'array' key.

    Returns:
        Whatever `feature_extractor` returns for the batch (requested as
        PyTorch tensors, with truncation at `max_duration` seconds' worth
        of samples).
    """
    target_rate = feature_extractor.sampling_rate

    # Collect the raw waveforms of the batch
    waveforms = [audio['array'] for audio in examples['audio']]

    return feature_extractor(
        waveforms,
        sampling_rate=target_rate,
        max_length=int(target_rate * max_duration),
        truncation=True,
        return_tensors="pt",
    )

In [None]:
# Apply the feature extractor to the train and test splits in batches of 100
# examples, dropping the raw 'audio' and 'file' columns from the result.
df = df.map(
    preprocess_function,
    batched=True,
    batch_size=100,
    num_proc=1,
    remove_columns=['audio', 'file'],
)

In [None]:
# Inspect the extracted features of the first training example:
# prints the length of one inner row, then the number of rows.
first_features = df['train'][0]['input_values']
print(f"Size of spectogram: {len(first_features[0])}, {len(first_features)}")

In [None]:
# Rename the target column from 'genre' to 'labels'.
# NOTE(review): presumably because the Trainer/model expect the target under
# the name 'labels' — confirm.
df = df.rename_column('genre', 'labels')

In [None]:
# Id -> label. Keys are kept as strings because later cells index this mapping
# with `str(...)`.
id2label = {str(i): id2label_function(i)
            for i in range(len(df['train'].features['labels'].names))}

# Label -> id. Values are converted back to integers so the mapping has the
# name -> int form a model config expects, rather than name -> "0".."9" strings.
label2id = {name: int(idx) for idx, name in id2label.items()}

In [None]:
# Spot-check the mapping with one arbitrarily chosen class id
integer = 8
label = id2label[str(integer)]  # id2label is keyed by the id as a string

print(f'\nId: {integer}', f'\nLabel: {label}', sep='\n')

## Fine-tune

In [None]:
# One output class per entry of the id -> label mapping
num_labels = len(id2label)

# Load the pretrained checkpoint configured for our label set.
# NOTE(review): ignore_mismatched_sizes=True presumably allows loading even
# though the checkpoint's classification head has a different size — confirm.
ast_model = AutoModelForAudioClassification.from_pretrained(
    model_checkpoint,
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)

# Print the module tree for inspection
print('\nAST Architecture', ast_model, sep='\n')

In [None]:
batch_size = 2  # examples per device, used for both training and evaluation

training_args = TrainingArguments(
    # --- output / checkpointing ---
    # NOTE(review): the directory name mentions HuBERT although
    # `model_checkpoint` points to an AST model.
    output_dir='hubert_gtzan',
    save_strategy='epoch',
    save_total_limit=2,
    report_to='none',
    # --- evaluation and model selection ---
    evaluation_strategy='epoch',
    load_best_model_at_end=True,
    metric_for_best_model='accuracy',
    # --- optimisation schedule ---
    num_train_epochs=5,
    learning_rate=5e-5,
    warmup_ratio=0.1,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps=1,
    # fp16=True,  # currently disabled
    # --- reproducibility ---
    seed=SEED,
)

In [None]:
def compute_metrics(eval_pred):
    """Compute classification accuracy for one evaluation pass.

    Accuracy is calculated directly with NumPy, so this cell does not rely on
    the deprecated `datasets.load_metric` helper.

    Args:
        eval_pred: object exposing `predictions` (per-class scores, one row
            per example) and `label_ids` (the reference class ids).

    Returns:
        dict: ``{"accuracy": <fraction of examples predicted correctly>}``.
    """
    # Predicted class = index of the highest score in each row
    predictions = np.argmax(eval_pred.predictions, axis=1)
    references = np.asarray(eval_pred.label_ids)
    return {"accuracy": float(np.mean(predictions == references))}

In [None]:
# Assemble the Trainer from the model, hyper-parameters, data splits and
# the metric callback.
trainer = Trainer(
    model=ast_model,
    args=training_args,
    train_dataset=df['train'],
    eval_dataset=df['test'],
    compute_metrics=compute_metrics,
    # The feature extractor is handed over through the `tokenizer` argument.
    tokenizer=feature_extractor,
)

# Launch fine-tuning
trainer.train()