In [7]:
from transformers import pipeline
import numpy as np
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, Audio

In [8]:
df = pd.read_csv("musicnet_metadata_periods(Completed).csv", encoding="ISO-8859-1")

#all composers with more than 10 recordings in the dataset(from excel analaysis)
composer_list=['Beethoven', 'Bach', 'Schubert', 'Handel', 'Brahms', 'Schumann', 'Mozart', 'Dvorak', 'Vivaldi']

mydict={}
for composer in composer_list:
    #print(df.loc[df['composer'] == composer])
    mydf = df.loc[df['composer'] == composer]
    list_of_path=list(mydf['path'])
    mydict[composer]=list_of_path

In [9]:
from transformers import AutoFeatureExtractor

model_id = "ntu-spml/distilhubert"
feature_extractor = AutoFeatureExtractor.from_pretrained(
    model_id, do_normalize=True, return_attention_mask=True
)

In [10]:
sampling_rate = feature_extractor.sampling_rate
# Creating dataset with labels
data = {
    "audio": [],
    "label": []
}

for i in range(len(composer_list)):
    composer = composer_list[i]
    for audio_file in mydict[composer]:
        data["audio"].append(audio_file)  # Adding audio files
        data["label"].append(i)  # Assigning composer id as label

# Creating Dataset
dataset = Dataset.from_dict(data).cast_column("audio", Audio(sampling_rate=sampling_rate))
dataset = DatasetDict({"train": dataset})
dataset.save_to_disk("tempDataset") #since we are creating data from in-memory data, we save the dataset to disk to avoid filling up the RAM

# Check the first entry
print(dataset['train'])

Saving the dataset (0/54 shards):   0%|          | 0/443 [00:00<?, ? examples/s]

Dataset({
    features: ['audio', 'label'],
    num_rows: 443
})


In [11]:
label2id, id2label = dict(), dict()
for i, label in enumerate(composer_list):
    label2id[label] = str(i)
    id2label[str(i)] = label

print(id2label, '\n\n', label2id)
print(id2label['0'])
print(label2id['Beethoven'])

{'0': 'Beethoven', '1': 'Bach', '2': 'Schubert', '3': 'Handel', '4': 'Brahms', '5': 'Schumann', '6': 'Mozart', '7': 'Dvorak', '8': 'Vivaldi'} 

 {'Beethoven': '0', 'Bach': '1', 'Schubert': '2', 'Handel': '3', 'Brahms': '4', 'Schumann': '5', 'Mozart': '6', 'Dvorak': '7', 'Vivaldi': '8'}
Beethoven
0


In [14]:
import math
max_duration = 3 

def subdivide_function(entry):
    audio_array = entry["audio"][0]['array']
    audio_label = entry['label'][0]
    print(entry)
    #print(audio_label)
    sampling_rate = feature_extractor.sampling_rate
    segment_length = int(sampling_rate * max_duration)
    
    #print(len(audio_array))
    num_segments = math.ceil(len(audio_array) / segment_length)
    chunks = []
    labels = []
    for i in range(num_segments):
        start_idx = i * segment_length
        end_idx = start_idx + segment_length
        if end_idx >= len(audio_array):
            segment = audio_array[start_idx:] #meaning last segment already
        else:
            segment = audio_array[start_idx:end_idx] #meaning still in the middle
        #print(segment)
        labels.append(audio_label)
        chunks.append(segment)

    return {"labels": labels,
            "chunks": chunks}
    
dataset = DatasetDict.load_from_disk("tempDataset")
chunked_dataset = dataset.map(subdivide_function, batch_size = 1, batched=True, remove_columns=['audio', 'label'])

Loading dataset from disk:   0%|          | 0/54 [00:00<?, ?it/s]

Map:   0%|          | 0/443 [00:00<?, ? examples/s]

