Transcription of Audio

In [1]:
import time
import requests
import os
import csv
import wave
import sys
import numpy as np
import pandas as pd
import glob
import librosa
import matplotlib.pyplot as plt
from matplotlib.backend_bases import RendererBase
from scipy import signal
from scipy.io import wavfile
from PIL import Image
from scipy.fftpack import fft
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils import data
import chardet
from transformers import BertModel, BertTokenizer
from pydub import AudioSegment
from tqdm import tqdm
from natsort import natsort_keygen
tqdm.pandas()
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
from torch.hub import load_state_dict_from_url

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def read_file(filename, chunk_size=5242880):
    with open(filename, 'rb') as _file:
        while True:
            data = _file.read(chunk_size)
            if not data:
                break
            yield data

In [3]:
def get_url(token,filepath):
  '''
    Parameter:
      token: The API key
      data : The File Object to upload
    Return Value:
      url  : Url to uploaded file
  '''
  headers = {'authorization': token}
  response = requests.post('https://api.assemblyai.com/v2/upload',
                         headers=headers,
                         data=read_file(filepath))
  url = response.json()["upload_url"]
  print("Uploaded File and got temporary URL to file")
  return url

In [4]:
def get_transcribe_id(token,url):
  '''
    Parameter:
      token: The API key
      url  : Url to uploaded file
    Return Value:
      id   : The transcribe id of the file
  '''
  endpoint = "https://api.assemblyai.com/v2/transcript"
  json = {
    "audio_url": url,"speaker_labels": True,"auto_highlights": True
  }
  headers = {
    "authorization": token,
    "content-type": "application/json"
  }
  response = requests.post(endpoint, json=json, headers=headers)
  id = response.json()['id']
  print("Made request and file is currently queued")
  return id

In [5]:
def upload_file(token,filepath):
  '''
    Parameter: 
      filepath: The File Object to transcribe
    Return Value:
      token  : The API key
      transcribe_id: The ID of the file which is being transcribed
  '''
  
  # token = "398a8ab00a764f0092e93ff6a480a68f"
  file_url = get_url(token,filepath)
  transcribe_id = get_transcribe_id(token,file_url)
  return transcribe_id

In [6]:
def get_text(token,transcribe_id):
  '''
    Parameter: 
      token: The API key
      transcribe_id: The ID of the file which is being 
    Return Value:
      result : The response object
  '''  
  endpoint = f"https://api.assemblyai.com/v2/transcript/{transcribe_id}"
  headers = {
    "authorization": token
  }
  result = requests.get(endpoint, headers=headers).json()
  return result

In [7]:
def json_data_extraction(result,fname):

    audindex = pd.json_normalize(result['words'])
    audindex['fname'] = fname

    speakers = list(audindex.speaker)  # Change df to your dataframe name
    previous_speaker = 'A'
    l = len(speakers)
    i = 1
    speaker_seq_list = list()
    for index, new_speaker in enumerate(speakers):
        if index > 0:
            previous_speaker = speakers[index - 1]
        if new_speaker != previous_speaker:
            i += 1
        speaker_seq_list.append(i)
        # print(str(previous_speaker)+"  "+str(new_speaker)+"  "+str(i))
    audindex['seq'] = speaker_seq_list
    df = pd.DataFrame(audindex.groupby(['fname', 'speaker', 'seq']).agg(utter=('text', ' '.join), stime=('start', 'min'),etime=('end', 'max')))
    df.reset_index(inplace=True)
    df.sort_values(by=['stime'], inplace=True)

    df['stime'] = df.stime // 1000
    df['etime'] = df.etime // 1000
    df['seq'] = df.seq - 1
    df.rename(columns = {'speaker':'spcode'},inplace=True)
    # df.to_csv('tx_speaker_db.csv', mode='a', header=False, index=False)
    return df

In [8]:
def generate_csv(audio_filepath):
  fname = os.path.splitext(os.path.basename(audio_filepath))[0]
  token =  "a2b89c113dc54485abe2692fc8e9ab30"
  tid = upload_file(token,audio_filepath)
  result = {}
  while result.get("status") != 'processing':
    result = get_text(token, tid)
  while result.get("status") != 'completed':
    result = get_text(token, tid)
  df = json_data_extraction(result,fname)
  return df




Audio Slicing 

