In [None]:
def import_lib():
  """Import the diarization dependencies and instantiate the speaker-embedding model.

  All imports below are function-local, so neither the imported names nor
  ``embedding_model`` are bound in the notebook namespace once this returns;
  functions that need them must import them themselves.

  The embedding model is placed on the GPU when CUDA is available and on the
  CPU otherwise, so the call no longer fails on CPU-only runtimes.

  Returns:
    None.
  """
  import whisper
  import datetime
  import subprocess
  import torch
  import pyannote.audio

  # Speaker verification: pretrained model used to embed / identify speakers.
  from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding

  # Fall back to CPU instead of raising when no CUDA device is present.
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  embedding_model = PretrainedSpeakerEmbedding(
      "speechbrain/spkrec-ecapa-voxceleb",
      device=device)

  from pyannote.audio import Audio
  from pyannote.core import Segment

  import wave
  import contextlib

  # Clustering
  from sklearn.cluster import AgglomerativeClustering
  import numpy as np

In [None]:
def diarize_call(call_rec, num_speakers=2, language='English', model_size='medium'):
  """Transcribe a call recording with Whisper and tag each segment with a speaker.

  Each transcribed segment is embedded with the
  ``speechbrain/spkrec-ecapa-voxceleb`` speaker model, the embeddings are
  grouped with agglomerative clustering, and the cluster index is written to
  the segment as ``'SPEAKER <n>'`` (1-based).

  Args:
    call_rec: Path to the recording. Anything whose extension is not ``.wav``
      is converted with ffmpeg to ``audio.wav`` in the working directory.
    num_speakers: Number of clusters (speakers) to separate.
    language: Spoken language; ``'English'`` selects the ``.en`` Whisper model
      for the sizes that have one.
    model_size: Whisper model size.

  Returns:
    The list ``result["segments"]`` returned by Whisper, with a ``"speaker"``
    key added to every segment. An empty list is returned unchanged.

  Raises:
    subprocess.CalledProcessError: If the ffmpeg conversion fails.
  """
  import os
  import subprocess

  import numpy as np
  import torch
  import whisper
  from pydub import AudioSegment
  from pyannote.audio import Audio
  from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
  from pyannote.core import Segment
  from sklearn.cluster import AgglomerativeClustering

  # Speaker verification: pretrained model used to embed / identify speakers.
  # Use the GPU when present, otherwise fall back to CPU instead of raising.
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  embedding_model = PretrainedSpeakerEmbedding(
      "speechbrain/spkrec-ecapa-voxceleb",
      device=device)

  # Only these sizes get the '.en' suffix; 'large' is left unsuffixed as before.
  model_name = model_size
  if language == 'English' and model_size in ('tiny', 'base', 'small', 'medium'):
    model_name += '.en'

  # `path` is the wav file the embedding step crops from. It defaults to the
  # input itself so that wav inputs no longer leave `path` undefined.
  path = call_rec
  if os.path.splitext(call_rec)[1].lower() != '.wav':
    path = 'audio.wav'
    subprocess.run(['ffmpeg', '-i', call_rec, path, '-y'], check=True)

  # Load the model that was actually selected above (previously `model_size`
  # was passed, so the computed `model_name` was never used).
  model = whisper.load_model(model_name)

  result = model.transcribe(call_rec)
  segments = result["segments"]
  if not segments:
    return segments

  recording = AudioSegment.from_file(call_rec)
  rate = recording.frame_rate
  # frame_count() counts frames per channel; the length of
  # get_array_of_samples() counts every channel's samples and would double the
  # duration of a stereo file.
  frames = int(recording.frame_count())

  print("Frames:", frames)
  print("Frame Rate:", rate)

  duration = frames / float(rate)

  if len(segments) < max(num_speakers, 2):
    # Too few segments to cluster: give each segment its own speaker label.
    labels = np.arange(len(segments))
  else:
    cropper = Audio()

    def segment_embedding(segment):
      start = segment["start"]
      # Whisper overshoots the end timestamp in the last segment
      end = min(duration, segment["end"])
      clip = Segment(start, end)
      waveform, sample_rate = cropper.crop(path, clip)
      return embedding_model(waveform[None])

    # One 192-wide row per segment.
    embeddings = np.zeros(shape=(len(segments), 192))
    for i, segment in enumerate(segments):
      embeddings[i] = segment_embedding(segment)

    embeddings = np.nan_to_num(embeddings)

    clustering = AgglomerativeClustering(n_clusters=num_speakers).fit(embeddings)
    labels = clustering.labels_

  for segment, label in zip(segments, labels):
    segment["speaker"] = 'SPEAKER ' + str(label + 1)

  return segments

