In [1]:
import os
import sys
import json
import logging
import gentle
import math
import numpy as np
import librosa
import shutil
import multiprocessing as mp
from collections import OrderedDict
from pathlib import Path
import scipy.io.wavfile as sciwav

Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit


In [2]:
DISFLUENCIES = {'uh', 'um'}  # set of disfluencies
RESOURCES = gentle.Resources()
N_THREADS = mp.cpu_count()
logging.getLogger().setLevel("INFO")
EPS = 1e-8  # 0.00000001
OPTIMAL_DURATION = 0.115
win_length = 0.025
win_step = 0.01

In [3]:
def restore_model(model, out_path):
    chk_file = glob.glob(out_path + '/' + '*.pth')

    if chk_file:
        chk_file = str(chk_file[0])
        print('found modeL {}, restoring'.format(chk_file))
        model.load_state_dict(torch.load(chk_file, map_location=torch.device('cpu')))
    else:
        print('Model not found, using untrained model')
    return model


def restore_objects(out_path, default):
    data_file = glob.glob(out_path + '/' + '*.dat')
    if data_file:
        data_file = str(data_file[0])
        print('found data {}, restoring'.format(data_file))
        with open(data_file, 'rb') as input_:
            obj = pickle.load(input_)

        return obj
    else:
        return default


In [4]:

def _on_progress(p):
    for k, v in p.items():
        logging.debug("%s: %s" % (k, v))
        

def _get_key_val_pair(line):
    line_split = line[:-1].split()
    word = line_split[0]
    if word[-1] == ')':
        word = word.split('(')[0]

    word = word.lower()
    key = [word]
    val = []
    for phoneme in line_split[1:]:
        val.append(phoneme.lower())
        if phoneme[-1].isdigit():
            phoneme = phoneme[:-1]

        phoneme = phoneme.lower()
        key.append(phoneme)

    key = " ".join(key)
    val = tuple(val)
    return key, val

def _create_dict():
    phoneme_alignment_dict = dict()

    cmu_file = open('/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/alignment/cmudict-0.7b.txt', 'r')
    for line in cmu_file:
        key, val = _get_key_val_pair(line)
        phoneme_alignment_dict[key] = val

    return phoneme_alignment_dict

def align_audio(wav_path, transcript):
    with gentle.resampled(wav_path) as wavfile:
        print("starting alignment {}".format(wav_path))
        aligner = gentle.ForcedAligner(RESOURCES, transcript, nthreads=N_THREADS, disfluency=False,
                                       conservative=False, disfluencies=DISFLUENCIES)
        result = aligner.transcribe(wavfile, progress_cb=_on_progress, logging=logging)
        result_json = json.loads(result.to_json())

    return result_json

In [5]:
def get_mfcc(signal, samplerate):
    # in librosa the window length and step size (stride) are set by number of frames and not
    # duration. window_length is set by n_fft and step is set by hop_length
    frame_length = int(0.025 * samplerate)
    step_size = int(0.01 * samplerate)
    mfcc = librosa.feature.mfcc(signal, samplerate, n_mfcc=13, n_fft=frame_length, hop_length=step_size, center=False)
    mfcc_derivative = librosa.feature.delta(mfcc, order=1)
    mfcc_second_derivative = librosa.feature.delta(mfcc, order=2)

    assert mfcc.shape == (13, 10)
    assert mfcc_derivative.shape == (13, 10)
    assert mfcc_second_derivative.shape == (13, 10)

    # stack mfcc, derivative and second derivative horizontally
    mfcc_matrix = np.concatenate([mfcc, mfcc_derivative, mfcc_second_derivative], axis=1)
    assert mfcc_matrix.shape == (13, 30)

    return mfcc_matrix

In [6]:
def audio2frame(signal, frame_length, frame_step, winfunc=lambda x: np.ones((x,))):
    """
    Frame a signal into overlapping frames.
    :param signal: the audio signal to frame.
    :param frame_length: length of each frame measured in samples.
    :param frame_step: number of samples after the start of the previous frame that the next frame should begin.
    :param winfunc: the analysis window to apply to each frame. By default no window is applied.
    :returns: an array of frames. Size is NUMFRAMES by frame_len.
    """
    signal_length = len(signal)
    frame_length = int(round(frame_length))
    frame_step = int(round(frame_step))
    if signal_length <= frame_length:
        frames_num = 1
    else:
        frames_num = 1 + int(math.ceil((1.0 * signal_length - frame_length) / frame_step))

    pad_length = int((frames_num - 1) * frame_step + frame_length)

    zeros = np.zeros((pad_length - signal_length,))
    pad_signal = np.concatenate((signal, zeros))

    indices = np.tile(np.arange(0, frame_length), (frames_num, 1)) + np.tile(
        np.arange(0, frames_num * frame_step, frame_step), (frame_length, 1)).T
    indices = np.array(indices, dtype=np.int32)
    frames = pad_signal[indices]
    win = np.tile(winfunc(frame_length), (frames_num, 1))

    return frames * win