In [9]:
def crop_audio_csv(audio_filepath,output_path):
    # slicing/cropping audio files
    audio_df =(generate_csv(audio_filepath))
    fname = os.path.splitext(os.path.basename(audio_filepath))[0]
    for i in range(len(audio_df)):
        t1,t2 = audio_df.stime[i],audio_df.etime[i]
        t1 = t1 * 1000
        t2 = t2 * 1000
        newAudio = AudioSegment.from_wav(audio_filepath)
        newAudio = newAudio[t1:t2]
        newAudio.export(output_path+str(t1)+'_'+str(t2)+'_'+fname+".wav", format="wav")
    print('Cropped Successfully.')

    path_list = os.listdir(output_path)
    data = []
    for file in path_list:
        data.append(output_path+file)

    df = pd.DataFrame(data,columns=['file'])
    df.sort_values(
    by="file",
    key=natsort_keygen(),inplace=True)
    df.reset_index(drop=True,inplace=True)
    df= pd.concat([df,generate_csv(audio_filepath)['utter']], axis=1)

    return df

In [10]:
os.makedirs("content/crop_audio", exist_ok=True)

SER Model

Audio Features 

In [11]:
def audio2spectrogram(filepath):
    #fig = plt.figure(figsize=(5,5))
    samplerate, test_sound  = wavfile.read(filepath,mmap=True)
    _, spectrogram = log_specgram(test_sound, samplerate)
    #plt.imshow(spectrogram.T, aspect='auto', origin='lower')
    return spectrogram
    
def audio2wave(filepath):
    fig = plt.figure(figsize=(5,5))
    samplerate, test_sound  = wavfile.read(filepath,mmap=True)
    plt.plot(test_sound)

In [12]:
def log_specgram(audio, sample_rate, window_size=40,
                 step_size=20, eps=1e-10):
    nperseg = int(round(window_size * sample_rate / 1e3))
    noverlap = int(round(step_size * sample_rate / 1e3))
    freqs, _, spec = signal.spectrogram(audio,
                                    fs=sample_rate,
                                    window='hann',
                                    nperseg=nperseg,
                                    noverlap=noverlap,
                                    detrend=False)
    return freqs, np.log(spec.T.astype(np.float32) + eps)

In [13]:
N_CHANNELS = 3
def get_3d_spec(Sxx_in, moments=None):
    if moments is not None:
        (base_mean, base_std, delta_mean, delta_std,
             delta2_mean, delta2_std) = moments
    else:
        base_mean, delta_mean, delta2_mean = (0, 0, 0)
        base_std, delta_std, delta2_std = (1, 1, 1)
    h, w = Sxx_in.shape
    right1 = np.concatenate([Sxx_in[:, 0].reshape((h, -1)), Sxx_in], axis=1)[:, :-1]
    delta = (Sxx_in - right1)[:, 1:]
    delta_pad = delta[:, 0].reshape((h, -1))
    delta = np.concatenate([delta_pad, delta], axis=1)
    right2 = np.concatenate([delta[:, 0].reshape((h, -1)), delta], axis=1)[:, :-1]
    delta2 = (delta - right2)[:, 1:]
    delta2_pad = delta2[:, 0].reshape((h, -1))
    delta2 = np.concatenate([delta2_pad, delta2], axis=1)
    base = (Sxx_in - base_mean) / base_std
    delta = (delta - delta_mean) / delta_std
    delta2 = (delta2 - delta2_mean) / delta2_std
    stacked = [arr.reshape((h, w, 1)) for arr in (base, delta, delta2)]
    return np.concatenate(stacked, axis=2)

AlexNet Model

In [14]:
__all__ = ['AlexNet', 'alexnet']


model_urls = {
    'alexnet': 'https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth',
}


class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.num_classes=num_classes
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((12, 12))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        print('features',x.shape)
        return x
def alexnet(pretrained=False, progress=True, **kwargs):
    model = AlexNet(**kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls['alexnet'],
                                              progress=progress)
        model.load_state_dict(state_dict)
    return model

Modified Alexnet 

In [15]:
class ModifiedAlexNet(nn.Module):
    def __init__(self, num_classes=5):
        super(ModifiedAlexNet, self).__init__()
        self.num_classes=num_classes
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )
        self.Sigmoid = nn.Sigmoid

    def forward(self, x):
        x = self.features(x)
        x=torch.flatten(x, start_dim=2)
        x=torch.sum(x, dim=2)
        x=self.classifier(x)
        return x
   
def modifiedAlexNet(pretrained=False, progress=True, **kwargs):
    model_modified = ModifiedAlexNet(**kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls['alexnet'],
                                              progress=progress)
        model_modified.load_state_dict(state_dict)
    return model_modified

In [16]:
def weight_init(m):
    if isinstance(m, torch.nn.Linear):
        print('init of linear is done')
        torch.nn.init.xavier_uniform_(m.weight)
        if m.bias is not None: 
            torch.nn.init.xavier_uniform_(m.bias)

Combined Audio Text Model

In [17]:
outputs_text= []
def hook_text(module, input, output):
    outputs_text.clear()
    outputs_text.append(output)
    return None

