<a href="https://colab.research.google.com/github/devpriyagoel/Speech-Emotion-Recognition/blob/master/data_pre_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import wave
import numpy as np
import os
import glob
import pickle

In [2]:
!pip install python_speech_features

Collecting python_speech_features
  Downloading https://files.pythonhosted.org/packages/ff/d1/94c59e20a2631985fbd2124c45177abaa9e0a4eee8ba8a305aa26fc02a8e/python_speech_features-0.6.tar.gz
Building wheels for collected packages: python-speech-features
  Building wheel for python-speech-features (setup.py) ... [?25l[?25hdone
  Created wheel for python-speech-features: filename=python_speech_features-0.6-cp36-none-any.whl size=5887 sha256=00536b0a3b23a1a3c43e4527c42d3fafb584a31c61b83defd14e1d313956b35e
  Stored in directory: /root/.cache/pip/wheels/3c/42/7c/f60e9d1b40015cd69b213ad90f7c18a9264cd745b9888134be
Successfully built python-speech-features
Installing collected packages: python-speech-features
Successfully installed python-speech-features-0.6


In [3]:
import python_speech_features as ps

Read data from the IEMOCAP database and prepare the train, test, and validation sets.

In [4]:
class PreProcess:
    """Build train/test/validation feature sets from the IEMOCAP corpus.

    For every utterance labelled 'hap', 'ang', 'neu' or 'sad', log Mel
    filterbank energies and their first and second deltas are extracted, cut
    (or zero-padded) into segments of ``frame_num`` frames and stored in
    arrays of shape ``(segments, frame_num, filter_num, 3)`` where the last
    axis holds (log Mel, delta, delta-delta).  The data is z-score normalised
    with statistics of the training segments and finally pickled.
    """

    # Emotion name -> integer class id; any other emotion maps to 5.
    _LABELS = {'ang': 0, 'sad': 1, 'hap': 2, 'neu': 3, 'fear': 4}
    # Only utterances carrying one of these labels are kept.
    _TARGET_EMOTIONS = ('hap', 'ang', 'neu', 'sad')
    # Hop (in frames) between overlapping segments of long 'hap' training utterances.
    _HAP_HOP = 100

    def __init__(self, root_dir):
        """Allocate the feature/label buffers and reset all counters.

        Args:
            root_dir: Directory that contains the IEMOCAP session folders.
        """
        self.eps = 1e-5  # added to the std so the normalisation never divides by zero
        self.filter_num = 40  # Mel filters per frame
        self.frame_num = 300  # frames per segment
        self.num_per_emo = 300  # training segments drawn per emotion for the final batch
        self.root_dir = root_dir

        # Capacities of the preallocated buffers.  They are upper bounds: the
        # counters below track how many rows are actually filled, and only
        # the filled rows are used for statistics and for the pickle.
        train_capacity = 2928
        test_utterance_capacity = 259
        valid_utterance_capacity = 298
        test_segment_capacity = 420
        valid_segment_capacity = 436

        # Number of segments stored per emotion in each set.
        self.train_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.test_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}
        self.valid_emt = {'hap': 0, 'ang': 0, 'neu': 0, 'sad': 0}

        segment_shape = (self.frame_num, self.filter_num, 3)
        self.train_data = np.empty((train_capacity,) + segment_shape, dtype=np.float32)
        self.test_data = np.empty((test_segment_capacity,) + segment_shape, dtype=np.float32)
        self.valid_data = np.empty((valid_segment_capacity,) + segment_shape, dtype=np.float32)

        self.train_label = np.empty((train_capacity, 1), dtype=np.int8)
        self.test_label_utterance = np.empty((test_utterance_capacity, 1), dtype=np.int8)
        self.valid_label_utterance = np.empty((valid_utterance_capacity, 1), dtype=np.int8)
        self.test_label_segment = np.empty((test_segment_capacity, 1), dtype=np.int8)
        self.valid_label_segment = np.empty((valid_segment_capacity, 1), dtype=np.int8)

        # Entry i is overwritten with the number of segments cut from utterance i.
        self.test_segments_per_utterance = np.arange(test_utterance_capacity)
        self.valid_segments_per_utterance = np.arange(valid_utterance_capacity)

        # Fill counters: number of rows written so far in each buffer.
        self.train_num = 0
        self.test_segment_num = 0
        self.valid_segment_num = 0
        self.test_utterance_num = 0
        self.valid_utterance_num = 0

        self.wav_files = None  # filled by read_IEMOCAP()
        # Per-filter statistics of the three channels, set by calculate_zscore().
        self.mean1, self.mean2, self.mean3 = 0, 0, 0
        self.std1, self.std2, self.std3 = 0, 0, 0

    def read_wav_file(self, wav_filename):
        """Read a wav file and return its log Mel features and two deltas.

        Args:
            wav_filename: Path of the wav file to read.

        Returns:
            Tuple ``(mel_spec, delta1, delta2)`` as produced by
            ``ps.logfbank`` and two successive ``ps.delta`` calls.
        """
        # The context manager closes the file even if reading fails.
        with wave.open(wav_filename, 'r') as wav_file:
            framerate = wav_file.getframerate()
            str_data = wav_file.readframes(wav_file.getnframes())
        # NOTE(review): the raw bytes are interpreted as 16-bit samples; the
        # sample width and channel count of the file are not checked.
        wave_data = np.frombuffer(str_data, dtype=np.short)
        mel_spec = ps.logfbank(wave_data, framerate, nfilt=self.filter_num)
        delta1 = ps.delta(mel_spec, 2)
        delta2 = ps.delta(delta1, 2)
        return mel_spec, delta1, delta2

    @staticmethod
    def generate_label(emotion):
        """Return the integer class id of an emotion name (5 if unknown)."""
        return PreProcess._LABELS.get(emotion, 5)

    @staticmethod
    def parse_emo_file(emo_file_name):
        """Parse an emotion evaluation file into ``{utterance_name: emotion}``.

        Only lines starting with '[' are used; after whitespace splitting the
        4th token is taken as the utterance name and the 5th as its emotion.
        """
        emo_map = {}
        with open(emo_file_name, 'r') as emo_file:
            for line in emo_file:
                if not line.startswith('['):
                    continue
                tokens = line.split()
                emo_map[tokens[3]] = tokens[4]
        return emo_map

    def read_IEMOCAP(self):
        """Read the data files and store a dict of extracted features.

        Fills ``self.wav_files`` with ``{utterance_name: {'emotion',
        'mel_spec', 'delta1', 'delta2'}}`` for every wav whose label is one
        of the four target emotions.
        """
        wav_file_features = {}
        for session in sorted(os.listdir(self.root_dir)):
            if not session.startswith('S'):
                continue
            wav_dir = os.path.join(self.root_dir, session, 'sentences', 'wav')
            emo_labels_dir = os.path.join(self.root_dir, session, 'dialog', 'EmoEvaluation')
            for impro in sorted(os.listdir(wav_dir)):
                # Keep only entries with 'i' at index 7 (presumably the
                # improvised dialogs, e.g. 'Ses01F_impro01' - verify).  The
                # slice avoids an IndexError on names shorter than 8 chars.
                if impro[7:8] != 'i':
                    continue
                emo_file_name = os.path.join(emo_labels_dir, impro + '.txt')
                emo_map = self.parse_emo_file(emo_file_name)
                files = glob.glob(os.path.join(wav_dir, impro, '*.wav'))
                for filename in sorted(files):
                    wav_name = os.path.splitext(os.path.basename(filename))[0]
                    emotion = emo_map[wav_name]
                    if emotion not in self._TARGET_EMOTIONS:
                        continue
                    mel_spec, delta1, delta2 = self.read_wav_file(filename)
                    wav_file_features[wav_name] = {
                        'emotion': emotion,
                        'mel_spec': mel_spec,
                        'delta1': delta1,
                        'delta2': delta2
                    }
        self.wav_files = wav_file_features

    def add_to_set(self, part, delta11, delta21, emotion, set_type):
        """Append one segment (three feature channels plus label) to a set.

        Args:
            part: Log Mel segment of shape ``(frame_num, filter_num)``.
            delta11: First-delta segment of the same shape.
            delta21: Second-delta segment of the same shape.
            emotion: Emotion name of the segment.
            set_type: 'train', 'test', or anything else for validation.

        Raises:
            IndexError: If the target buffer is already full.
        """
        if set_type == 'train':
            data, labels, counts = self.train_data, self.train_label, self.train_emt
            counter = 'train_num'
        elif set_type == 'test':
            data, labels, counts = self.test_data, self.test_label_segment, self.test_emt
            counter = 'test_segment_num'
        else:
            data, labels, counts = self.valid_data, self.valid_label_segment, self.valid_emt
            counter = 'valid_segment_num'

        row = getattr(self, counter)
        if row >= data.shape[0]:
            raise IndexError(
                'the %s buffer holds only %d segments; increase its capacity in __init__'
                % (set_type, data.shape[0]))
        data[row, :, :, 0] = part
        data[row, :, :, 1] = delta11
        data[row, :, :, 2] = delta21
        labels[row] = self.generate_label(emotion)
        counts[emotion] += 1
        setattr(self, counter, row + 1)

    @staticmethod
    def find_set(wav_name):
        """Return 'train', 'test' or 'validation' for an utterance name.

        Names whose character at index 4 is '1'-'4' go to the training set;
        of the rest, names with 'M' as fourth-last character go to the test
        set and all others to the validation set.  (Presumably index 4 is the
        session number and the fourth-last character the speaker gender in
        names like 'Ses05F_impro01_M000' - verify against the corpus.)
        """
        if wav_name[4] in ['1', '2', '3', '4']:
            return 'train'
        if wav_name[-4] == 'M':
            return 'test'
        return 'validation'

    def data_padding(self, data):
        """Zero-pad ``data`` along axis 0 up to ``frame_num`` rows."""
        missing = self.frame_num - data.shape[0]
        return np.pad(data, ((0, missing), (0, 0)), 'constant', constant_values=0)

    def generate_data(self):
        """Cut every utterance into segments and fill the three sets.

        Must run after ``read_IEMOCAP`` and before ``calculate_zscore``.
        """
        for wav_name in sorted(self.wav_files.keys()):
            features = self.wav_files[wav_name]
            part = features['mel_spec']
            delta11 = features['delta1']
            delta21 = features['delta2']
            emotion = features['emotion']
            time = part.shape[0]
            set_type = self.find_set(wav_name)  # train/test/validation

            if time <= self.frame_num:
                # Short utterance: a single zero-padded segment.
                part = self.data_padding(part)
                delta11 = self.data_padding(delta11)
                delta21 = self.data_padding(delta21)
                start_times = [0]
            elif emotion == 'hap' and set_type == 'train':
                # Long 'hap' training utterance: overlapping segments every
                # _HAP_HOP frames, which yields more 'hap' training segments.
                frames = (time - self.frame_num) // self._HAP_HOP + 1
                start_times = [i * self._HAP_HOP for i in range(frames)]
            else:
                # Any other long utterance: its first and its last segment.
                start_times = [0, time - self.frame_num]

            for begin in start_times:
                end = begin + self.frame_num
                self.add_to_set(part[begin:end, :], delta11[begin:end, :], delta21[begin:end, :],
                                emotion, set_type)

            # Test/validation additionally keep one label per utterance and
            # the number of segments that belong to it.
            if set_type == 'test':
                self.test_label_utterance[self.test_utterance_num] = self.generate_label(emotion)
                self.test_segments_per_utterance[self.test_utterance_num] = len(start_times)
                self.test_utterance_num += 1
            elif set_type == 'validation':
                self.valid_label_utterance[self.valid_utterance_num] = self.generate_label(emotion)
                self.valid_segments_per_utterance[self.valid_utterance_num] = len(start_times)
                self.valid_utterance_num += 1

    def calculate_zscore(self):
        """Compute per-filter mean and std of each channel from the training data.

        Only the ``train_num`` filled rows are used, so the statistics stay
        correct when the buffer was allocated larger than the data.

        Raises:
            ValueError: If no training segment has been stored yet.
        """
        if self.train_num == 0:
            raise ValueError('no training segments available; run generate_data() first')
        filled = self.train_data[:self.train_num]
        stats = []
        for channel in range(3):
            # Collapse (segment, frame) into one axis -> one row per frame.
            frames = filled[:, :, :, channel].reshape(-1, self.filter_num)
            stats.append((np.mean(frames, axis=0), np.std(frames, axis=0)))
        (self.mean1, self.std1), (self.mean2, self.std2), (self.mean3, self.std3) = stats

    def standardize_data(self):
        """Standardize the filled rows of all three sets with the training statistics."""
        channel_stats = (
            (self.mean1, self.std1),
            (self.mean2, self.std2),
            (self.mean3, self.std3),
        )
        filled_sets = (
            (self.train_data, self.train_num),
            (self.test_data, self.test_segment_num),
            (self.valid_data, self.valid_segment_num),
        )
        for data, count in filled_sets:
            for channel, (mean, std) in enumerate(channel_stats):
                # Basic slicing gives a view, so the buffer is updated in place.
                view = data[:count, :, :, channel]
                view -= mean
                view /= (std + self.eps)

    def class_indices(self):
        """Return the row indices of each emotion class in the training data.

        Returns:
            Tuple ``(hap_index, sad_index, neu_index, ang_index)`` of integer
            arrays in ascending order.
        """
        labels = self.train_label[:self.train_num, 0]
        hap_index = np.flatnonzero(labels == self.generate_label('hap'))
        sad_index = np.flatnonzero(labels == self.generate_label('sad'))
        neu_index = np.flatnonzero(labels == self.generate_label('neu'))
        ang_index = np.flatnonzero(labels == self.generate_label('ang'))
        return hap_index, sad_index, neu_index, ang_index

    def generate_training_batch(self, hap_index, sad_index, neu_index, ang_index,
                                output='./processed_data.pkl'):
        """Draw a class-balanced training batch and pickle all sets.

        ``num_per_emo`` segments are taken per emotion after a seeded shuffle
        of each index array (the arrays are shuffled in place), the batch is
        shuffled again, and the result is written together with the filled
        part of the test and validation sets.

        Args:
            hap_index, sad_index, neu_index, ang_index: Row indices of each
                class in ``self.train_data``.
            output: Path of the pickle file to write.

        Raises:
            ValueError: If a class has fewer than ``num_per_emo`` segments.
        """
        per_class = self.num_per_emo
        for name, index in (('neu', neu_index), ('hap', hap_index),
                            ('sad', sad_index), ('ang', ang_index)):
            if len(index) < per_class:
                raise ValueError('only %d training segments for %r, need %d'
                                 % (len(index), name, per_class))
            # Re-seeding before every shuffle keeps the selection reproducible.
            np.random.seed(0)
            np.random.shuffle(index)

        # Batch layout before the final shuffle: hap, sad, neu, ang.
        picked = np.concatenate([
            hap_index[:per_class], sad_index[:per_class],
            neu_index[:per_class], ang_index[:per_class],
        ])
        order = np.arange(4 * per_class)
        np.random.seed(0)
        np.random.shuffle(order)
        picked = picked[order]
        # Fancy indexing copies, so the batch is independent of the buffers.
        train_data = self.train_data[picked]
        train_label = self.train_label[picked]

        # Trim the evaluation buffers to the rows that were actually filled so
        # no uninitialised np.empty memory ends up in the pickle.
        n_test_seg, n_test_utt = self.test_segment_num, self.test_utterance_num
        n_valid_seg, n_valid_utt = self.valid_segment_num, self.valid_utterance_num
        with open(output, 'wb') as f:
            pickle.dump((
                train_data, train_label,
                self.test_data[:n_test_seg], self.test_label_utterance[:n_test_utt],
                self.test_label_segment[:n_test_seg], self.test_segments_per_utterance[:n_test_utt],
                self.valid_data[:n_valid_seg], self.valid_label_utterance[:n_valid_utt],
                self.valid_label_segment[:n_valid_seg], self.valid_segments_per_utterance[:n_valid_utt]),
                f)

    def preprocess(self):
        """Process the audio files to generate train/test/validation data with extracted features"""
        self.read_IEMOCAP()
        self.generate_data()
        self.calculate_zscore()
        self.standardize_data()

        hap_index, sad_index, neu_index, ang_index = self.class_indices()
        self.generate_training_batch(hap_index, sad_index, neu_index, ang_index)

In [None]:
# Mount Google Drive at /content/drive (Colab only); the code and dataset
# paths used in the following cells live under this mount point.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
import sys
# Project code directory on the mounted Drive - adjust to your own Drive layout.
path_to_code_files = '/content/drive/My Drive/BTP - Dev Priya and Kushagra/Speech Emotion Recognition/Code'
# Make modules stored in that directory importable.
sys.path.append(path_to_code_files)
# Change the working directory; PreProcess writes './processed_data.pkl'
# relative to it, so the pickle ends up in this folder.
%cd "/content/drive/My Drive/BTP - Dev Priya and Kushagra/Speech Emotion Recognition/Code"

/content/drive/My Drive/BTP - Dev Priya and Kushagra/Speech Emotion Recognition/Code


In [None]:
# Root directory of the IEMOCAP corpus; PreProcess scans it for entries whose
# name starts with 'S' and reads 'sentences/wav' and 'dialog/EmoEvaluation' inside each.
path_to_iemocap = '/content/drive/My Drive/BTP - Dev Priya and Kushagra/Licensed_Data/IEMOCAP'
p = PreProcess(path_to_iemocap)
# Runs the full pipeline: feature extraction, segmentation, z-score
# normalisation, and pickling of the train/test/validation sets.
p.preprocess()