In [7]:
def get_p2pamplitude(signal):
    """
    f1 : Compute the peak-to-peak amplitude of the signal
    """
    return np.max(signal) - np.min(signal)


def get_mean_energy_over_syllable_nucleus(energy):
    """
    f2 : Mean energy over syllable nucleus
    """
    return np.mean(energy)


def get_max_energy_over_syllable_nucleus(energy):
    """
    f3 : Max energy over syllable nucleus
    """
    return np.max(energy)


def get_duration(signal, samplerate):
    """
    f4 & f5 : Duration of a sound wave. Send input (syllable/vowel) accordingly
    """
    len_frames = len(signal)
    return len_frames / samplerate


def get_max_pitch_over_syllable_nucleus(pitch_for_frames):
    """
    f6 : Maximum pitch over syllable nucleus
    """
    return np.max(pitch_for_frames)


def get_mean_pitch_over_syllable_nucleus(pitch_for_frames):
    """
    f7 : Mean pitch over syllable nucleus
    """
    return np.mean(pitch_for_frames)

def pitch_from_zcr(frame, fs):
    """
    The function detects the F0 of isolated phoneme by zero-crossing
    """
    M = np.round(0.016 * fs) - 1
    # print (frames.shape)
    R = np.correlate(frame, frame, mode='full')
    g = R[len(frame) - 1]
    R = R[len(frame):-1]
    # estimate m0 (as the first zero crossing of R)
    [a, ] = np.nonzero(np.diff(np.sign(R)))
    if len(a) == 0:
        m0 = len(R) - 1
    else:
        m0 = a[0]

    if M > len(R):
        M = len(R) - 1

    M = int(M)
    m0 = int(m0)
    Gamma = np.zeros(M)
    CSum = np.cumsum(frame ** 2)
    Gamma[m0:M] = R[m0:M] / (np.sqrt((g * CSum[M:m0:-1])) + EPS)
    ZCR = zcr(Gamma)
    if ZCR[1] > 0.15:
        HR = 0.0
        f0 = 0.0
    else:
        if len(Gamma) == 0:
            HR = 1.0
            blag = 0.0
            Gamma = np.zeros((M), dtype=np.float64)
        else:
            HR = np.max(Gamma)
            blag = np.argmax(Gamma)
        # Get fundamental frequency:
        f0 = fs / (blag + EPS)
        if f0 > 5000:
            f0 = 0.0
        if HR < 0.1:
            f0 = 0.0
    pitch = f0
    return HR, pitch


def zcr(frame):
    """
    Compute the number and rate of sign-changes of the signal during the duration of a particular frame
    """
    count = len(frame)
    countZC = np.sum(np.abs(np.diff(np.sign(frame)))) / 2
    return countZC, (np.float64(countZC) / np.float64(count - 1.0))


In [8]:
def get_energy_for_frame(frame):
    """
    Compute energy value of frame
    """
    return np.sum(frame ** 2) / np.float64(len(frame))


def get_energy_for_frames(frames):
    """
    Compute energy value for all frames
    """
    energy = []
    for i in range(len(frames)):
        energy.append(get_energy_for_frame(frames[i]))
    return energy


def get_pitch_values(frames, fs):
    """
    Compute pitch values for all frames
    """
    pitch_for_frames = []
    for i in range(len(frames)):
        pitch_for_frames.append(pitch_from_zcr(frames[i], fs))
    return pitch_for_frames



