# Data
For this project I will be using the dataset collected and annotated for Dr. Helen Gent's doctoral dissertation at the University of Illinois, which I was involved in preparing.

The dataset comes from Jarvis Johnson's SadBoyz podcast. It has been segmented into individual utterances, and each segment has been labeled either "ironic" or "non-ironic".

In [None]:
!git clone https://github.com/helengent/Irony-Recognition.git # clone Dr Gent's repo for the data

# A Transformer-Encoder Classifier

In [None]:
!pip install torch
!pip install librosa

import os, re
import math, random
import librosa
import torch

class PrunedCorpus:
  """Index of the utterance .wav files in one directory, with mel-spectrogram features.

  File names are parsed as ``<episode>_<speaker letter><utterance number>...``;
  a name containing ``-I.wav`` marks the utterance as ironic.
  """

  def __init__(self, path):
    """Index every utterance file found directly under ``path``."""
    self.len = 0 # number of utterance files indexed
    self.path = path
    self.data = {} # key is episode label, val is list of (utterance file, utterance num, speaker, is_sarcasm) tuples
    self.load_data()

  def load_data(self):
    """Fill ``self.data`` from the file names in ``self.path``, ordered by utterance number."""
    for filename in os.listdir(self.path):
      if filename == '.DS_Store':
        continue
      self.len += 1
      filename_split = filename.split('_')
      episode_label = filename_split[0]
      if episode_label not in self.data:
        self.data[episode_label] = []
      utterance_num = re.findall(r'\d+', filename_split[1])[0]
      speaker = filename_split[1][0]
      is_sarcasm = "-I.wav" in filename
      self.data[episode_label].append((filename, utterance_num, speaker, is_sarcasm))
    # sort the utterances once all files have been read
    for episode, utterances in self.data.items():
      self.data[episode] = sorted(utterances, key=lambda x: int(x[1]))

  def get_episodes(self):
    """Return the list of episode labels."""
    return list(self.data.keys())

  def get_utterances(self, episode):
    """Return the utterance tuples of ``episode``; raises KeyError for an unknown label."""
    return self.data[episode]

  def utterance_to_tensor(self, utterance:tuple):
    """Load one utterance and return its normalized log-mel spectrogram (rows are time frames)."""
    audio, sr = librosa.load(self.path + "/" + utterance[0], sr=16000)
    # pass the rate the audio was resampled to; melspectrogram otherwise falls back to
    # its own default sample rate when laying out the mel filter bank
    mel_spectrogram = librosa.feature.melspectrogram(y=audio, sr=sr).T # transpose so that the rows are time frames
    mel_spectrogram = librosa.power_to_db(mel_spectrogram)
    normalized_mel_spectogram = librosa.util.normalize(mel_spectrogram) # rescale so the largest magnitude is 1
    # use the GPU when there is one instead of failing on CPU-only machines
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.tensor(normalized_mel_spectogram).to(device)

  def __iter__(self):
    """Yield the feature tensor of every utterance, episode by episode."""
    for episode in self.get_episodes():
      for utterance in self.get_utterances(episode):
        yield self.utterance_to_tensor(utterance)

  def bool_to_tensor(self, arg):
    """Return a one-element float32 tensor: 1.0 when ``arg`` is truthy, else 0.0."""
    return torch.tensor([1. if arg else 0.], dtype=torch.float)

  def _episode_set(self, episode_label):
    """Return features and label tensors for every utterance of one episode."""
    episode = self.data[episode_label]
    return {
        "tensors" :[self.utterance_to_tensor(utt) for utt in episode],
        "labels" : [self.bool_to_tensor(utt[3]) for utt in episode]
    }

  def easy_trainset(self):
    """Return the SBep13 utterances as a small sanity-check training set."""
    return self._episode_set("SBep13")

  def trainset(self):
    """Return a shuffled, class-balanced training set.

    Uses all episodes except SBep13 and SBep19, and only speakers 'c' and 'd'.
    The larger class is randomly down-sampled to the size of the smaller one.
    "labels" holds plain booleans (True means ironic).
    """
    utts = []
    for name, episode in self.data.items():
      if name == "SBep19" or name == "SBep13":
        continue
      utts.extend(utt for utt in episode if utt[2] in ['c', 'd'])
    # shuffle and rebalance ironic and non ironic utts
    random.shuffle(utts)
    ironic = [utt for utt in utts if utt[3]]
    non_ironic = [utt for utt in utts if not utt[3]]
    max_len = min(len(ironic), len(non_ironic))
    utts_balanced = ironic[0:max_len] + non_ironic[0:max_len]
    random.shuffle(utts_balanced)

    # features and labels must both come from the utterance being iterated
    return {
        "tensors": [self.utterance_to_tensor(utt) for utt in utts_balanced],
        "labels": [utt[3] for utt in utts_balanced]
    }

  def testset(self):
    """Return features and labels for the held-out episode SBep13."""
    return self._episode_set("SBep13")

  def devset(self):
    """Return features and labels for the held-out episode SBep19."""
    return self._episode_set("SBep19")

corpus = PrunedCorpus('Irony-Recognition/AudioData/GatedPruned3')

# print the set of speaker letters that occur in each episode
for episode in corpus.get_episodes():
  speakers = {utterance[2] for utterance in corpus.get_utterances(episode)}
  print(episode, speakers)

