In [None]:
# FileFinder lets the protocol locate audio files that live outside the current folder.
# The "torchaudio.info" preprocessor is there to speed up data loading and training.
import torch
import torchmetrics
from pyannote.database import FileFinder
from pyannote.audio.core.io import get_torchaudio_info

preprocessors = {
    "audio": FileFinder(),
    "torchaudio.info": get_torchaudio_info,
}

In [None]:
# Load the AMI speaker-diarization protocol that is used below for both
# fine-tuning and evaluation.

from pyannote.database import get_protocol

ami = get_protocol(
    "AMI.SpeakerDiarization.only_words",
    preprocessors=preprocessors,
)

In [None]:
# Log in to the Hugging Face Hub with an access token; the pretrained model
# below is loaded with use_auth_token=True.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
from pyannote.audio import Model

# Load the pretrained "pyannote/segmentation" model that will be fine-tuned.
# use_auth_token=True presumably reuses the token stored by the Hub login.
pretrained = Model.from_pretrained(
    "pyannote/segmentation",
    use_auth_token=True,
)

Let's visualize how it performs on our test file:

In [None]:
# Here we use a test file provided by the protocol, but it could be any audio file,
# e.g. test_file = "/path/to/test.wav".
# Note that the notebook later reads test_file["annotation"], which needs a protocol file.

test_file = next(ami.test())

In [None]:
from pyannote.audio import Inference
# Apply the pretrained model to the test file with step=2.5
# (presumably seconds -- TODO confirm against the Inference documentation).
spk_probability = Inference(pretrained, step=2.5)(test_file)
# Last expression of the cell, so the notebook displays it.
spk_probability

A perfect output would look like this:

In [None]:
# Reference annotation of the test file (the same key is used as reference during evaluation).
test_file["annotation"]

We are going to fine-tune this pretrained model on the AMI dataset:

In [None]:
from pyannote.audio.tasks import Segmentation

# Segmentation task on AMI with duration=5.0 (the 5s window mentioned in the
# text) and at most 4 speakers.
seg_task = Segmentation(
    ami,
    duration=5.0,
    max_num_speakers=4,
)

To check that fine-tuning was actually helpful, we need to evaluate the performance of the pretrained model, and compute the average local diarization error rate on a 5s window sliding over the whole test set. To do so, we need to create a helper function:

In [None]:
def test(model, protocol, subset="test"):
    """Evaluate ``model`` on one subset of ``protocol``.

    Parameters
    ----------
    model
        Model handed to ``Inference``.
    protocol
        Object exposing a method named after ``subset`` that yields files.
        Each file must provide the "annotation" and "annotated" keys.
    subset : str, optional
        Name of the protocol method to call. Defaults to "test".

    Returns
    -------
    ``abs(metric)`` of a ``DiscreteDiarizationErrorRate`` that was fed every
    file of the subset (the notebook reports it as the local DER).
    """
    from pyannote.audio.pipelines.utils import get_devices
    from pyannote.audio.utils.metric import DiscreteDiarizationErrorRate
    from pyannote.audio.utils.signal import binarize

    # Ask for exactly one device and unpack it from the returned sequence.
    (device,) = get_devices(needs=1)
    der = DiscreteDiarizationErrorRate()
    subset_files = list(getattr(protocol, subset)())

    runner = Inference(model, device=device)

    for current in subset_files:
        # Arguments are evaluated left to right: reference, binarized model
        # output, then the "annotated" entry passed as uem.
        der(
            current["annotation"],
            binarize(runner(current)),
            uem=current["annotated"],
        )

    return abs(der)

We can then evaluate the model and see its local DER:

In [None]:
# Baseline: local DER of the pretrained model on the test subset.
der_pretrained = test(pretrained, ami, subset="test")
print(f"Local DER (pretrained) = {der_pretrained * 100:.1f}%")

Next, we prepare the model for fine-tuning, simply by overriding its `task` attribute...

In [None]:
from copy import deepcopy
# Work on a deep copy so that `pretrained` stays a separate object.
finetuned = deepcopy(pretrained)
# Attach the AMI segmentation task to the copy before training.
finetuned.task = seg_task

In [None]:
# To change the learning rate, weight decay or learning-rate scheduler,
# edit the function below.

def configure_optimizers(model):
    """Build the optimizer and learning-rate scheduler for ``model``.

    Parameters
    ----------
    model
        Object whose ``parameters()`` are handed to the optimizer.

    Returns
    -------
    dict
        ``{"optimizer": <Adam>, "lr_scheduler": <CosineAnnealingLR>}``.
    """
    adam = torch.optim.Adam(
        model.parameters(),
        lr=5e-5,
        eps=1e-08,
        maximize=False,
        weight_decay=0,
    )
    cosine = torch.optim.lr_scheduler.CosineAnnealingLR(adam, T_max=20)
    return {"optimizer": adam, "lr_scheduler": cosine}

In [None]:
# Bind the custom configure_optimizers function to the model instance so that
# it is used in place of the model's own configure_optimizers method.

from types import MethodType
finetuned.configure_optimizers = MethodType(configure_optimizers, finetuned)

... and we train it (for just one epoch):

In [None]:
import pytorch_lightning as pl

# Train on a single GPU for one epoch.
# `gpus=` is deprecated since Lightning 1.7 and removed in 2.0; the
# accelerator/devices pair requests the same single GPU on old and new versions.
trainer = pl.Trainer(accelerator="gpu", devices=1, max_epochs=1)
trainer.fit(finetuned)

We now evaluate the performance of the fine-tuned model...

In [None]:
# Local DER of the fine-tuned model on the same test subset, for comparison.
der_finetuned = test(finetuned, ami, subset="test")
print(f"Local DER (finetuned) = {der_finetuned * 100:.1f}%")

In [None]:
# Optional: fine-tune while gradually unfreezing the layers.

from pyannote.audio.core.callback import GraduallyUnfreeze
import pytorch_lightning as pl

trainer = pl.Trainer(
    # `gpus=` is deprecated since Lightning 1.7 and removed in 2.0;
    # accelerator/devices requests the same single GPU.
    accelerator="gpu",
    devices=1,
    max_epochs=500,
    # Register the callback at construction time rather than appending to
    # trainer.callbacks afterwards, so Lightning sets it up with its other callbacks.
    # epochs_per_stage=20: move to the next unfreezing stage every 20 epochs.
    callbacks=[GraduallyUnfreeze(epochs_per_stage=20)],
)
trainer.fit(finetuned)