In [None]:
# Download NLTK's 'punkt' tokenizer data.
# NOTE(review): presumably needed by the word_tokenize call in
# extract_linguistic(); newer NLTK releases may need a different resource — confirm.
import nltk
nltk.download('punkt')

In [None]:
# Word2Vec models already loaded, keyed by file path, so the model is read
# from disk once instead of once per sentence.
_WORD2VEC_MODELS = {}


def extract_linguistic(sentence, model_path="C:/Projects/capstone-project/backend/word2vec/wordtovector.model"):
  """Return the mean Word2Vec vector of the tokens in ``sentence``.

  Args:
    sentence: Text to embed; it is split with NLTK's ``word_tokenize``.
    model_path: Location of the saved gensim Word2Vec model. The default is the
      path this notebook was written against.

  Returns:
    A 1-D array of length ``model.vector_size``: the element-wise mean of the
    vectors of all tokens found in the model vocabulary, or all zeros when no
    token is in the vocabulary.
  """
  import numpy as np
  from gensim.models import Word2Vec
  from nltk.tokenize import word_tokenize

  model = _WORD2VEC_MODELS.get(model_path)
  if model is None:
    model = Word2Vec.load(model_path)
    _WORD2VEC_MODELS[model_path] = model

  tokens = word_tokenize(sentence)
  # Out-of-vocabulary tokens are skipped rather than raising KeyError.
  vectors = [model.wv[word] for word in tokens if word in model.wv.key_to_index]
  if vectors:
    return np.mean(vectors, axis=0)
  return np.zeros(model.vector_size)

In [None]:
def extract_acoustic(path,start,end):
  """Extract MFCC and chroma statistics for one time span of an audio file.

  Args:
    path: Audio file to read.
    start: Span start, in seconds.
    end: Span end, in seconds.

  Returns:
    A 1-D array concatenating, in order: per-coefficient MFCC means (30),
    MFCC variances (30), per-bin chroma means (20) and chroma variances (20).

  Raises:
    ValueError: If the requested span contains no audio.
  """
  import librosa
  import numpy as np

  clip_seconds = end - start
  if clip_seconds <= 0:
    raise ValueError("empty audio span: start=%r, end=%r" % (start, end))

  # Let librosa cut the span in seconds. The previous code converted the
  # bounds to milliseconds and then used them as sample indices, which selects
  # the wrong region unless the sample rate happens to be 1000 Hz.
  audio_clip, sr = librosa.load(path, sr=None, offset=start, duration=clip_seconds)
  if audio_clip.size == 0:
    raise ValueError("no audio found in %r between %r s and %r s" % (path, start, end))

  # extract the features
  mfccs = librosa.feature.mfcc(y=audio_clip, sr=sr, n_mfcc=30)
  chroma = librosa.feature.chroma_stft(y=audio_clip, sr=sr, n_chroma=20)

  # concatenate the features into a single feature vector
  features = np.concatenate((
      mfccs.mean(axis=1),
      mfccs.var(axis=1),
      chroma.mean(axis=1),
      chroma.var(axis=1),
  ))

  return features