In [9]:
def get_non_mfcc(signal, samplerate):
    """
    Compute the non-MFCC features of the signal, these include:
    f1 : Compute the peak-to-peak amplitude of the signal
    f2 : Mean energy over syllable nucleus
    f3 : Max energy over syllable nucleus
    f4 : Duration of a vowel nucleus
    f5 : Maximum pitch over syllable nucleus
    f6 : Mean pitch over syllable nucleus
    """

    non_mfcc_features = np.zeros(6)
    frames = audio2frame(signal, win_length * samplerate, win_step * samplerate)
    energy = get_energy_for_frames(frames)
    pitch_vals = get_pitch_values(frames, samplerate)
    non_mfcc_features[0] = get_p2pamplitude(signal)
    non_mfcc_features[1] = get_mean_energy_over_syllable_nucleus(energy)
    non_mfcc_features[2] = get_max_energy_over_syllable_nucleus(energy)
    non_mfcc_features[3] = get_duration(signal, samplerate)
    non_mfcc_features[4] = get_max_pitch_over_syllable_nucleus(pitch_vals)
    non_mfcc_features[5] = get_mean_pitch_over_syllable_nucleus(pitch_vals)
    return non_mfcc_features

In [10]:
class LRU(OrderedDict):
    """Limit size, evicting the least recently looked-up key when full"""
    def __init__(self, maxsize=128, *args, **kwargs):
        self.maxsize = maxsize
        super().__init__(*args, **kwargs)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]


In [11]:
class Phoneme:
    def __init__(self, path, id_, word, phoneme):
        self.path = path
        self.id_ = id_
        self.word = word
        self.phoneme = phoneme



In [12]:
class SampleExtraction:
    def __init__(self, wav_root, alignment_file, out_dir,label):
        self.wav_root = wav_root
        self.alignment_file = alignment_file
        self.out_dir = out_dir
        self.label = label
        self.pool = mp.Pool(mp.cpu_count())
        self.make_directories()

    def make_directories(self):
        os.makedirs(self.out_dir + '/0', exist_ok=True)
        os.makedirs(self.out_dir + '/1', exist_ok=True)
        print('Created directories for each label in path: {}'.format(self.out_dir))

    def get_phoneme_features(self, index, n, vowel_phonemes, features_cache):
        # if out of bound then
        if index < 0 or index >= n:
            return np.zeros(shape=(1, 13, 30), dtype=np.float32), np.zeros(6, dtype=np.float32)

        phoneme = vowel_phonemes[index]

        if phoneme not in features_cache:
            signal, samplerate = librosa.load(self.wav_root + '/' + phoneme.path, sr=None)
            optimal_signal_len = int(samplerate * OPTIMAL_DURATION)

            signal_len = len(signal)
            excess = signal_len - optimal_signal_len
            left_pad = abs(excess // 2)
            right_pad = abs(excess) - left_pad

            if signal_len > optimal_signal_len:
                signal_mfcc = signal[left_pad:-right_pad]

            elif signal_len < optimal_signal_len:
                signal_mfcc = np.concatenate([np.zeros(left_pad), signal, np.zeros(right_pad)], axis=0)
            else:
                signal_mfcc = signal

            # extract MFCC features, should be a matrix of shape (1, 13, 30)
            mfcc_features = get_mfcc(signal_mfcc, samplerate)
            # returned np array is of shape (13, 30), add a new channel axis
            mfcc_features = mfcc_features[np.newaxis, :, :]

            # extract non MFCC features, should be a vector of shape (6,)
            non_mfcc_features = get_non_mfcc(signal, samplerate)

            features_cache[phoneme] = (mfcc_features, non_mfcc_features)

        return features_cache[phoneme]

    def generate_samples(self, vowel_phonemes):
        n = len(vowel_phonemes)
        features_cache = LRU(size=5)
        for i in range(n):
            phoneme = vowel_phonemes[i]
            #label = phoneme.phoneme[-1]

            pre_mfcc, pre_non_mfcc = self.get_phoneme_features(i - 1, n, vowel_phonemes, features_cache)
            anchor_mfcc, anchor_non_mfcc = self.get_phoneme_features(i, n, vowel_phonemes, features_cache)
            suc_mfcc, suc_non_mfcc = self.get_phoneme_features(i + 1, n, vowel_phonemes, features_cache)

            mfcc_tensor = np.concatenate([pre_mfcc, anchor_mfcc, suc_mfcc], axis=0)
            non_mfcc_vector = np.concatenate([pre_non_mfcc, anchor_non_mfcc, suc_non_mfcc], axis=0)
            file_name = phoneme.id_ + '_' + phoneme.word + '_' + phoneme.phoneme
            np.save(self.out_dir + '/' + self.label + '/' + file_name + '_mfcc.npy', mfcc_tensor)
            np.save(self.out_dir + '/' + self.label + '/' + file_name + '_other.npy', non_mfcc_vector)

        print('finished writing {} samples for id: {}, word: {}'.
              format(n, vowel_phonemes[0].id_, vowel_phonemes[0].word))

    def extract_features(self):
        phoneme_alignment_file = open(self.alignment_file, 'r')
        current_word = None
        curr_vowels = []
        for line in phoneme_alignment_file:
            path, word, phoneme = line[:-1].split('\t')
            id_='1'
            phoneme = Phoneme(path, id_, word, phoneme)
            if not current_word:
                current_word = (id_, word)
                if phoneme.phoneme[-1].isnumeric():
                    curr_vowels.append(phoneme)

            elif current_word == (id_, word):
                if phoneme.phoneme[-1].isnumeric():
                    curr_vowels.append(phoneme)

            elif current_word != (id_, word):
                # new word encountered. create training samples from the old list
                self.pool.apply_async(self.generate_samples, args=[curr_vowels])

                # overwrite the curr_word and curr_vowels
                current_word = (id_, word)
                curr_vowels = []
                if phoneme.phoneme[-1].isnumeric():
                    curr_vowels.append(phoneme)

        self.pool.apply(self.generate_samples, args=[curr_vowels])
        phoneme_alignment_file.close()
        self.pool.close()
        self.pool.join()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)



