## Generating TorchScript

Source of code: https://discuss.pytorch.org/t/conversion-of-pytorch-pt-model-file-into-torchscript-ts-file/185671/4

In [85]:
import elephant_rumble_inference as eri
import torch
import time

In [76]:
# Instantiate the rumble classifier and place its parameters on the CPU.
elephant_rumble_classifier = eri.ElephantRumbleClassifier().to('cpu')

In [77]:
# Load the trained weights into the CPU-resident classifier.
# map_location='cpu' remaps any tensors that were saved from a GPU session, so
# the checkpoint also loads on a machine without CUDA.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
elephant_rumble_classifier.load_state_dict(torch.load("/Users/suhanashri/Downloads/Cornell ELP research/rumble_detector/elephant-rumble-inference/model_files/2024-07-03.pth", map_location='cpu'))

<All keys matched successfully>

In [78]:
# Switch to evaluation mode before scripting (this disables the Dropout layer
# shown in the module summary below).
elephant_rumble_classifier.eval()

ElephantRumbleClassifier(
  (act): LeakyReLU(negative_slope=0.01)
  (linear1): Linear(in_features=768, out_features=192, bias=True)
  (linear2): Linear(in_features=192, out_features=2, bias=True)
  (dropout): Dropout(p=0.2, inplace=False)
)

In [79]:
# Compile the eval-mode classifier to TorchScript with torch.jit.script.
torchscript_model = torch.jit.script(elephant_rumble_classifier)

In [80]:
# Serialize the scripted model to 'torchscript_version.pt' (path is relative to
# the current working directory).
torch.jit.save(torchscript_model, 'torchscript_version.pt')

In [127]:
# Sanity check: reload the saved TorchScript file to confirm it was written and
# deserializes. The reloaded ts_model is the classifier used in the evaluation
# cells further down.
ts_model = torch.jit.load('torchscript_version.pt')

## Performance Evaluation of Model

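The code in this section is taken from `training_notebook.ipynb`. Note that it relies on names that are not imported or defined in the cells shown here (`os`, `gc`, `tqdm`, and the `AVES_SR` sample-rate constant); make sure they are available before running these cells.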

In [None]:
# Use the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Embedding model, moved to the selected device.
aves_hubert_model = eri.AvesTorchaudioWrapper().to(DEVICE)
# Helper rooted at the directory holding the test recordings and their labels.
raven_file_helper = eri.RavenFileHelper("/Users/suhanashri/Downloads/Cornell ELP research/testing dataset")
# Processor wired to the embedding model and the reloaded TorchScript classifier.
audio_file_processor = eri.AudioFileProcessor(aves_hubert_model,ts_model,device=DEVICE)

In [None]:
def get_aves_embedding_cache_filename(audio_file,start,duration,preroll,postroll,sr):
    """Return the on-disk cache path for one clip's embeddings.

    The path has the form
    ``tmp/aves_embedding_cache/{sr}-{preroll}-{postroll}/{audio_file}/{start}-{duration}.pt``
    and is relative to the current working directory.

    Side effect: the containing directory is created if it does not exist yet.
    The cache file itself is not created here.
    """
    cache_dir = f"tmp/aves_embedding_cache/{sr}-{preroll}-{postroll}/{audio_file}"
    # Create the directory up front so callers can torch.save() straight to the path.
    os.makedirs(cache_dir, exist_ok=True)
    return f"{cache_dir}/{start}-{duration}.pt"

def get_aves_embeddings_from_file_with_buffers(audio_file,start,duration,preroll,postroll,sr=AVES_SR):
    """Return unit-normalized AVES embeddings for one span of an audio file.

    The audio is read with extra context (``preroll`` before ``start`` and
    ``postroll`` after the span), embedded as a whole, and the embedding rows
    are then cropped back to the requested span.  The cropped result is cached
    on disk and reused on later calls with the same arguments.

    Note: the cache key uses the ``preroll`` value as passed in, i.e. before it
    is clamped below.
    """
    cache_path = get_aves_embedding_cache_filename(audio_file,start,duration,preroll,postroll,sr)
    if os.path.exists(cache_path):
        return torch.load(cache_path,mmap=True)

    # Do not reach back past the beginning of the recording.
    preroll = min(preroll, start)

    padded_audio = raven_file_helper.get_downsampled_tensor(
        audio_file,
        start-preroll,
        duration+preroll+postroll,
        new_sr=sr,
    )
    padded_unit_vecs = get_normalized_aves_embeddings(padded_audio)

    # Row indices of the requested span inside the padded clip.
    first_row = int(audio_file_processor.time_to_score_index(preroll))
    end_row = int(audio_file_processor.time_to_score_index(preroll+duration))

    # .clone().detach() so that only the cropped rows are saved, not the whole
    # padded tensor the slice would otherwise keep alive.
    cropped = padded_unit_vecs[first_row:end_row].clone().detach()
    torch.save(cropped, cache_path)
    return cropped


def get_interesting_embeddings(audio_file,labels,max_labels=9999,sr=AVES_SR,*,
                               mean_embedding_per_label=False,trim_labels=False):
    """Collect the embeddings for (at most ``max_labels``) labels of one file.

    Each label row must expose ``bt`` and ``duration``; they are passed as the
    start and duration of the span, together with a fixed preroll of 500 and
    postroll of 200, to ``get_aves_embeddings_from_file_with_buffers``.

    Args:
        audio_file: audio file the labels belong to.
        labels: sliceable sequence of label rows.
        max_labels: only the first ``max_labels`` labels are used.
        sr: sample rate forwarded to the embedding helper.
        mean_embedding_per_label: if true, reduce every label to a single
            L2-normalized mean embedding (one row per label).
        trim_labels: if true (and ``mean_embedding_per_label`` is false), drop
            the first and last 3 embedding rows of every label, to trim
            bounding boxes that extend beyond the actual rumble.

    Returns:
        All collected embedding rows concatenated along dim 0.  With both
        options left at their defaults this is every embedding row of every
        used label.

    The two options used to be hard-coded ``False`` toggles inside the loop;
    they default to ``False`` so existing callers behave as before.
    """
    labels = labels[:max_labels]
    collected = []
    with tqdm.tqdm(total=len(labels)) as pbar:
        for row in labels:
            unit_vecs = get_aves_embeddings_from_file_with_buffers(audio_file,row.bt,row.duration,500,200,sr)
            if mean_embedding_per_label:
                mean_tensor = einops.reduce(unit_vecs, 'h w -> w', 'mean')
                mean_tensor = mean_tensor / mean_tensor.norm(p=2)
                collected.append(mean_tensor.unsqueeze(0))
            elif trim_labels:
                collected.append(unit_vecs[3:-3])
            else:
                collected.append(unit_vecs)
            pbar.update(1)
    # NOTE: torch.concat needs at least one tensor, so an empty label list
    # still raises here, exactly as before.
    return torch.concat(collected)

In [None]:
import einops

def get_aves_embeddings(y):
    """Run the AVES model on one waveform and return its embeddings on the CPU.

    ``y`` is viewed as a single-item batch of shape ``(1, y.shape[0])``, cast
    to float32 and sent to ``DEVICE``.  The model output is expected to have
    shape ``(1, n, d)``; the batch dimension is removed so the result is
    ``(n, d)``.  Everything runs under ``torch.inference_mode()``.
    """
    with torch.inference_mode(): # torch.no_grad():
        batch = y.to(torch.float32).view(1,y.shape[0])
        batched_embs = aves_hubert_model.forward(batch.to(DEVICE)).to('cpu').detach()
        # Drop references early to keep peak memory down on long clips.
        del batch
        embs = einops.rearrange(batched_embs, '1 n d -> n d')  # remove that batch dimension
        del batched_embs
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return embs.to('cpu').detach()

In [None]:
def get_normalized_aves_embeddings(y):
    """Return the AVES embeddings of ``y`` with every row scaled to unit L2 norm.

    The result is a detached CPU tensor with the same shape as the output of
    ``get_aves_embeddings``.  A row whose norm is zero is divided by zero and
    is not special-cased.
    """
    with torch.inference_mode(): # torch.no_grad():
        raw = get_aves_embeddings(y)
        # keepdim=True keeps the norms as a column so the division broadcasts per row.
        row_norms = raw.norm(p=2, dim=1, keepdim=True)
        return (raw / row_norms).to('cpu').detach()

In [None]:
def get_embeddings_for_labels_in_file(audio_file):
    """Return ``(positive_embeddings, negative_embeddings)`` for one audio file.

    Positive embeddings come from the file's own labels, negative ones from the
    labels produced by ``raven_file_helper.get_negative_labels``.  If the file
    has fewer than two labels, or fewer than two negative labels, the result is
    ``(None, None)`` and no embeddings are computed.
    """
    gc.collect()

    labels = raven_file_helper.get_all_labels_for_wav_file(audio_file)
    if len(labels) < 2:
        return (None,None)

    negative_labels = raven_file_helper.get_negative_labels(labels)
    if len(negative_labels) < 2:
        return (None,None)

    print(f"found {len(labels)} labels and {len(negative_labels)} negative labels in {audio_file}")
    positives = get_interesting_embeddings(audio_file, labels, 9999, sr=AVES_SR)
    negatives = get_interesting_embeddings(audio_file, negative_labels, 9999, sr=AVES_SR)
    return positives, negatives

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class MyDataset(Dataset):
    """Map-style dataset pairing input rows with binary rumble labels.

    ``labels`` is an iterable of strings; ``'rumble'`` is encoded as 1 and any
    other value as 0.  ``inputs`` is stored as given and indexed per item.
    """

    def __init__(self, inputs, labels):
        self.inputs = inputs
        # convert labels to binary
        binary = [1 if name == 'rumble' else 0 for name in labels]
        self.labels = torch.tensor(binary)
        self.len = self.labels.shape[0]

    def __len__(self):
        """Number of labelled items."""
        return self.len

    def __getitem__(self, index):
        """Return ``(input as float32, label)`` for ``index``."""
        features = self.inputs[index].type(torch.float32)
        target = self.labels[index]
        return features, target

    def save(self,name):
        """Write inputs and labels to ``mydataset_{name}.pt`` in the working directory."""
        payload = (self.inputs,self.labels)
        torch.save(payload, f'mydataset_{name}.pt')

    def load(self,name):
        """Replace this dataset's contents with those stored in ``mydataset_{name}.pt``."""
        stored_inputs, stored_labels = torch.load(f'mydataset_{name}.pt')
        self.inputs = stored_inputs
        self.labels = stored_labels
        self.len = self.labels.shape[0]
  
def make_dataset(interesting_files, balance_classes=True):
    """Build a ``MyDataset`` of rumble / non-rumble embeddings from audio files.

    Files for which ``get_embeddings_for_labels_in_file`` returns ``None`` for
    either class are skipped.  The remaining positive and negative embeddings
    are concatenated across files.

    With ``balance_classes`` set, a random subset of the negatives is kept so
    that there are at most as many negatives as positives.  Negatives are only
    ever down-sampled; positives are never dropped.

    Returns a ``MyDataset`` holding all positives followed by all negatives,
    labelled ``'rumble'`` and ``'not'`` respectively.
    """
    positives_per_file = []
    negatives_per_file = []
    for wav_file in interesting_files:
        pos, neg = get_embeddings_for_labels_in_file(wav_file)
        if pos is None or neg is None:
            continue
        positives_per_file.append(pos)
        negatives_per_file.append(neg)

    interesting_embs = torch.cat(positives_per_file)
    uninteresting_embs = torch.cat(negatives_per_file)
    # The per-file lists are no longer needed once concatenated; free them now.
    del positives_per_file
    del negatives_per_file
    gc.collect()

    if balance_classes:
        # should probably be true - it worked really well with it true
        print("initially, the shapes were",interesting_embs.shape,uninteresting_embs.shape)
        print("or a ratio of",uninteresting_embs.shape[0] / interesting_embs.shape[0])
        n_positive = interesting_embs.shape[0]
        shuffled_rows = torch.randperm(uninteresting_embs.shape[0])
        uninteresting_embs = uninteresting_embs[shuffled_rows[:n_positive]]

    print(f"from {len(interesting_files)} found {interesting_embs.shape} positive and {uninteresting_embs.shape} negatives")

    all_embs = torch.cat((interesting_embs, uninteresting_embs), dim=0)
    positive_names = np.array(['rumble'] * interesting_embs.shape[0])
    negative_names = np.array(['not'] * uninteresting_embs.shape[0])
    all_labels = np.concatenate((positive_names, negative_names))
    return MyDataset(all_embs, all_labels)

In [None]:
# Audio file(s) to evaluate on, passed by name to make_dataset.
testing_files = ["twenty_four_hr_file.wav"]
# balance_classes=True randomly down-samples the negatives to at most the
# number of positives, so the reported metrics are for a class-balanced set.
testing_dataset = make_dataset(testing_files, balance_classes=True)

In [None]:
# Iterate the test set in order (no shuffling) in batches of 1000, loading in
# the main process (num_workers=0).
testing_loader = DataLoader(testing_dataset, batch_size=1000, shuffle=False, num_workers=0)

In [None]:
def get_predictions(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and gather its predictions.

    The model is put in eval mode and gradients are disabled.  Each input batch
    is moved to ``device``; the model itself is not moved, so it must already
    be on that device.

    Returns a tuple of three numpy arrays covering all batches in order:
    ``(predicted_class, true_label, raw_model_output)``, where the predicted
    class is the index of the largest output along dim 1.
    """
    model.eval()
    predicted, truth, scores = [], [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_scores = model(batch_inputs.to(device))
            batch_predicted = torch.max(batch_scores, 1)[1]

            # Extend item by item so each list ends up with one entry per sample.
            predicted.extend(batch_predicted.cpu().numpy())
            truth.extend(batch_labels.cpu().numpy())
            scores.extend(batch_scores.cpu().numpy())

    return np.array(predicted), np.array(truth), np.array(scores)

In [None]:
# Evaluate the reloaded TorchScript classifier on the CPU:
# predicted classes, true labels and raw model outputs for the whole test set.
ts_pred,ts_true,ts_scores = get_predictions(ts_model, testing_loader, 'cpu')

In [None]:
from sklearn.metrics import classification_report

# Display names in label order: 0 -> 'not rumbles', 1 -> 'rumbles'
# (MyDataset encodes 'rumble' as 1 and everything else as 0).
target_names = ['not rumbles','rumbles']
print(classification_report(ts_true, ts_pred, target_names=target_names))