In [None]:
def feature_extraction(segment, path):
  """Build the combined linguistic + acoustic feature vector for one segment.

  Args:
    segment: Mapping with ``'text'``, ``'start'`` and ``'end'`` entries.
    path: Audio file the segment was taken from.

  Returns:
    The linguistic features followed by the acoustic features, concatenated
    into a single array. Each vector and its length are printed on the way.
  """
  import numpy as np

  # Text-derived features.
  text_vector = extract_linguistic(segment['text'])
  print(text_vector)
  print(len(text_vector))

  # Audio-derived features for the same time span.
  audio_vector = extract_acoustic(path, segment['start'], segment['end'])
  print(audio_vector)
  print(len(audio_vector))

  # Combining the features: text first, audio second.
  merged = np.concatenate((np.array(text_vector), np.array(audio_vector)))
  print(len(merged))

  return merged



In [None]:
def load_model(model_path="/content/drive/MyDrive/capstone/global_model/globalmodel.h5"):
  """Load the saved Keras model and compile it.

  Args:
    model_path: Location of the saved model file. The default is the path this
      notebook was written against.

  Returns:
    The loaded model, compiled with categorical cross-entropy loss, a legacy
    RMSprop optimizer (learning rate 1e-5, decay 1e-6) and accuracy, recall
    and precision metrics.
  """
  import tensorflow as tf
  # Aliased so the Keras loader does not shadow this function's own name.
  from tensorflow.keras.models import load_model as keras_load_model

  model = keras_load_model(model_path)

  model.compile(
      loss='categorical_crossentropy',
      optimizer=tf.keras.optimizers.legacy.RMSprop(learning_rate=0.00001, decay=1e-6),
      metrics=['accuracy', tf.keras.metrics.Recall(), tf.keras.metrics.Precision()],
  )
  return model

In [None]:
def predict_emotion(features,model):
  """Predict the emotion label for a single feature vector.

  Args:
    features: Feature vector for one segment; a leading batch axis of size 1
      is added before it is passed to the model.
    model: Object exposing ``predict(batch)`` that returns one score per class
      in the order Neutral, Angry, Happy, Confused.

  Returns:
    The label of the highest-scoring class.
  """
  import numpy as np
  class_labels = ['Neutral','Angry','Happy','Confused']

  # The model expects a batch, so wrap the single sample.
  batch = np.expand_dims(features, axis=0)
  scores = model.predict(batch)

  # Softmax is monotonic, so the arg-max of the raw scores is the arg-max of
  # the probabilities whether the model outputs logits or probabilities. The
  # previous tf.nn.softmax call was redundant and raised NameError because
  # `tf` was never imported in this scope.
  predicted_class_index = int(np.argmax(scores))

  # Return the predicted class label.
  return class_labels[predicted_class_index]


In [None]:
def testing():
  """Upload recordings in Colab, then diarize them and predict per-segment emotions.

  Files are uploaded interactively until the user answers anything other than
  ``y`` to the "continue?" prompt. Every uploaded file is saved under the
  testing directory on the mounted drive. Each recording is then diarized,
  each segment gets an ``'emotion'`` entry, and one summary line per segment
  is printed.

  Returns:
    None.
  """
  import os
  from google.colab import files

  output_directory = "/content/drive/MyDrive/Capstone_Dataset/testing"
  os.makedirs(output_directory, exist_ok=True)

  # Collect one or more audio files from the user.
  audio = []
  opt = 'y'
  while opt == 'y':
    rec = files.upload()
    # Save every file in the upload (it may hold several, or none if the
    # dialog was cancelled) instead of only the first one.
    for uploaded_file_name, content in rec.items():
      output_filename = os.path.join(output_directory, uploaded_file_name)
      with open(output_filename, 'wb') as f:
        f.write(content)
      audio.append(output_filename)
    opt = input("continue?").strip().lower()

  if not audio:
    return

  # Load the classifier once; it does not change between segments.
  model = load_model()

  for call_rec in audio:
    # diarize call_rec
    call_data = diarize_call(call_rec)

    print(call_data)

    for segment in call_data:
      # extract features
      features = feature_extraction(segment, call_rec)

      print(features)
      print(len(features))

      segment['emotion'] = predict_emotion(features, model)

    # Report this recording's segments. The previous code looped over an
    # undefined `segments` name after all recordings had been processed.
    for segment in call_data:
      print(segment['text'],"spoken by", segment['speaker'],"is most likey", segment['emotion'] )