In [13]:
wav_root='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/dev-clean/2412/153948'
wav_file='2412-153948-0004.wav'
phoneme_path='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/temp'
output_csv='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/temp_csv.csv'

In [14]:
out_file = open(output_csv, 'w')
alignment_dict = _create_dict()
wav_file = wav_root + '/' + wav_file
sr, signal = sciwav.read(wav_file)
transcript ="SHEEP AND CATTLE WERE INTRODUCED AND BRED WITH EXTREME RAPIDITY MEN TOOK UP THEIR FIFTY THOUSAND OR ONE HUNDRED THOUSAND ACRES OF COUNTRY GOING INLAND ONE BEHIND THE OTHER TILL IN A FEW YEARS THERE WAS NOT AN ACRE BETWEEN THE SEA AND THE FRONT RANGES WHICH WAS NOT TAKEN UP AND STATIONS EITHER FOR SHEEP OR CATTLE WERE SPOTTED ABOUT AT INTERVALS OF SOME TWENTY OR THIRTY MILES OVER THE WHOLE COUNTRY"
print(transcript)
alignment = align_audio(wav_file, transcript)

for word in alignment['words']:
    if word['case'] != 'success':
        continue

    start_time, end_time = word['start'], word['end']
    aligned_word = word['alignedWord']
    key = [aligned_word.lower()]
    for phoneme in word['phones']:
        phone = phoneme['phone']
        key.append(phone.split('_')[0])

    key = ' '.join(key)
    phoneme_tuple = alignment_dict.get(key, ())

    if len(phoneme_tuple) == 0:
        print('word: {} not in dict, skipping...'.format(word))
        continue

    if len(phoneme_tuple) != len(word['phones']):
        print('word: {} not aligned properly, skipping...'.format(word))
        continue

    # now map phonemes and slice wav
    for i, phoneme in enumerate(word['phones']):
        phone_start = start_time
        phone_end = phone_start + phoneme['duration']
        # check if vowel phoneme
        if phoneme_tuple[i][-1].isdigit():

            file_name =  aligned_word + '_' + phoneme_tuple[i] + '_' + \
                        str(int(phone_start * 1000)) + '_' + str(int(phone_end * 1000)) + '.wav'

            start_frame, end_frame = int(phone_start * sr), int(phone_end * sr)
            sciwav.write(phoneme_path + '/' + file_name, sr, signal[start_frame:end_frame])
            out_file.write(file_name + '\t' +  aligned_word + '\t' + phoneme_tuple[i] + '\n')
        start_time = phone_end

print('done alignment and slicing for file: {}'.format(wav_file))
out_file.close()