class SarcasmClassifier(torch.nn.Module):
  """Transformer encoder over spectrogram frames, attention-pooled into one irony logit.

  ``dim`` is the feature size of each frame; it must be divisible by the 32
  attention heads.
  """

  def __init__(self, dim):
    super(SarcasmClassifier, self).__init__()
    self.dim = dim
    self.transformer_encoder_layer = torch.nn.TransformerEncoderLayer(d_model=dim, nhead=32)
    self.transformer_encoder = torch.nn.TransformerEncoder(self.transformer_encoder_layer, num_layers=12)
    self.attention_linear = torch.nn.Linear(dim, 1)
    self.mlp = torch.nn.Sequential(
            torch.nn.Linear(dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
        )

  def _positional_encodings(self, length, device, dtype):
    """Return the (length, dim) sinusoidal position table on ``device``."""
    position = torch.arange(length, device=device, dtype=torch.float).unsqueeze(1)
    # channels 2k and 2k+1 share the wavelength 10000^(2k/dim)
    even_channels = torch.arange(0, self.dim, 2, device=device, dtype=torch.float)
    div_term = torch.exp(even_channels * (-math.log(10000.0) / self.dim))
    table = torch.zeros(length, self.dim, device=device)
    table[:, 0::2] = torch.sin(position * div_term)
    table[:, 1::2] = torch.cos(position * div_term)[:, : self.dim // 2]
    return table.to(dtype)

  def forward(self, x):
    """Map one utterance ``x`` of shape (frames, dim) to a logit of shape (1,)."""
    positional_encodings = self._positional_encodings(len(x), x.device, x.dtype)
    # the encoder has to see the features themselves, not just their positions
    encoded_output = self.transformer_encoder(x + positional_encodings)
    # attention pooling: softmax over the time axis, then a weighted sum of frames
    attention_scores = self.attention_linear(encoded_output)
    attention_weights = torch.nn.functional.softmax(attention_scores, dim=0)
    x_weighted = (attention_weights * encoded_output).sum(dim=0)
    return self.mlp(x_weighted)

dim = 128
model = SarcasmClassifier(dim).to('cuda')

# train
trainset = corpus.trainset()
# the held-out sets never change, so extract the dev features once rather than once per epoch
devset = corpus.devset()
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
TRAIN_EPOCHS = 10
corpus_size = len(trainset["tensors"])
assert len(trainset["tensors"]) == len(trainset["labels"])


def evaluate(evalset):
    """Print every prediction for ``evalset`` followed by the correct/total count."""
    model.eval() # turn off the dropout inside the transformer layers while scoring
    correct = 0
    with torch.no_grad():
        for features, label in zip(evalset["tensors"], evalset["labels"]):
            output = model(features)
            pred = bool(torch.sigmoid(output) >= 0.5)
            if pred == bool(label):
                correct += 1
            print(output, torch.sigmoid(output), label)
    print(f'{correct}/{len(evalset["tensors"])}')


for epoch in range(TRAIN_EPOCHS):
    print("Epoch: ", epoch+1)
    model.train()
    examples = zip(trainset["tensors"], trainset["labels"])
    for point_counter, (features, label) in enumerate(examples, start=1):
        optimizer.zero_grad()
        output = model(features)
        target_tensor = torch.tensor([1. if label else 0.], dtype=torch.float).to('cuda')
        loss = criterion(output, target_tensor)
        loss.backward()
        optimizer.step()
        if point_counter % 25 == 0:
          print(f"Loss: {loss}, {point_counter}/{corpus_size}")
    # evaluate against devset
    evaluate(devset)
# evaluate against testset
evaluate(corpus.testset())


# Fine-tuned Wav2Vec

Instead of learning relationships directly from spectrograms from scratch, we can use a pre-trained model to try to make up for our small (~10hrs) dataset. First, we try fine-tuning Wav2Vec to produce vector embeddings for the audio clip. To convert the time-series embeddings to a single vector to be classified, I will use a single-layer forward LSTM, as Dr. Gent did with the features for her classifier.

In [None]:
!pip install torch
!pip install transformers
import os, re, random
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from transformers import Wav2Vec2Model, Wav2Vec2Tokenizer
import soundfile as sf

class PrunedCorpus:
  """Utterance index for the corpus directory that embeds audio with wav2vec 2.0."""

  def __init__(self, path):
    self.len = 0
    self.path = path
    # episode label -> list of (file name, utterance number, speaker letter, is_ironic)
    self.data = {}
    self.load_data()
    self.tokenizer = Wav2Vec2Tokenizer.from_pretrained("facebook/wav2vec2-base-960h")
    self.model_wav2vec = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h")

  def load_data(self):
    """Group the files in self.path by episode, each group ordered by utterance number."""
    for name in os.listdir(self.path):
      if name == '.DS_Store':
        continue
      self.len += 1
      pieces = name.split('_')
      bucket = self.data.setdefault(pieces[0], [])
      number = re.findall(r'\d+', pieces[1])[0]
      bucket.append((name, number, pieces[1][0], "-I.wav" in name))
    # order each episode only after every file has been seen
    for label in self.data:
      self.data[label].sort(key=lambda entry: int(entry[1]))

  def get_episodes(self):
    """Return the episode labels as a list."""
    return list(self.data)

  def get_utterances(self, episode):
    """Return the utterance tuples stored for ``episode``."""
    return self.data[episode]

  def utterance_to_tensor(self, utterance:tuple, with_grad=False):
    """Return the wav2vec last hidden state for one utterance, batch axis squeezed away."""
    samples, _ = sf.read(self.path + "/" + utterance[0])
    encoded = self.tokenizer(samples, return_tensors="pt", padding="longest")
    input_values = encoded.input_values
    if not with_grad:
      with torch.no_grad():
        return self.model_wav2vec(input_values).last_hidden_state.squeeze(0)
    return self.model_wav2vec(input_values).last_hidden_state.squeeze(0)

  def __iter__(self):
    """Yield the embedding of every utterance, one episode after another."""
    for label in self.get_episodes():
      yield from (self.utterance_to_tensor(utt) for utt in self.get_utterances(label))

  def bool_to_tensor(self, arg):
    """Encode a truth value as a one-element float tensor (1.0 or 0.0)."""
    return torch.tensor([1. if arg else 0.], dtype=torch.float)

  def easy_trainset(self):
    """Return SBep13 roughly balanced: all ironic utterances plus every sixth non-ironic one."""
    episode = self.data["SBep13"]
    positives = [utt for utt in episode if utt[3]]
    negatives = [utt for utt in episode if not utt[3]]
    balanced_utts = positives + negatives[::6]
    random.shuffle(balanced_utts)
    print(balanced_utts)
    return balanced_utts

  def trainset(self):
    """Return a shuffled, class-balanced list of 'c'/'d' utterances outside SBep13 and SBep19."""
    held_out = ("SBep19", "SBep13")
    utts = [
        utt
        for name, episode in self.data.items()
        if name not in held_out
        for utt in episode
        if utt[2] in ['c', 'd']
    ]
    # shuffle first so that truncating the larger class drops a random subset
    random.shuffle(utts)
    positives = [utt for utt in utts if utt[3]]
    negatives = [utt for utt in utts if not utt[3]]
    keep = min(len(positives), len(negatives))
    utts_balanced = positives[:keep] + negatives[:keep]
    random.shuffle(utts_balanced)
    return utts_balanced

  def testset(self):
    """Return a copy of the SBep19 utterance list."""
    return list(self.data["SBep19"])

corpus = PrunedCorpus('Irony-Recognition/AudioData/GatedPruned3')

# report which speaker letters appear in each episode
for episode in corpus.get_episodes():
  speakers = set(utterance[2] for utterance in corpus.get_utterances(episode))
  print(episode, speakers)

# placeholders for a per-label survey of the utterances (not filled in here)
sarcastic_utts, non_sarcastic_utts = [], []

class SarcasmClassifier(torch.nn.Module):
  """Irony classifier with LSTM pooling.

  A single-layer forward LSTM reads the frame embeddings; its final hidden
  state is mapped by a small MLP to one logit per sequence.
  """

  def __init__(self, dim):
    """``dim`` is the size of each input frame embedding."""
    super(SarcasmClassifier, self).__init__()
    self.lstm = torch.nn.LSTM(dim, 512, 1, batch_first=True, bidirectional=False)
    self.mlp = torch.nn.Sequential(
            torch.nn.Linear(512, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 1),
        )

  def forward(self, x):
    """Return the irony logit(s) for ``x``, a single sequence or a (packed) batch."""
    # only the final hidden state is needed; the per-step outputs are discarded
    _, (hn, _) = self.lstm(x)
    # drop the leading num_layers axis (always 1 for this LSTM)
    hn = hn.squeeze(0)
    return self.mlp(hn)

dim = 768
model = SarcasmClassifier(dim)

# train
trainset = corpus.trainset()
# report how many ironic / non-ironic utterances ended up in the training set
trainset_size = len(trainset)
trainset_ironic_size = sum(1 for utt in trainset if utt[3])
trainset_non_ironic_size = trainset_size - trainset_ironic_size
print("trainset data:", trainset_size, trainset_ironic_size, trainset_non_ironic_size)

criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
TRAIN_EPOCHS = 1
BATCH_SIZE = 16
point_counter = 0
for epoch in range(TRAIN_EPOCHS):
    print("Epoch: ", epoch+1)
    # only full batches are used; a trailing partial batch is skipped
    last_start = (trainset_size // BATCH_SIZE) * BATCH_SIZE
    for batch_start in range(0, last_start, BATCH_SIZE):
      optimizer.zero_grad()
      # embed the utterances of this batch and pack them for the LSTM
      batch_utts = trainset[batch_start:batch_start + BATCH_SIZE]
      batch_labels = torch.stack([corpus.bool_to_tensor(utt[3]) for utt in batch_utts])
      embeddings = [corpus.utterance_to_tensor(utt) for utt in batch_utts]
      lengths = [emb.size(0) for emb in embeddings]
      packed_sequences = pack_padded_sequence(
          pad_sequence(embeddings, batch_first=True),
          lengths,
          batch_first=True,
          enforce_sorted=False,
      )
      # forward pass, loss and parameter update
      y_hat = model(packed_sequences)
      print(y_hat, batch_labels)
      loss = criterion(y_hat, batch_labels)
      loss.backward()
      optimizer.step()
      point_counter += 1
      if point_counter % 5 == 0:
          print(f"Loss: {loss}, {point_counter*BATCH_SIZE}/{trainset_size}, {y_hat}, {batch_labels}")

    # score every utterance of corpus.testset() one at a time
    print("evaluating")
    with torch.no_grad():
      correct = 0
      total = 0
      for example in corpus.testset():
        x = corpus.utterance_to_tensor(example, with_grad=False)
        y = corpus.bool_to_tensor(example[3])
        y_hat = torch.sigmoid(model(x))
        y_pred = torch.round(y_hat)
        print(x.shape, y, y_pred, y_hat)
        both_ironic = y_hat[0] >= 0.5 and y[0] >= 0.5
        both_plain = y_hat[0] < 0.5 and y[0] < 0.5
        if both_ironic or both_plain:
          correct += 1
        total += 1
      print(f'{correct}/{total}')

Next, I try swapping the original Wav2Vec model for Wav2Vec-Conformer, which augments the transformer blocks with convolution modules (helpful in vision/audio!!).

In [None]:
!pip install torch
!pip install transformers
import os, re, random
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from transformers import Wav2Vec2ConformerModel, AutoProcessor
import soundfile as sf

class PrunedCorpus:
  """Utterance index for the corpus directory that embeds audio with Wav2Vec2-Conformer on CUDA."""

  def __init__(self, path):
    self.len = 0
    self.path = path
    # maps an episode label to its (file name, utterance number, speaker, is_ironic) tuples
    self.data = {}
    self.load_data()
    self.tokenizer = AutoProcessor.from_pretrained("facebook/wav2vec2-conformer-rope-large-960h-ft")
    self.model_wav2vec = Wav2Vec2ConformerModel.from_pretrained("facebook/wav2vec2-conformer-rope-large-960h-ft").to('cuda')

  def load_data(self):
    """Read the file names in self.path into self.data, then order each episode numerically."""
    for wav_name in os.listdir(self.path):
      if wav_name == '.DS_Store':
        continue
      self.len += 1
      fields = wav_name.split('_')
      episode_label = fields[0]
      self.data.setdefault(episode_label, [])
      digits = re.findall(r'\d+', fields[1])
      record = (wav_name, digits[0], fields[1][0], "-I.wav" in wav_name)
      self.data[episode_label].append(record)
    # sorting happens last so that it sees every file of an episode
    for episode_label in list(self.data):
      self.data[episode_label] = sorted(self.data[episode_label], key=lambda rec: int(rec[1]))

  def get_episodes(self):
    """List the episode labels."""
    return [label for label in self.data]

  def get_utterances(self, episode):
    """Look up the utterance tuples of one episode."""
    return self.data[episode]

  def utterance_to_tensor(self, utterance:tuple, with_grad=False):
    """Embed one utterance; returns the last hidden state without the batch axis."""
    waveform, _ = sf.read(self.path + "/" + utterance[0])
    input_values = self.tokenizer(
        waveform, sampling_rate=16000, return_tensors="pt", padding="longest"
    ).input_values.to('cuda')
    if with_grad:
      hidden = self.model_wav2vec(input_values).last_hidden_state
      return hidden.squeeze(0)
    with torch.no_grad():
      hidden = self.model_wav2vec(input_values).last_hidden_state
    return hidden.squeeze(0)

  def __iter__(self):
    """Iterate over the embeddings of all utterances, grouped by episode."""
    for label in self.get_episodes():
      for record in self.get_utterances(label):
        yield self.utterance_to_tensor(record)

  def bool_to_tensor(self, arg):
    """Turn a truth value into tensor([1.]) or tensor([0.]) of dtype float32."""
    value = 1. if arg else 0.
    return torch.tensor([value], dtype=torch.float)

  def easy_trainset(self):
    """Shuffled SBep13 subset: every ironic utterance and each sixth non-ironic one."""
    episode = self.data["SBep13"]
    ironic_utts = [rec for rec in episode if rec[3]]
    other_utts = [rec for rec in episode if not rec[3]]
    thinned = [rec for idx, rec in enumerate(other_utts) if idx % 6 == 0]
    balanced_utts = ironic_utts + thinned
    random.shuffle(balanced_utts)
    print(balanced_utts)
    return balanced_utts

  def trainset(self):
    """Balanced, shuffled 'c'/'d' utterances from every episode but SBep13 and SBep19."""
    utts = []
    for name, episode in self.data.items():
      if name in ("SBep19", "SBep13"):
        continue # the held-out episodes never enter the training set
      utts.extend(rec for rec in episode if rec[2] in ['c', 'd'])
    # shuffle, then cut both classes to the size of the smaller one
    random.shuffle(utts)
    ironic, non_ironic = [], []
    for rec in utts:
      (ironic if rec[3] else non_ironic).append(rec)
    max_len = min(len(ironic), len(non_ironic))
    utts_balanced = ironic[:max_len] + non_ironic[:max_len]
    random.shuffle(utts_balanced)
    return utts_balanced

  def testset(self):
    """Copy of the SBep13 utterance list."""
    return self.data["SBep13"][:]

  def devset(self):
    """Copy of the SBep19 utterance list."""
    return self.data["SBep19"][:]

corpus = PrunedCorpus('Irony-Recognition/AudioData/GatedPruned3')

# show, per episode, the set of speaker letters that occur in it
for episode in corpus.get_episodes():
  speakers = set()
  speakers.update(utterance[2] for utterance in corpus.get_utterances(episode))
  print(episode, speakers)

# empty containers reserved for a per-label survey of the utterances
sarcastic_utts = list()
non_sarcastic_utts = list()

class SarcasmClassifier(torch.nn.Module):
  """LSTM-pooled irony classifier: final LSTM hidden state -> two-layer MLP -> one logit."""

  def __init__(self, dim):
    super().__init__()
    self.lstm = torch.nn.LSTM(
        input_size=dim,
        hidden_size=512,
        num_layers=1,
        batch_first=True,
        bidirectional=False,
    )
    hidden_to_logit = [
        torch.nn.Linear(512, 128),
        torch.nn.ReLU(),
        torch.nn.Linear(128, 1),
    ]
    self.mlp = torch.nn.Sequential(*hidden_to_logit)

  def forward(self, x):
    """Run the LSTM over ``x`` and classify its last hidden state."""
    final_hidden = self.lstm(x)[1][0]
    # squeeze(0) removes the single-layer axis in front of the hidden state
    return self.mlp(final_hidden.squeeze(0))

dim = 1024
model = SarcasmClassifier(dim).to('cuda')

# train
trainset = corpus.trainset()
# print the label balance of the training set before starting
trainset_size = len(trainset)
trainset_ironic_size = len([utt for utt in trainset if utt[3]])
trainset_non_ironic_size = trainset_size - trainset_ironic_size
print("trainset data:", trainset_size, trainset_ironic_size, trainset_non_ironic_size)

criterion = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters())
TRAIN_EPOCHS = 10
BATCH_SIZE = 16
for epoch in range(TRAIN_EPOCHS):
    print("Epoch: ", epoch+1)
    # point_counter numbers the full batches of this epoch from 1; a partial last batch is dropped
    for point_counter in range(1, trainset_size // BATCH_SIZE + 1):
      optimizer.zero_grad()
      # slice out the batch, embed it and pack the variable-length sequences
      first = (point_counter - 1) * BATCH_SIZE
      batch_utts = trainset[first:first + BATCH_SIZE]
      batch_labels = torch.stack([corpus.bool_to_tensor(utt[3]) for utt in batch_utts]).to('cuda')
      embedded = [corpus.utterance_to_tensor(utt) for utt in batch_utts]
      lengths = [seq.size(0) for seq in embedded]
      padded_sequences = pad_sequence(embedded, batch_first=True)
      packed_sequences = pack_padded_sequence(padded_sequences, lengths, batch_first=True, enforce_sorted=False)
      # loss and backprop
      y_hat = model(packed_sequences)
      loss = criterion(y_hat, batch_labels)
      loss.backward()
      optimizer.step()
      if point_counter % 10 == 0:
          print(f"Loss: {loss}, {point_counter*BATCH_SIZE}/{trainset_size}, {y_hat}, {batch_labels}")

    # score the dev episode utterance by utterance
    print("evaluating")
    with torch.no_grad():
      correct = 0
      total = 0
      for example in corpus.devset():
        x = corpus.utterance_to_tensor(example, with_grad=False)
        y = corpus.bool_to_tensor(example[3])
        y_hat = torch.sigmoid(model(x))
        y_pred = torch.round(y_hat)
        print(x.shape, y, y_pred, y_hat)
        hit_ironic = y_hat[0] >= 0.5 and y[0] >= 0.5
        hit_plain = y_hat[0] < 0.5 and y[0] < 0.5
        if hit_ironic or hit_plain:
          correct += 1
        total += 1
      print(f'{correct}/{total}')

# Pre-training Wav2Vec on Emotion Detection then Fine-Tuning for Irony Detection

To augment our small dataset, I will see if improvement can be made by fine-tuning first on a related dataset, and then further fine-tuning on our SadBoyz dataset. The related dataset I have selected is the RAVDESS emotion classification dataset. The most attractive property of this dataset is that it features the same sentences spoken multiple times with different emotional prosody, which should help us detect irony using only prosodic cues.

In [None]:
!pip install torch
!pip install transformers
!pip install datasets

import os, re, random
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from transformers import Wav2Vec2Model, Wav2Vec2Tokenizer
import soundfile as sf
from datasets import load_dataset
from scipy.signal import decimate

class EmotionDataset:
  """The "narad/ravdess" emotion clips, embedded on the fly with a wav2vec-style encoder.

  90% of the data is kept for training and 10% for testing. Both iterators
  yield ``(tokens, label)`` pairs, where ``tokens`` is the encoder's last
  hidden state for one clip and ``label`` lives on the same device.
  """

  def __init__(self, wav2vec_tokenizer, wav2vec_model):
    dataset = load_dataset("narad/ravdess", split="train")
    dataset = dataset.train_test_split(test_size=0.1)
    self.train_dataset = dataset["train"]
    self.test_dataset = dataset["test"]
    print(self.train_dataset, self.test_dataset)
    self.tokenizer = wav2vec_tokenizer
    self.model_wav2vec = wav2vec_model

  def _embed(self, example, with_grad=False):
    """Return the encoder embedding of one example, or None if it cannot be resampled."""
    audio_input, _ = sf.read(example['audio']['path'])
    # decimate by factor of 3 to downsample 48kHZ to 16kHZ
    # NOTE(review): assumes every clip is 48 kHz; the rate returned by sf.read is not checked
    try:
      resampled_data = decimate(audio_input, 3)
    except Exception:
      # best effort: report the clip that could not be resampled and let the caller skip it
      print("!!!")
      return None
    input_values = self.tokenizer(resampled_data, return_tensors="pt", padding="longest", sampling_rate=16000).input_values
    # run the encoder wherever its weights live instead of assuming CUDA
    input_values = input_values.to(next(self.model_wav2vec.parameters()).device)
    if with_grad:
      h = self.model_wav2vec(input_values).last_hidden_state
    else:
      with torch.no_grad():
        h = self.model_wav2vec(input_values).last_hidden_state
    return h.squeeze(0)

  def trainset(self, with_grad=False):
    """Yield ``(tokens, class index)`` for the training split in a fresh random order.

    ``with_grad=True`` keeps the encoder in the autograd graph. Clips that
    cannot be resampled are skipped.
    """
    # Dataset.shuffle() returns a new dataset, so iterate over its result
    for example in self.train_dataset.shuffle():
      tokens = self._embed(example, with_grad)
      if tokens is None:
        continue
      label = torch.tensor(example['labels'], dtype=torch.long, device=tokens.device)
      yield tokens, label

  def testset(self):
    """Yield ``(tokens, one-hot label of length 8)`` for the test split.

    Clips that cannot be resampled are skipped.
    """
    print(self.test_dataset)
    for example in self.test_dataset:
      tokens = self._embed(example)
      if tokens is None:
        continue
      label = torch.zeros(8, dtype=torch.float, device=tokens.device)
      print("example['labels']", example['labels'])
      label[example['labels']] = 1.0
      yield tokens, label

class IronyCorpus:
  """Index of the irony utterance files in one directory, embedded with a shared wav2vec encoder.

  File names are parsed as ``<episode>_<speaker letter><utterance number>...``;
  a name containing ``-I.wav`` marks the utterance as ironic.
  """

  def __init__(self, path, tokenizer, model):
    """Index the files under ``path``; ``tokenizer`` and ``model`` produce the embeddings."""
    self.len = 0
    self.path = path
    self.data = {} # key is episode num, val is list of (utterance file, utterance num, speaker, is_sarcasm) tuple
    self.load_data()
    self.tokenizer = tokenizer
    self.model_wav2vec = model

  def load_data(self):
    """Fill ``self.data`` from the file names in ``self.path``, ordered by utterance number."""
    for filename in os.listdir(self.path):
      if filename == '.DS_Store':
        continue
      self.len += 1
      filename_split = filename.split('_')
      episode_label = filename_split[0]
      if episode_label not in self.data:
        self.data[episode_label] = []
      utterance_num = re.findall(r'\d+', filename_split[1])[0]
      speaker = filename_split[1][0]
      is_sarcasm = "-I.wav" in filename
      self.data[episode_label].append((filename, utterance_num, speaker, is_sarcasm))
    # sort the utterances once all files have been read
    for episode, utterances in self.data.items():
      self.data[episode] = sorted(utterances, key=lambda x: int(x[1]))

  def get_episodes(self):
    """Return the list of episode labels."""
    return list(self.data.keys())

  def get_utterances(self, episode):
    """Return the utterance tuples of ``episode``; raises KeyError for an unknown label."""
    return self.data[episode]

  def utterance_to_tensor(self, utterance:tuple, with_grad=False):
    """Return the encoder's last hidden state for one utterance, batch axis removed.

    The input is moved to CUDA. Gradients through the encoder are only
    tracked when ``with_grad`` is true.
    """
    audio_input, _ = sf.read(self.path + "/" + utterance[0])
    input_values = self.tokenizer(audio_input, return_tensors="pt", padding="longest", sampling_rate=16000).input_values
    input_values = input_values.to('cuda')
    if with_grad:
      h = self.model_wav2vec(input_values).last_hidden_state
    else:
      with torch.no_grad():
        h = self.model_wav2vec(input_values).last_hidden_state
    return h.squeeze(0)

  def __iter__(self):
    """Yield the embedding of every utterance, episode by episode."""
    for episode in self.get_episodes():
      for utterance in self.get_utterances(episode):
        yield self.utterance_to_tensor(utterance)

  def bool_to_tensor(self, arg):
    """Return a one-element float32 tensor: 1.0 when ``arg`` is truthy, else 0.0."""
    return torch.tensor([1. if arg else 0.], dtype=torch.float)

  def easy_trainset(self):
    """Return SBep13 roughly balanced: all ironic utterances plus every sixth non-ironic one."""
    episode = self.data["SBep13"]
    ironic_utts = [utt for utt in episode if utt[3]]
    non_ironic_utts_all = [utt for utt in episode if not utt[3]]
    balanced_utts = ironic_utts + non_ironic_utts_all[::6]
    random.shuffle(balanced_utts)
    print(balanced_utts)
    return balanced_utts

  def trainset(self, shuffle=True):
    """Return a class-balanced list of training utterances.

    Uses all episodes except SBep13 and SBep19 (devset and testset) and only
    the 'c' and 'd' speakers. The larger class is cut down to the size of the
    smaller one.

    With ``shuffle=True`` (the default) the utterances dropped from the
    larger class are chosen at random and the result is in random order.
    With ``shuffle=False`` the first utterances of each class are kept and
    the result stays in corpus order, so repeated calls return the same list.
    """
    utts = []
    for name, episode in self.data.items():
      if name == "SBep19" or name == "SBep13":
        continue # dont include held-out episodes in trainset
      utts.extend(utt for utt in episode if utt[2] in ['c', 'd'])
    if shuffle:
      random.shuffle(utts)
    ironic = [utt for utt in utts if utt[3]]
    non_ironic = [utt for utt in utts if not utt[3]]
    max_len = min(len(ironic), len(non_ironic))
    if not shuffle:
      # keep the selected utterances in their original corpus order
      kept = set(ironic[0:max_len]) | set(non_ironic[0:max_len])
      return [utt for utt in utts if utt in kept]
    utts_balanced = ironic[0:max_len] + non_ironic[0:max_len]
    random.shuffle(utts_balanced)
    return utts_balanced

  def devset(self):
    """Return a copy of the SBep13 utterance list."""
    return list(self.data["SBep13"])

  def testset(self):
    """Return a copy of the SBep19 utterance list."""
    return list(self.data["SBep19"])

class SarcasmClassifier(torch.nn.Module):
  """LSTM-pooled classifier with two output heads.

  While ``self.pretrain`` is true the 8-way emotion head is used; once it is
  set to false the single-logit irony head is used instead.
  """

  def __init__(self, dim):
    super().__init__()
    # which head forward() uses: emotion dataset (8 classes) or irony dataset (1 logit)
    self.pretrain = True
    self.lstm = torch.nn.LSTM(
        input_size=dim,
        hidden_size=512,
        num_layers=1,
        batch_first=True,
        bidirectional=False,
    )
    self.out_transform_pretrain = torch.nn.Linear(512, 8)
    self.out_transform_finetune = torch.nn.Linear(512, 1)

  def forward(self, x):
    """Summarize ``x`` with the LSTM's final hidden state and apply the active head."""
    _, (final_hidden, _) = self.lstm(x)
    pooled = final_hidden.squeeze(0)
    head = self.out_transform_pretrain if self.pretrain else self.out_transform_finetune
    return head(pooled)

dim = 768
model = SarcasmClassifier(dim).to('cuda')

# NOTE(review): only `model` (the LSTM classifier) is handed to the optimizer and the
# datasets embed audio with with_grad=False, so this encoder is used as a frozen
# feature extractor shared by both datasets — confirm that is intended
wav2vec_model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base-960h").to('cuda')
wav2vec_tokenizer = Wav2Vec2Tokenizer.from_pretrained("facebook/wav2vec2-base-960h")

# first fine tune on emotion dataset
emotion_dataset = EmotionDataset(wav2vec_tokenizer, wav2vec_model)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
TRAIN_EPOCHS = 5
point_counter = 0
for epoch in range(TRAIN_EPOCHS):
  print("Emotion Epoch: ", epoch+1)
  trainset = emotion_dataset.trainset()
  for x, y in trainset:
    if x is None:
      continue # the dataset reports a clip it could not load as (None, None)
    try:
      x = x.to('cuda')
      # the label must sit on the same device as the logits, or the loss raises
      y = y.to('cuda')
      optimizer.zero_grad()
      y_hat = model(x)
      loss = criterion(y_hat, y)
      loss.backward()
      optimizer.step()
      point_counter += 1
      if point_counter % 500 == 0:
          print(f"Loss: {loss}, {point_counter}, {y}, {y_hat}")
    except Exception as e:
      # best effort: report the failing example and keep training
      print("Err", e)

  # evaluate
  print("evaluating")
  with torch.no_grad():
    evalset = emotion_dataset.testset()
    correct = 0
    total = 0
    for x, y in evalset:
      if x is None:
        continue
      try:
        y = y.to('cuda')
        y_hat = model(x)
        y_pred = torch.argmax(y_hat, dim=0)
        # y is one-hot, so its argmax is the reference class index
        y_true = torch.argmax(y, dim=0)
        print(y_hat, y_pred, y, y_true)
        if y_pred == y_true:
          correct += 1
        total += 1
      except Exception as e:
        print("Err:", e)
    print(f'{correct}/{total}')

# then fine tune on irony dataset
model.pretrain = False # switch the classifier to its single-logit irony head
ironony_dataset = IronyCorpus('Irony-Recognition/AudioData/GatedPruned3', wav2vec_tokenizer, wav2vec_model)
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.BCEWithLogitsLoss()
TRAIN_EPOCHS = 10
BATCH_SIZE = 16
for epoch in range(TRAIN_EPOCHS):
    print("Irony Epoch: ", epoch+1)
    # a fresh balanced, shuffled training set is drawn for every epoch
    trainset = ironony_dataset.trainset(shuffle=True)
    trainset_size = len(trainset)
    point_counter = 0
    # Walk over the full batches with a for loop: the position advances even when a
    # batch fails, so an error can never stall the epoch. A partial last batch is dropped.
    for i in range(0, (trainset_size // BATCH_SIZE) * BATCH_SIZE, BATCH_SIZE):
      try:
        optimizer.zero_grad()
        # get batch
        batch_examples = trainset[i:i+BATCH_SIZE]
        # labels must be on the same device as the logits, or the loss raises
        batch_labels = torch.stack([ironony_dataset.bool_to_tensor(x[3]) for x in batch_examples]).to('cuda')
        batch_examples = [ironony_dataset.utterance_to_tensor(x) for x in batch_examples]
        lengths = [seq.size(0) for seq in batch_examples]
        padded_sequences = pad_sequence(batch_examples, batch_first=True)
        packed_sequences = pack_padded_sequence(padded_sequences, lengths, batch_first=True, enforce_sorted=False)
        # now calc loss and backprop
        y_hat = model(packed_sequences)
        loss = criterion(y_hat, batch_labels)
        loss.backward()
        optimizer.step()
        point_counter += 1
        if point_counter % 32 == 0:
            print(f"Loss: {loss}, {point_counter*BATCH_SIZE}/{trainset_size}, {y_hat}, {batch_labels}")
      except Exception as e:
        # best effort: report the failing batch and move on to the next one
        print("Err", e)
    # evaluate
    print("evaluating")
    with torch.no_grad():
      evalset = ironony_dataset.testset()
      correct = 0
      total = 0
      for example in evalset:
        try:
          x = ironony_dataset.utterance_to_tensor(example, with_grad=False)
          y = ironony_dataset.bool_to_tensor(example[3])
          y_hat = torch.sigmoid(model(x))
          y_pred = torch.round(y_hat)
          print(x.shape, y, y_pred, y_hat)
          if (y_hat[0] >= 0.5 and y[0] >= 0.5) or (y_hat[0] < 0.5 and y[0] < 0.5):
            correct += 1
          total += 1
        except Exception as e:
          print("Err:", e)
      print(f'{correct}/{total}')

Again, I do the same thing, but this time I use Wav2Vec-Conformer instead.

In [None]:
!pip install torch
!pip install transformers
!pip install datasets

import os, re, random
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from transformers import Wav2Vec2ConformerModel, AutoProcessor
import soundfile as sf
from datasets import load_dataset
from scipy.signal import decimate

class EmotionDataset:
  """Streams RAVDESS emotion clips as wav2vec hidden-state sequences.

  The "narad/ravdess" dataset is loaded through Hugging Face `datasets` and
  split into train (90%) and test (10%) portions. `trainset` and `testset`
  are generators: each clip is read from disk, downsampled, passed through
  the wav2vec model and yielded together with its label.

  Clips that cannot be downsampled are reported and skipped; they never reach
  the consumer.
  """

  # Decimating by 3 is meant to bring 48 kHz audio down to 16 kHz.
  # NOTE(review): the sample rate reported by soundfile is not checked --
  # confirm that every clip really is 48 kHz.
  DECIMATION_FACTOR = 3
  TARGET_SAMPLE_RATE = 16000
  NUM_CLASSES = 8  # length of the one-hot label built by `testset`

  def __init__(self, wav2vec_tokenizer, wav2vec_model):
    """Load and split the dataset and keep the feature extractor pieces.

    Args:
      wav2vec_tokenizer: callable turning raw audio into `input_values`.
      wav2vec_model: model whose `last_hidden_state` is used as features.
    """
    dataset = load_dataset("narad/ravdess", split="train")
    dataset = dataset.train_test_split(test_size=0.1)
    self.train_dataset = dataset["train"]
    self.test_dataset = dataset["test"]
    print(self.train_dataset, self.test_dataset)
    self.tokenizer = wav2vec_tokenizer
    self.model_wav2vec = wav2vec_model

  def _encode(self, example, with_grad=False):
    """Return the wav2vec hidden states for one example, or None on failure.

    Args:
      example: dataset row; only `example['audio']['path']` is read here.
      with_grad: when False the wav2vec forward pass runs under `no_grad`.
    """
    path = example['audio']['path']
    audio_input, _ = sf.read(path)
    try:
      resampled_data = decimate(audio_input, self.DECIMATION_FACTOR)
    except Exception as e:
      # Report and give up on this clip only; the caller skips it.
      print("!!!", path, e)
      return None
    input_values = self.tokenizer(
        resampled_data,
        return_tensors="pt",
        padding="longest",
        sampling_rate=self.TARGET_SAMPLE_RATE,
    ).input_values
    input_values = input_values.to('cuda')
    if with_grad:
      h = self.model_wav2vec(input_values).last_hidden_state
    else:
      with torch.no_grad():
        h = self.model_wav2vec(input_values).last_hidden_state
    # Drop the leading batch dimension (one clip is encoded at a time).
    return h.squeeze(0)

  def trainset(self, with_grad=False):
    """Yield `(tokens, label)` for the training split in a fresh random order.

    `label` is a scalar long tensor holding the class index (left on the CPU,
    as before). Clips that fail to resample are skipped.
    """
    # Dataset.shuffle() returns a new dataset, so the result must be kept;
    # calling it and discarding the result leaves the order unchanged.
    shuffled = self.train_dataset.shuffle()
    for example in shuffled:
      tokens = self._encode(example, with_grad=with_grad)
      if tokens is None:
        continue
      label = torch.tensor(example['labels'], dtype=torch.long)
      yield tokens, label

  def testset(self):
    """Yield `(tokens, one_hot_label)` for the test split.

    `one_hot_label` is a float tensor of length `NUM_CLASSES` on the GPU.
    Clips that fail to resample are skipped.
    """
    print(self.test_dataset)
    for example in self.test_dataset:
      tokens = self._encode(example)
      if tokens is None:
        continue
      label = torch.zeros(self.NUM_CLASSES, dtype=torch.float).to('cuda')
      print("example['labels']", example['labels'])
      label[example['labels']] = 1.0
      yield tokens, label

class IronyCorpus:
  """Index of the irony-annotated utterance files plus wav2vec featurisation.

  File names are parsed as `<episode>_<speaker letter><utterance number>...`;
  a name containing "-I.wav" marks an ironic utterance. Every utterance is
  stored as a `(filename, utterance_num, speaker, is_sarcasm)` tuple, grouped
  by episode and sorted by utterance number.
  """

  DEV_EPISODE = "SBep13"        # held out as the dev set
  TEST_EPISODE = "SBep19"       # held out as the test set
  TRAIN_SPEAKERS = ('c', 'd')   # only these speakers are used for training

  def __init__(self, path, tokenizer, model):
    """Scan `path` for utterance files and keep the feature extractor pieces.

    Args:
      path: directory containing the utterance audio files.
      tokenizer: callable turning raw audio into `input_values`.
      model: model whose `last_hidden_state` is used as features.
    """
    self.len = 0
    self.path = path
    self.data = {} # key is episode num, val is list of (utterance file, utterance num, speaker, is_sarcasm) tuple
    self.tokenizer = tokenizer
    self.model_wav2vec = model
    self.load_data()

  def load_data(self):
    """Populate `self.data` / `self.len` from the files found in `self.path`.

    Names that do not follow the expected pattern (no underscore, or no digits
    after it) are ignored instead of aborting the scan.
    """
    for filename in os.listdir(self.path):
      if filename == '.DS_Store':
        continue
      filename_split = filename.split('_')
      if len(filename_split) < 2:
        continue
      digits = re.findall(r'\d+', filename_split[1])
      if not digits:
        continue
      self.len += 1
      episode_label = filename_split[0]
      speaker = filename_split[1][0]
      is_sarcasm = "-I.wav" in filename
      self.data.setdefault(episode_label, []).append(
          (filename, digits[0], speaker, is_sarcasm))
    # sort the utterances once all files have been read
    for utterances in self.data.values():
      utterances.sort(key=lambda x: int(x[1]))

  def get_episodes(self):
    """Return the list of episode labels seen while loading."""
    return list(self.data.keys())

  def get_utterances(self, episode):
    """Return the sorted utterance tuples of `episode` (KeyError if unknown)."""
    return self.data[episode]

  def utterance_to_tensor(self, utterance:tuple, with_grad=False):
    """Encode one utterance file into its wav2vec hidden-state sequence.

    Args:
      utterance: tuple as stored in `self.data`; only the file name is used.
      with_grad: when False the wav2vec forward pass runs under `no_grad`.

    Returns:
      `last_hidden_state` of the wav2vec model with the leading (batch)
      dimension squeezed away.
    """
    # NOTE(review): the file's own sample rate is discarded and 16 kHz is
    # passed to the tokenizer -- confirm the corpus is stored at 16 kHz.
    audio_input, _ = sf.read(os.path.join(self.path, utterance[0]))
    input_values = self.tokenizer(audio_input, return_tensors="pt", padding="longest", sampling_rate=16000).input_values
    input_values = input_values.to('cuda')
    if with_grad:
      h = self.model_wav2vec(input_values).last_hidden_state
    else:
      with torch.no_grad():
        h = self.model_wav2vec(input_values).last_hidden_state
    return h.squeeze(0)

  def __iter__(self):
    """Yield the encoded tensor of every utterance, episode by episode."""
    for episode in self.get_episodes():
      for utterance in self.get_utterances(episode):
        yield self.utterance_to_tensor(utterance)

  def bool_to_tensor(self, arg):
    """Return a one-element float tensor on the GPU: 1.0 if `arg` else 0.0."""
    return torch.tensor([1. if arg else 0.], dtype=torch.float).to('cuda')

  def easy_trainset(self):
    """Return a shuffled, roughly balanced subset of the dev episode.

    All ironic utterances are kept together with every sixth non-ironic one.
    """
    episode = self.data[self.DEV_EPISODE]
    ironic_utts = [utt for utt in episode if utt[3]]
    non_ironic_utts = [utt for utt in episode if not utt[3]]
    balanced_utts = ironic_utts + non_ironic_utts[::6]
    random.shuffle(balanced_utts)
    print(balanced_utts)
    return balanced_utts

  def trainset(self, shuffle=True):
    """Return a class-balanced list of training utterances.

    The dev and test episodes are excluded and only `TRAIN_SPEAKERS` are
    kept. The larger class is truncated to the size of the smaller one.

    Args:
      shuffle: when True (default) the pool is shuffled before truncation and
        the result is shuffled again. When False nothing is randomised: the
        utterances keep the order of `self.data` and the result lists the
        ironic utterances first, then the non-ironic ones.
    """
    held_out = (self.DEV_EPISODE, self.TEST_EPISODE)
    utts = []
    for name, episode in self.data.items():
      if name in held_out:
        continue # dont include dev/test episodes in trainset
      utts.extend(utt for utt in episode if utt[2] in self.TRAIN_SPEAKERS)
    if shuffle:
      random.shuffle(utts)
    ironic = [utt for utt in utts if utt[3]]
    non_ironic = [utt for utt in utts if not utt[3]]
    per_class = min(len(ironic), len(non_ironic))
    utts_balanced = ironic[:per_class] + non_ironic[:per_class]
    if shuffle:
      random.shuffle(utts_balanced)
    return utts_balanced

  def devset(self):
    """Return a copy of the dev episode's utterance list."""
    return list(self.data[self.DEV_EPISODE])

  def testset(self):
    """Return a copy of the test episode's utterance list."""
    return list(self.data[self.TEST_EPISODE])

class SarcasmClassifier(torch.nn.Module):
  """Sequence classifier that pools with an LSTM and has two output heads.

  A single-layer unidirectional LSTM reads the feature sequence; its final
  hidden state is fed to one of two linear heads, selected by `self.pretrain`:
  the multi-class emotion head while True, the single-logit irony head
  otherwise. Both heads return raw logits (no softmax / sigmoid is applied).
  """

  def __init__(self, dim, hidden_dim=512, num_emotions=8):
    """Build the LSTM and both heads.

    Args:
      dim: size of each input feature vector.
      hidden_dim: LSTM hidden size (default 512, the previous fixed value).
      num_emotions: number of classes of the pre-training head (default 8).
    """
    super().__init__()
    self.pretrain = True # use a different output transformation for emotion dataset (8 classes) and irony dataset (2) classes
    self.lstm = torch.nn.LSTM(dim, hidden_dim, 1, batch_first=True, bidirectional=False)
    self.out_transform_pretrain = torch.nn.Linear(hidden_dim, num_emotions)
    self.out_transform_finetune = torch.nn.Linear(hidden_dim, 1)

  def forward(self, x):
    """Return the logits of the active head for `x`.

    `x` is whatever `torch.nn.LSTM` accepts: an unbatched sequence, a
    batch-first padded batch, or a `PackedSequence`.
    """
    _, (hn, _) = self.lstm(x)
    # hn has a leading axis of size 1 (one layer, one direction); drop it so
    # the head sees one vector per sequence.
    hn = hn.squeeze(0)
    head = self.out_transform_pretrain if self.pretrain else self.out_transform_finetune
    return head(hn)

# Input width of the classifier's LSTM.
# NOTE(review): presumably the hidden size of the wav2vec checkpoint loaded
# below -- confirm against its config.
dim = 1024
model = SarcasmClassifier(dim)
model = model.to('cuda')

# The same checkpoint name supplies both the speech encoder and its processor.
WAV2VEC_CHECKPOINT = "facebook/wav2vec2-conformer-rope-large-960h-ft"
wav2vec_model = Wav2Vec2ConformerModel.from_pretrained(WAV2VEC_CHECKPOINT)
wav2vec_model = wav2vec_model.to('cuda')
wav2vec_tokenizer = AutoProcessor.from_pretrained(WAV2VEC_CHECKPOINT)

# first fine tune on emotion dataset
# Only the classifier's parameters are handed to the optimizer; the wav2vec
# features come from emotion_dataset, which is called with its default
# with_grad=False.
emotion_dataset = EmotionDataset(wav2vec_tokenizer, wav2vec_model)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
TRAIN_EPOCHS = 5
point_counter = 0  # counts successful updates across all epochs
for epoch in range(TRAIN_EPOCHS):
  print("Emotion Epoch: ", epoch+1)
  trainset = emotion_dataset.trainset()
  for x, y in trainset:
    # The dataset may hand out (None, None) for a clip it could not prepare;
    # skip it explicitly instead of relying on the catch-all below.
    if x is None or y is None:
      continue
    try:
      optimizer.zero_grad()
      y_hat = model(x)
      loss = criterion(y_hat, y.to('cuda'))
      loss.backward()
      optimizer.step()
      point_counter += 1
      if point_counter % 500 == 0:
        print(f"Loss: {loss}, {point_counter}, {y}, {y_hat}")
    except Exception as e:
      # Best effort: report the failing example and carry on with the next.
      print("Err", e)

  # evaluate
  print("evaluating")
  with torch.no_grad():
    evalset = emotion_dataset.testset()
    correct = 0
    total = 0
    for x, y in evalset:
      if x is None or y is None:
        continue
      try:
        y_hat = model(x)
        y_pred = torch.argmax(y_hat, dim=0)
        y_true = torch.argmax(y, dim=0)  # y is a one-hot vector
        print(y_hat, y_pred, y, y_true)
        if y_pred == y_true:
          correct += 1
        total += 1
      except Exception as e:
        print("Err:", e)
    print(f'{correct}/{total}')

# then fine tune on irony dataset
# Switch the classifier to its single-logit head for the binary irony task.
model.pretrain = False
ironony_dataset = IronyCorpus('Irony-Recognition/AudioData/GatedPruned3', wav2vec_tokenizer, wav2vec_model)
optimizer = torch.optim.Adam(model.parameters())
criterion = torch.nn.BCEWithLogitsLoss()
TRAIN_EPOCHS = 10
BATCH_SIZE = 16
for epoch in range(TRAIN_EPOCHS):
  print("Irony Epoch: ", epoch+1)
  trainset = ironony_dataset.trainset(shuffle=True)
  trainset_size = len(trainset)
  point_counter = 0  # number of successful updates in this epoch
  # Walk over full batches only (a trailing partial batch is dropped). The
  # batch index is driven by range(), so a batch that raises is reported once
  # and skipped; advancing the index inside the try block could otherwise
  # retry the same failing batch forever.
  for i in range(0, trainset_size - BATCH_SIZE + 1, BATCH_SIZE):
    try:
      optimizer.zero_grad()
      # get batch
      batch_examples = trainset[i:i+BATCH_SIZE]
      batch_labels = torch.stack([ironony_dataset.bool_to_tensor(x[3]) for x in batch_examples]).to('cuda')
      batch_features = [ironony_dataset.utterance_to_tensor(x) for x in batch_examples]
      # Sequences differ in length: pad to a rectangle, then pack so the LSTM
      # stops at each sequence's true end.
      lengths = [seq.size(0) for seq in batch_features]
      padded_sequences = pad_sequence(batch_features, batch_first=True)
      packed_sequences = pack_padded_sequence(padded_sequences, lengths, batch_first=True, enforce_sorted=False)
      # now calc loss and backprop
      y_hat = model(packed_sequences)
      loss = criterion(y_hat, batch_labels)
      loss.backward()
      optimizer.step()
      point_counter += 1
      if point_counter % 32 == 0:
        print(f"Loss: {loss}, {point_counter*BATCH_SIZE}/{trainset_size}, {y_hat}, {batch_labels}")
    except Exception as e:
      # Best effort: report the failing batch and move on to the next one.
      print("Err", e)
  # evaluate
  print("evaluating")
  with torch.no_grad():
    evalset = ironony_dataset.testset()
    correct = 0
    total = 0
    for example in evalset:
      try:
        x = ironony_dataset.utterance_to_tensor(example, with_grad=False)
        y = ironony_dataset.bool_to_tensor(example[3])
        y_hat = torch.sigmoid(model(x))
        y_pred = torch.round(y_hat)
        print(x.shape, y, y_pred, y_hat)
        # Correct when prediction and label fall on the same side of 0.5.
        if bool(y_hat[0] >= 0.5) == bool(y[0] >= 0.5):
          correct += 1
        total += 1
      except Exception as e:
        print("Err:", e)
    print(f'{correct}/{total}')