{'audio': [{'path': 'Romantic.008.wav', 'array': array([-4.46251619e-10,  6.09580997e-10, -5.98676220e-10, ...,
        9.07644571e-05,  1.85598547e-04,  0.00000000e+00]), 'sampling_rate': 16000}], 'label': [0]}
{'audio': [{'path': 'Romantic.009.wav', 'array': array([ 7.78915266e-11,  2.60398439e-11,  4.99640433e-12, ...,
       -2.01829571e-05, -2.16683147e-05,  0.00000000e+00]), 'sampling_rate': 16000}], 'label': [0]}
{'audio': [{'path': 'Romantic.010.wav', 'array': array([ 6.77716494e-11,  5.84781390e-11,  1.03103415e-10, ...,
       -1.60852243e-04, -1.64244295e-04, -2.00809722e-04]), 'sampling_rate': 16000}], 'label': [0]}
{'audio': [{'path': 'Romantic.011.wav', 'array': array([0.00000000e+00, 0.00000000e+00, 0.00000000e+00, ...,
       4.50815736e-14, 5.25562734e-14, 0.00000000e+00]), 'sampling_rate': 16000}], 'label': [0]}
{'audio': [{'path': 'Romantic.052.wav', 'array': array([0., 0., 0., ..., 0., 0., 0.]), 'sampling_rate': 16000}], 'label': [0]}
{'audio': [{'path': 'Romantic.0

In [15]:
chunked_dataset

DatasetDict({
    train: Dataset({
        features: ['labels', 'chunks'],
        num_rows: 50768
    })
})

In [16]:
chunked_dataset = chunked_dataset["train"].train_test_split(seed=42, shuffle=True, test_size=0.1)

In [17]:
chunked_dataset

DatasetDict({
    train: Dataset({
        features: ['labels', 'chunks'],
        num_rows: 45691
    })
    test: Dataset({
        features: ['labels', 'chunks'],
        num_rows: 5077
    })
})

In [18]:
def preprocess_function(examples):
    audio_arrays = examples['chunks']
    inputs = feature_extractor(
        audio_arrays,
        sampling_rate=feature_extractor.sampling_rate,
        max_length=int(feature_extractor.sampling_rate * max_duration),
        truncation=True,
        return_attention_mask=True,
    )
    return inputs

dataset_encoded = chunked_dataset.map(
    preprocess_function,
    remove_columns=["chunks"],
    batched=True,
    batch_size=100,
    num_proc=1,
)
dataset_encoded = dataset_encoded.rename_column("labels", "label")
dataset_encoded

Map:   0%|          | 0/45691 [00:00<?, ? examples/s]

Map:   0%|          | 0/5077 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['label', 'input_values', 'attention_mask'],
        num_rows: 45691
    })
    test: Dataset({
        features: ['label', 'input_values', 'attention_mask'],
        num_rows: 5077
    })
})

In [19]:
from transformers import AutoModelForAudioClassification

num_labels = len(id2label)

model = AutoModelForAudioClassification.from_pretrained(
    model_id,
    num_labels=num_labels,
    label2id=label2id,
    id2label=id2label,
)

Some weights of HubertForSequenceClassification were not initialized from the model checkpoint at ntu-spml/distilhubert and are newly initialized: ['classifier.bias', 'classifier.weight', 'projector.bias', 'projector.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [20]:
from transformers import TrainingArguments

model_name = model_id.split("/")[-1]
batch_size = 4
gradient_accumulation_steps = 1
num_train_epochs = 10

training_args = TrainingArguments(
    f"{model_name}-finetuned-dataset",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=num_train_epochs,
    warmup_ratio=0.1,
    logging_steps=5,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    fp16=True,
    push_to_hub=False,
)



In [21]:
import evaluate
import numpy as np

metric = evaluate.load("accuracy")


def compute_metrics(eval_pred):
    """Computes accuracy on a batch of predictions"""
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)

In [22]:
from transformers import Trainer

trainer = Trainer(
    model,
    training_args,
    train_dataset=dataset_encoded["train"].with_format("torch"),
    eval_dataset=dataset_encoded["test"].with_format("torch"),
    tokenizer=feature_extractor,
    compute_metrics=compute_metrics,
)

trainer.train()
trainer.save_model("ComposerClassifyModel")

  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Accuracy
1,0.9728,0.882707,0.712429
2,1.2452,0.90145,0.775458
3,0.4074,0.683199,0.836912
4,0.5238,0.628899,0.892259
5,0.0013,0.626021,0.896396
6,0.4768,0.571036,0.918653
7,0.0006,0.595652,0.913926
8,0.0,0.641279,0.922395
9,0.0,0.604442,0.931062
10,0.0007,0.564171,0.936971