SHEEP AND CATTLE WERE INTRODUCED AND BRED WITH EXTREME RAPIDITY MEN TOOK UP THEIR FIFTY THOUSAND OR ONE HUNDRED THOUSAND ACRES OF COUNTRY GOING INLAND ONE BEHIND THE OTHER TILL IN A FEW YEARS THERE WAS NOT AN ACRE BETWEEN THE SEA AND THE FRONT RANGES WHICH WAS NOT TAKEN UP AND STATIONS EITHER FOR SHEEP OR CATTLE WERE SPOTTED ABOUT AT INTERVALS OF SOME TWENTY OR THIRTY MILES OVER THE WHOLE COUNTRY
starting alignment /home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/dev-clean/2412/153948/2412-153948-0004.wav


INFO:root:1/2
INFO:root:2/2
INFO:root:2 unaligned words (of 73)
INFO:root:after 2nd pass: 2 unaligned words (of 73)


word: {'alignedWord': 'extreme', 'case': 'success', 'end': 2.81, 'endOffset': 54, 'phones': [{'duration': 0.06, 'phone': 'eh_B'}, {'duration': 0.09, 'phone': 'k_I'}, {'duration': 0.02, 'phone': 's_I'}, {'duration': 0.06, 'phone': 't_I'}, {'duration': 0.04, 'phone': 'r_I'}, {'duration': 0.09, 'phone': 'iy_I'}, {'duration': 0.06, 'phone': 'm_E'}], 'start': 2.39, 'startOffset': 47, 'word': 'EXTREME'} not in dict, skipping...
word: {'alignedWord': '<unk>', 'case': 'success', 'end': 3.41, 'endOffset': 63, 'phones': [{'duration': 0.6, 'phone': 'oov_S'}], 'start': 2.81, 'startOffset': 55, 'word': 'RAPIDITY'} not in dict, skipping...
word: {'alignedWord': 'was', 'case': 'success', 'end': 11.44, 'endOffset': 201, 'phones': [{'duration': 0.05, 'phone': 'w_B'}, {'duration': 0.05, 'phone': 'ah_I'}, {'duration': 0.07, 'phone': 'z_E'}], 'start': 11.27, 'startOffset': 198, 'word': 'WAS'} not in dict, skipping...
word: {'alignedWord': 'was', 'case': 'success', 'end': 14.329998999999999, 'endOffset': 2

In [15]:
out_dir='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/temp_feat'
sample_extraction = SampleExtraction(phoneme_path, output_csv, out_dir,'0')
sample_extraction.extract_features()

finished writing 1 samples for id: 1, word: were
finished writing 1 samples for id: 1, word: sheep
finished writing 1 samples for id: 1, word: and
finished writing 2 samples for id: 1, word: cattle
Created directories for each label in path: /home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/temp_feat
finished writing 1 samples for id: 1, word: bred
finished writing 1 samples for id: 1, word: and
finished writing 1 samples for id: 1, word: with
finished writing 1 samples for id: 1, word: men
finished writing 1 samples for id: 1, word: took
finished writing 1 samples for id: 1, word: up
finished writing 1 samples for id: 1, word: their
finished writing 3 samples for id: 1, word: introduced
finished writing 2 samples for id: 1, word: fifty
finished writing 1 samples for id: 1, word: or
finished writing 1 samples for id: 1, word: one
finished writing 2 samples for id: 1, word: thousand
finished writing 2 samples for id: 1, word: hundred
finished writing 1 samples fo

In [16]:
import sys
import time
import tqdm
import glob
import os
import pickle
import numpy as np
import torch
import torch.nn as nn
import pandas as pd
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.datasets import DatasetFolder

In [17]:
class CNNDataset(Dataset):
    def __init__(self, root):
        self.dataset_folder = DatasetFolder(root=root, loader=CNNDataset._npy_loader, extensions=('_mfcc.npy',))
        self.len_ = len(self.dataset_folder)
        self.folder_to_index = self.dataset_folder.class_to_idx

    @staticmethod
    def _npy_loader(path):
        mfcc = np.load(path)
        non_mfcc_file_path = path.replace('mfcc', 'other')
        non_mfcc = np.load(non_mfcc_file_path)

        # in_channels x height x width
        assert mfcc.shape == (3, 13, 30)
        assert non_mfcc.shape == (18, )

        mfcc = torch.from_numpy(mfcc).float()
        non_mfcc = torch.from_numpy(non_mfcc).float()

        return mfcc, non_mfcc, path

    def __getitem__(self, index):

        return self.dataset_folder[index]

    def __len__(self):
        return self.len_

In [18]:
class ResBlock(nn.Module):

    def __init__(self, in_channels, out_channels, kernel_size, stride=1):
        super().__init__()
        padding = (kernel_size - 1) // 2
        self.network = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size,
                      padding=padding, stride=stride),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size,
                      padding=padding, stride=stride),
            nn.BatchNorm2d(out_channels)
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        out = self.network(x)
        out = out + x
        out = self.relu(out)
        return out