In [18]:
outputs_audio= []
def hook_audio(module, input, output):
    outputs_audio.clear()
    outputs_audio.append(output)
    return None

In [19]:
class CombinedAudioTextModel(nn.Module):
    def __init__(self, num_classes=5):
        super(CombinedAudioTextModel, self).__init__()
        self.num_classes=num_classes
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

        self.text_model=torch.load('model_text_best.pt',map_location=torch.device('cpu'))
        self.audio_model=torch.load('model_audio_best.pt',map_location=torch.device('cpu'))

        self.text_model.bert.pooler.register_forward_hook(hook_text)
        self.audio_model.features.register_forward_hook(hook_audio)

        for param in self.text_model.parameters():
          param.requires_grad = False
        for param in self.audio_model.parameters():
          param.requires_grad = False

        self.dropout = nn.Dropout(.5)
        self.linear = nn.Linear(1024, num_classes)

        self.Sigmoid = nn.Sigmoid

    def forward(self,text,audio):
        self.text_model(text)
        self.audio_model(audio)
        audio_embed=outputs_audio[0]
        text_embed=outputs_text[0]
        audio_embed=torch.flatten(audio_embed, start_dim=2)#a1,a2,a3......al{a of dim c} 
        audio_embed=torch.sum(audio_embed, dim=2)
        concat_embded=torch.cat((text_embed,audio_embed),1)
        x=self.dropout(concat_embded)
        x=self.linear(x)
        return x

In [20]:
model=torch.load('Combined_model_audio_text.pt',map_location=torch.device('cpu'))
model.eval()
model.to('cpu')

CombinedAudioTextModel(
  (text_model): BertForSequenceClassification(
    (bert): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(30522, 768, padding_idx=0)
        (position_embeddings): Embedding(512, 768)
        (token_type_embeddings): Embedding(2, 768)
        (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0): BertLayer(
            (attention): BertAttention(
              (self): BertSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.1, inplace=False)
              )
              (output): BertSelfOutput(
                (dense): Linear(in_features=768, out_features

In [21]:
label_dict = {'ang': 0, 'hap': 1, 'neu': 3, 'sad': 2,'exc':4}
indextolabel = dict()
for key, value in label_dict.items():
  indextolabel[value] = key

In [22]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [23]:
def convert_audio(pathAudio):
  #print("File to be converted {}".format(pathAudio))
  y, sr = librosa.load(pathAudio, sr = 16000, mono=True)
  y = y * 32767 / max(0.01, np.max(np.abs(y)))
  new_pathAudio = pathAudio.split(".wav")[0] + "_converted.wav"
  #print("Converted file saved at {}".format(new_pathAudio))
  wavfile.write(new_pathAudio, sr, y.astype(np.int16))
  return new_pathAudio

Main Function For Predicting Emotion

In [24]:
def predict_emotion(file_path,text):
  prob = None
  preds = None
  try:
    if audio2spectrogram(file_path).shape[1] == 0: 
      return {}
  except:
     file_path = convert_audio(file_path)
     filename=file_path.split('/')[-1].strip('.wav')
     spector=audio2spectrogram(file_path)
     spector=get_3d_spec(spector)
     npimg = np.transpose(spector,(2,0,1))
     input_tensor=torch.tensor(npimg)
     sprectrome = input_tensor.unsqueeze(0) # create a mini-batch as expected by the model
     input_ids = torch.tensor(tokenizer.encode(text, add_special_tokens=True)).unsqueeze(0)
     with torch.no_grad():
        if (sprectrome.shape[2]>65):
          output = model(input_ids,sprectrome)
          m = nn.Softmax(dim=1)
          probs = m(output)
          preds = torch.argmax(probs)
          prob = torch.max(probs)
          #print("Predicted class is {} with {}% probability".format(indextolabel[preds.item()], round(prob.item() * 100,2)))
          #print()
          probs = probs.tolist()[0]
          emotion ={}
          for i in range(len(probs)):
            emotion[indextolabel[i]] = round(probs[i] * 100,2)         
          return emotion

Predict Emotion (CSV File)

In [25]:
def predict_emotion_csv(audio_filepath,output_path):
  df1 = crop_audio_csv(audio_filepath,output_path)
  df1['emotions'] = df1.progress_apply(lambda x:predict_emotion((x['file']),x['utter']),axis=1 )
  df1.to_csv("transcript.csv")
  return df1

In [26]:
predict_emotion_csv('combine.wav','output/')

Uploaded File and got temporary URL to file
Made request and file is currently queued
Cropped Successfully.
Uploaded File and got temporary URL to file
Made request and file is currently queued


  .format(nperseg, input_length))
100%|██████████| 60/60 [00:20<00:00,  2.98it/s]


ValueError: Input nan is not valid. Should be a string, a list/tuple of strings or a list/tuple of integers.