In [19]:
class CNNStressNet(nn.Module):

    def __init__(self, reduction='mean'):
        super().__init__()
        self.loss_layer = nn.CrossEntropyLoss(reduction=reduction)
        self.cnn_network = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=(3 - 1)//2, stride=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=(3 - 1)//2, stride=2),
            ResBlock(in_channels=32, out_channels=32, kernel_size=3),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=(3 - 1) // 2, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=(0, (3 - 1) // 2), stride=2),
            nn.BatchNorm2d(num_features=64),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(1, 4))
        )

        self.dnn_network = nn.Sequential(
            nn.Linear(18, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 64)
        )

        self.fully_connected = nn.Sequential(
            nn.BatchNorm1d(num_features=128),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 512),
            nn.Dropout(p=0.25),
            nn.ReLU(),
            nn.BatchNorm1d(num_features=512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.BatchNorm1d(num_features=128),
            nn.ReLU(),
            nn.Linear(128, 2)
        )

    def forward(self, mfcc, non_mfcc):
        n = mfcc.shape[0]
        cnn_out = self.cnn_network(mfcc)
        cnn_out = cnn_out.reshape(n, 64)

        dnn_out = self.dnn_network(non_mfcc)

        out = torch.cat([cnn_out, dnn_out], dim=1)
        out = self.fully_connected(out)

        return out

    def loss(self, predictions, labels):
        loss_val = self.loss_layer(predictions, labels)
        return loss_val


In [20]:
test_path='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/temp_feat'
model_path='/home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/colab_data/models'

In [21]:
kwargs =  {}

test_dataset = CNNDataset(root=test_path)
test_loader = DataLoader(test_dataset, batch_size=512, shuffle=True, **kwargs)


model = CNNStressNet(reduction='mean')
model = restore_model(model, model_path)

found modeL /home/arunav/Desktop/8th-semester/RE/lexical-stress-detection-master/data/colab_data/models/9.pth, restoring


In [22]:
n=0
output=[]
for batch_idx, ((mfcc, non_mfcc, path), label) in enumerate(tqdm.tqdm(test_loader)):
    out = model(mfcc, non_mfcc)
    prob = torch.nn.functional.softmax(out, dim=1)
    output.append(prob)
    

100%|██████████| 1/1 [00:00<00:00,  1.01it/s]


In [23]:
temp=output[0].tolist()

In [24]:
temp


[[0.9999880790710449, 1.1899972378159873e-05],
 [4.652678398997523e-06, 0.9999953508377075],
 [0.9999973773956299, 2.676935309864348e-06],
 [0.9999867677688599, 1.326670462731272e-05],
 [0.999984622001648, 1.5382012861664407e-05],
 [0.9989761114120483, 0.0010238814866170287],
 [0.9994372725486755, 0.0005626857164315879],
 [0.9999798536300659, 2.0105633666389622e-05],
 [5.239860989547651e-09, 1.0],
 [0.9998307228088379, 0.00016923589282669127],
 [0.9998670816421509, 0.00013286595640238374],
 [0.000757101399358362, 0.999242901802063],
 [0.9999814033508301, 1.8616845409269445e-05],
 [0.9999415874481201, 5.841346501256339e-05],
 [0.9999477863311768, 5.220634557190351e-05],
 [9.248591226196368e-09, 1.0],
 [0.9999947547912598, 5.250706635706592e-06],
 [1.516290093483974e-09, 1.0],
 [0.9999902248382568, 9.828219845076092e-06],
 [0.9999910593032837, 8.93899687071098e-06],
 [1.0340244216422434e-06, 0.999998927116394],
 [0.382859468460083, 0.617140531539917],
 [0.9999969005584717, 3.088737230427

In [28]:

t=0
n=0
s=len(temp)
for i in temp:
    t+=i[0]
    n+=i[1]

In [29]:
output=[t/s , n/s]


In [30]:
print('The original label of wav file is:{} and the predicted output is [{},{}]'.format( wave_file_label,output[1],output[0]))

The original label of wav file is:1 and the predicted output is [0.3182120560104184,0.6817879402937157]
