In [1]:
import os
import pickle
import random
import shutil
import subprocess

import numpy
import numpy as np
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.mixture import GaussianMixture
from sklearn.linear_model import SGDClassifier

import config
from preprocess import read_data, MyDataset
from model import LSTMClassifier

OSError: [WinError 127] The specified procedure could not be found. Error loading "C:\Users\LeHoa\Anaconda3\lib\site-packages\torch\lib\caffe2_detectron_ops.dll" or one of its dependencies.

In [2]:

class Extractor:
    """Holds an MFCC transform configured for a 16 kHz sample rate.

    The transform is created with 40 coefficients, a window and FFT size of
    0.025 s worth of samples and a hop of 0.010 s worth of samples.
    """

    def __init__(self):
        rate = 16000
        window = int(0.025 * rate)
        hop = int(0.010 * rate)
        mel_options = {'win_length': window,
                       'hop_length': hop,
                       'n_fft': window}
        # NOTE(review): MFCC is not imported in the visible cells; presumably
        # torchaudio.transforms.MFCC -- confirm and import it.
        self.mfcc = MFCC(sample_rate=rate, n_mfcc=40, melkwargs=mel_options)

    def extract_feature(self, audio):
        """Apply the MFCC transform and keep index 0 along dimension 1.

        All leading dimensions of ``audio`` are flattened before the transform
        and restored afterwards; then index 0 of dimension 1 is selected and
        dimensions 1 and 2 of the result are swapped.

        Args:
            audio: tensor whose last dimension is passed to the transform.
                Assumes a (batch, channel, time) layout -- TODO confirm.

        Returns:
            The transformed tensor with dimensions 1 and 2 swapped; for a
            (batch, channel, time) input this is presumably
            (batch, frames, coefficients).
        """
        full_shape = audio.size()
        leading = full_shape[:-1]
        flattened = audio.reshape(-1, full_shape[-1])
        coeffs = self.mfcc(flattened)
        # Restore the leading dimensions in front of the transform's two
        # trailing output dimensions.
        restored = coeffs.reshape(leading + coeffs.shape[-2:])
        first_only = restored[:, 0]
        return first_only.transpose(1, 2)


In [4]:
def read_data(data_dir):
    """Build MFCC features and labels from raw waveform chunks.

    Every file in ``<data_dir>/audio`` is loaded with ``torchaudio.load`` and
    paired with ``<data_dir>/label/<name>.txt``. Each non-blank label line
    holds three whitespace-separated fields ``start end tag``; start/end are
    interpreted as seconds and converted to sample indices at 16 kHz. The
    labelled spans are treated as cough, the gaps between them (and the tail
    of the file) as silence.

    From every span that is at least ``config.CHUNK`` milliseconds long:

    * when ``config.model == "LSTM"``, a window is taken every 160 samples,
      including the last window that ends exactly at the span end;
    * otherwise ``(length - chunk) // chunk`` windows are drawn at random
      offsets inside the span.

    Args:
        data_dir: directory containing the ``audio`` and ``label`` folders.

    Returns:
        ``(feature, label)``: the concatenated outputs of
        ``Extractor.extract_feature`` for the silent chunks followed by the
        cough chunks, and an int64 tensor holding 0 for silent and 1 for cough.

    Note:
        ``torch.stack`` fails if either class ends up with no chunks.
    """
    extractor = Extractor()
    audio_dir = os.path.join(data_dir, "audio")
    label_dir = os.path.join(data_dir, "label")
    sample_rate = 16000
    chunk_sample = config.CHUNK * sample_rate // 1000
    audio_silent = []
    audio_cough = []

    for filename in os.listdir(audio_dir):
        audio_path = os.path.join(audio_dir, filename)
        label_path = os.path.join(label_dir, filename.replace(".wav", ".txt"))
        # NOTE(review): torchaudio is not imported in the visible cells.
        # The returned rate is not checked; label times and the extractor
        # both assume 16 kHz.
        audio, sr = torchaudio.load(audio_path)
        audio_size = audio.shape[1]
        segment_silent = []
        segment_cough = []

        last = 0
        with open(label_path, "r") as f:
            for line in f:
                if not line.strip():
                    # A trailing blank line would otherwise break the unpack.
                    continue
                start, end, _ = line.split()
                start = int(float(start) * sample_rate)
                end = int(float(end) * sample_rate)
                segment_silent.append((max(0, last), min(start, audio_size)))
                segment_cough.append((max(0, start), min(end, audio_size)))
                last = end
        # Everything after the last labelled span counts as silence.
        segment_silent.append((max(last, 0), audio_size))

        # Silent spans are processed before cough spans, as before, so the
        # order of random draws is unchanged.
        for segments, chunks in ((segment_silent, audio_silent),
                                 (segment_cough, audio_cough)):
            for start, end in segments:
                if end - start < chunk_sample:
                    continue
                if config.model == "LSTM":
                    # +1 keeps the final valid offset (end - chunk_sample);
                    # without it a span of exactly one chunk yields nothing.
                    offsets = range(start, end - chunk_sample + 1, 160)
                else:
                    draws = (end - chunk_sample - start) // chunk_sample
                    offsets = [random.randint(start, end - chunk_sample)
                               for _ in range(draws)]
                chunks.extend(audio[:, x: x + chunk_sample] for x in offsets)

    audio_silent = torch.stack(audio_silent, dim=0)
    audio_cough = torch.stack(audio_cough, dim=0)
    feature_silent = extractor.extract_feature(audio_silent)
    feature_cough = extractor.extract_feature(audio_cough)
    feature = torch.cat([feature_silent, feature_cough], dim=0)
    label = torch.tensor([0] * feature_silent.shape[0] + [1] * feature_cough.shape[0],
                         dtype=torch.int64)
    return feature, label

In [None]:
def train_svm():
    """Train an SVC on flattened chunk features and print hold-out accuracy.

    Loads ``(feature, label)`` via ``read_data(config.DATA_DIR)``, flattens
    each sample to one row, splits with ``train_test_split`` defaults, fits
    an ``SVC`` with default parameters, pickles it and prints the accuracy on
    the held-out split.
    """
    feature, label = read_data(config.DATA_DIR)
    # read_data is redefined further down this notebook to return a NumPy
    # feature array, so convert with asarray instead of calling .numpy().
    feature = np.asarray(feature)
    feature = feature.reshape(feature.shape[0], -1)
    label = np.asarray(label)

    train_feature, test_feature, train_label, test_label = train_test_split(feature, label)
    svm = SVC()
    svm.fit(train_feature, train_label)
    y_pred = svm.predict(test_feature)

    # NOTE(review): the SVM is stored under "gmm.pkl"; the path is kept so
    # existing loaders still find it, but the name looks like a copy-paste slip.
    with open("resource/gmm.pkl", 'wb') as f:
        pickle.dump(svm, f)
    print(accuracy_score(test_label, y_pred))

In [None]:
def train_gmm():
    """Train one 2-component Gaussian mixture per class and print accuracy.

    Loads ``(feature, label)`` via ``read_data(config.DATA_DIR)``, flattens
    each sample to one row and splits with ``train_test_split`` defaults. A
    mixture is fitted on the label-0 rows and another on the label-1 rows. A
    test sample is predicted as 0 only when the label-0 model scores it
    strictly higher, otherwise as 1. Both models are pickled to
    ``resource/non_cough.pkl`` and ``resource/cough.pkl``.
    """
    feature, label = read_data(config.DATA_DIR)
    # Accept tensors as well as arrays: read_data is redefined later in this
    # notebook with a NumPy feature array as its return value.
    feature = np.asarray(feature)
    feature = feature.reshape(feature.shape[0], -1)
    label = np.asarray(label)

    train_feature, test_feature, train_label, test_label = train_test_split(feature, label)
    model_0 = GaussianMixture(n_components=2, max_iter=100)
    model_1 = GaussianMixture(n_components=2, max_iter=100)

    # GaussianMixture.fit ignores its y argument, so only the rows are passed.
    model_0.fit(train_feature[train_label == 0])
    model_1.fit(train_feature[train_label == 1])

    score_0 = model_0.score_samples(test_feature)
    score_1 = model_1.score_samples(test_feature)
    # Ties (and NaN comparisons) fall to class 1, as in the original loop.
    y_pred = np.where(score_0 > score_1, 0, 1)

    print(accuracy_score(test_label, y_pred))
    with open("resource/non_cough.pkl", 'wb') as f:
        pickle.dump(model_0, f)
    with open("resource/cough.pkl", 'wb') as f:
        pickle.dump(model_1, f)

In [5]:
def augment_value(indir, outdir, label_dir):
    """Create a volume-scaled copy of every ``.wav`` file in ``indir``.

    For each file a factor is drawn uniformly from [0.6, 1.5] and passed to
    ``sox -v``. The result is written to ``outdir`` as
    ``<stem>_value-<factor>.wav`` (the dot in the factor is replaced by an
    underscore) and the matching label file from ``label_dir`` is copied
    next to it under the same stem with a ``.txt`` extension.

    Args:
        indir: directory with the source ``.wav`` files.
        outdir: directory receiving the new audio and label files.
        label_dir: directory holding ``<stem>.txt`` label files.

    Raises:
        FileNotFoundError: if ``sox`` or a label file is missing.
        subprocess.CalledProcessError: if ``sox`` exits with an error.
    """
    for filename in os.listdir(indir):
        stem, ext = os.path.splitext(filename)
        if ext != ".wav":
            # Stray files (labels, hidden files) are not audio to augment.
            continue
        v = random.uniform(0.6, 1.5)
        out_stem = "{}_value-{}".format(stem, str(v).replace(".", "_"))
        inpath = os.path.join(indir, filename)
        outpath = os.path.join(outdir, out_stem + ".wav")
        inlabelpath = os.path.join(label_dir, stem + ".txt")
        outlabelpath = os.path.join(outdir, out_stem + ".txt")
        # An argument list avoids shell quoting problems with spaces or
        # special characters in paths; check=True surfaces sox failures.
        subprocess.run(["sox", "-v", str(v), inpath, outpath], check=True)
        # shutil.copy replaces the POSIX-only `cp` command.
        shutil.copy(inlabelpath, outlabelpath)

In [6]:
def augment_speed(indir, outdir, label_dir):
    """Create a tempo-changed copy of every ``.wav`` file in ``indir``.

    For each file a factor ``s`` is drawn uniformly from [0.7, 1.4] and passed
    to ``sox ... tempo s``. The result is written to ``outdir`` as
    ``<stem>_speed-<factor>.wav`` (the dot in the factor is replaced by an
    underscore). The label file ``<label_dir>/<stem>.txt`` is rewritten next
    to it with every start/end time divided by ``s``, as tab-separated
    ``start end label`` lines.

    Args:
        indir: directory with the source ``.wav`` files.
        outdir: directory receiving the new audio and label files.
        label_dir: directory holding ``<stem>.txt`` label files.

    Raises:
        FileNotFoundError: if ``sox`` or a label file is missing.
        subprocess.CalledProcessError: if ``sox`` exits with an error.
    """
    for filename in os.listdir(indir):
        stem, ext = os.path.splitext(filename)
        if ext != ".wav":
            # Stray files (labels, hidden files) are not audio to augment.
            continue
        s = random.uniform(0.7, 1.4)
        out_stem = "{}_speed-{}".format(stem, str(s).replace(".", "_"))
        inpath = os.path.join(indir, filename)
        outpath = os.path.join(outdir, out_stem + ".wav")
        inlabelpath = os.path.join(label_dir, stem + ".txt")
        outlabelpath = os.path.join(outdir, out_stem + ".txt")
        # An argument list avoids shell quoting problems with spaces or
        # special characters in paths; check=True surfaces sox failures.
        subprocess.run(["sox", inpath, outpath, "tempo", str(s)], check=True)

        # The input is opened first so a missing label does not leave an
        # empty output file behind; both handles are closed on any error.
        with open(inlabelpath, "r") as fin, open(outlabelpath, "w") as fout:
            for line in fin:
                if not line.strip():
                    continue
                start, end, label = line.split()
                start, end = float(start) / s, float(end) / s
                # The original called a bare write(), which is undefined.
                fout.write("{}\t{}\t{}\n".format(start, end, label))

In [None]:
def read_data(data_dir):
    """Build MFCC + zero-crossing-rate chunk features and labels.

    Every file in ``<data_dir>/audio`` is loaded with ``librosa.load`` at
    16 kHz. 40 MFCCs and the zero-crossing rate are computed with a hop of
    0.010 s worth of samples and stacked into one per-frame matrix (frames
    along the first axis). Each non-blank line of ``<data_dir>/label/<name>.txt``
    holds ``start end tag``; start/end are interpreted as seconds and
    converted to frame indices (one frame per 10 ms). Labelled spans are
    cough, the gaps between them and the tail of the file are silence.

    From every span of at least ``config.CHUNK // 10`` frames:

    * when ``config.model == "LSTM"``, a window is taken at every frame
      offset, including the last one that ends exactly at the span end;
    * otherwise files whose name contains "speed" or "value" are skipped and
      ``length // chunk`` windows are drawn at random offsets.

    Args:
        data_dir: directory containing the ``audio`` and ``label`` folders.

    Returns:
        ``(feature, label)``: a NumPy array with the silent chunks followed by
        the cough chunks, and an int64 torch tensor holding 0 for silent and
        1 for cough.

    Note:
        ``np.stack`` fails if either class ends up with no chunks.
    """
    audio_dir = os.path.join(data_dir, "audio")
    label_dir = os.path.join(data_dir, "label")
    chunk_sample = config.CHUNK // 10
    feature_silent = []
    feature_cough = []

    for filename in os.listdir(audio_dir):
        audio_path = os.path.join(audio_dir, filename)
        label_path = os.path.join(label_dir, filename.replace(".wav", ".txt"))
        # NOTE(review): librosa is not imported in the visible cells.
        audio, sr = librosa.load(audio_path, sr=16000)
        mfcc_feature = librosa.feature.mfcc(y=audio, sr=sr, hop_length=int(0.010 * sr),
                                            n_fft=int(0.025 * sr), n_mfcc=40)
        zcr_feature = librosa.feature.zero_crossing_rate(y=audio, hop_length=int(0.010 * sr))
        # Stack the coefficient rows, then put frames on the first axis.
        feature = np.concatenate((mfcc_feature, zcr_feature)).transpose((1, 0))

        audio_size = feature.shape[0]
        segment_silent = []
        segment_cough = []

        last = 0
        with open(label_path, "r") as f:
            for line in f:
                if not line.strip():
                    # A trailing blank line would otherwise break the unpack.
                    continue
                start, end, _ = line.split()
                start = int(float(start) * 1000) // 10
                end = int(float(end) * 1000) // 10
                segment_silent.append((max(0, last), min(start, audio_size)))
                segment_cough.append((max(0, start), min(end, audio_size)))
                last = end
        # Everything after the last labelled span counts as silence.
        segment_silent.append((max(last, 0), audio_size))

        is_augmented = "speed" in filename or "value" in filename

        # Silent spans are processed before cough spans, as before, so the
        # order of random draws is unchanged.
        for segments, chunks in ((segment_silent, feature_silent),
                                 (segment_cough, feature_cough)):
            for start, end in segments:
                if end - start < chunk_sample:
                    continue
                if config.model == "LSTM":
                    # +1 keeps the final valid offset (end - chunk_sample).
                    offsets = range(start, end - chunk_sample + 1)
                elif is_augmented:
                    continue
                else:
                    offsets = [random.randint(start, end - chunk_sample)
                               for _ in range((end - start) // chunk_sample)]
                # Slice the frame matrix; the original LSTM branch indexed the
                # 1-D waveform with two indices, which raises IndexError.
                chunks.extend(feature[x: x + chunk_sample] for x in offsets)

    feature_silent = np.stack(feature_silent, axis=0)
    feature_cough = np.stack(feature_cough, axis=0)

    feature = np.concatenate([feature_silent, feature_cough], axis=0)
    label = torch.tensor([0] * feature_silent.shape[0] + [1] * feature_cough.shape[0],
                         dtype=torch.int64)
    return feature, label

In [None]:
def train_gmm():
    """Train one 3-component Gaussian mixture per class and report metrics.

    Loads ``(feature, label)`` via ``read_data(config.DATA_DIR)``, flattens
    each sample to one row and splits with ``train_test_split`` defaults. A
    mixture (equal initial weights, ``random_state=42``) is fitted on the
    label-0 rows and another on the label-1 rows. A test sample is predicted
    as 0 only when the label-0 model scores it strictly higher, otherwise
    as 1. Prints accuracy and a classification report, plots the raw and
    normalised confusion matrices, and pickles both models to
    ``resource/non_cough.pkl`` and ``resource/cough.pkl``.
    """
    feature, label = read_data(config.DATA_DIR)
    # The read_data defined just above returns a NumPy feature array, on which
    # .numpy() raises AttributeError; asarray handles arrays and tensors alike.
    feature = np.asarray(feature)
    feature = feature.reshape(feature.shape[0], -1)
    label = np.asarray(label)

    train_feature, test_feature, train_label, test_label = train_test_split(feature, label)
    model_0 = GaussianMixture(n_components=3, max_iter=100,
                              weights_init=[1/3, 1/3, 1/3], random_state=42)
    model_1 = GaussianMixture(n_components=3, max_iter=100,
                              weights_init=[1/3, 1/3, 1/3], random_state=42)

    # GaussianMixture.fit ignores its y argument, so only the rows are passed.
    model_0.fit(train_feature[train_label == 0])
    model_1.fit(train_feature[train_label == 1])

    score_0 = model_0.score_samples(test_feature)
    score_1 = model_1.score_samples(test_feature)
    # Ties (and NaN comparisons) fall to class 1, as in the original loop.
    y_pred = np.where(score_0 > score_1, 0, 1)

    print(accuracy_score(test_label, y_pred))

    # Recall and precision per class.
    report = classification_report(test_label, y_pred)
    print("Classification report: \n", report)

    # Plot non-normalized confusion matrix.
    # NOTE(review): plt and plot_confusion_matrix are not defined in the
    # visible cells; the call signature suggests a local plotting helper
    # rather than a library function -- confirm.
    np.set_printoptions(precision=2)
    con_matrix = confusion_matrix(test_label, y_pred)
    class_names = ["Non_cough", "Cough"]
    plt.figure()
    plot_confusion_matrix(con_matrix, classes=class_names,
                          title='Confusion matrix, without normalization')

    # Plot normalized confusion matrix.
    plt.figure()
    plot_confusion_matrix(con_matrix, classes=class_names, normalize=True,
                          title='Normalized confusion matrix')

    plt.show()

    with open("resource/non_cough.pkl", 'wb') as f:
        pickle.dump(model_0, f)
    with open("resource/cough.pkl", 'wb') as f:
        pickle.dump(model_1, f)



In [None]:
def read_data(data_dir):
    """Build MFCC + GFCC + zero-crossing-rate chunk features and labels.

    Every file in ``<data_dir>/audio`` is loaded with ``librosa.load`` at
    16 kHz. 40 MFCCs, 13 GFCCs and the zero-crossing rate are computed,
    truncated to a common number of frames and joined per frame. Each
    non-blank line of ``<data_dir>/label/<name>.txt`` holds ``start end tag``;
    start/end are interpreted as seconds and converted to frame indices (one
    frame per 10 ms). Labelled spans are cough, the gaps between them and the
    tail of the file are silence.

    Every span is cut into consecutive, non-overlapping chunks of
    ``config.CHUNK // 10`` frames; a trailing remainder shorter than one
    chunk is dropped.

    Args:
        data_dir: directory containing the ``audio`` and ``label`` folders.

    Returns:
        ``(feature, label)``: a NumPy array with the silent chunks followed by
        the cough chunks, and an int64 torch tensor holding 0 for silent and
        1 for cough.

    Note:
        ``np.stack`` fails if either class ends up with no chunks.
    """
    audio_dir = os.path.join(data_dir, "audio")
    label_dir = os.path.join(data_dir, "label")
    chunk_sample = config.CHUNK // 10
    feature_silent = []
    feature_cough = []

    for filename in os.listdir(audio_dir):
        audio_path = os.path.join(audio_dir, filename)
        label_path = os.path.join(label_dir, filename.replace(".wav", ".txt"))
        # NOTE(review): librosa and gfcc are not imported in the visible cells.
        audio, sr = librosa.load(audio_path, sr=16000)
        # Presumably gfcc returns (frames, num_ceps), since its first axis is
        # used as the frame count below -- confirm.
        gfcc_feature = gfcc(sig=audio, fs=sr, num_ceps=13)
        mfcc_feature = librosa.feature.mfcc(y=audio, sr=sr, hop_length=int(0.010 * sr),
                                            n_fft=int(0.025 * sr),
                                            n_mfcc=40).transpose(1, 0)
        zcr_feature = librosa.feature.zero_crossing_rate(
            y=audio, hop_length=int(0.010 * sr)).transpose(1, 0)

        # Truncate to the shortest stream so the concatenation cannot fail
        # when GFCC happens to produce more frames than the librosa features.
        feature_size = min(gfcc_feature.shape[0], mfcc_feature.shape[0],
                           zcr_feature.shape[0])
        feature = np.concatenate((mfcc_feature[:feature_size],
                                  gfcc_feature[:feature_size],
                                  zcr_feature[:feature_size]), axis=1)

        segment_silent = []
        segment_cough = []

        last = 0
        with open(label_path, "r") as f:
            for line in f:
                if not line.strip():
                    # A trailing blank line would otherwise break the unpack.
                    continue
                start, end, _ = line.split()
                start = int(float(start) * 1000) // 10
                end = int(float(end) * 1000) // 10
                segment_silent.append((max(0, last), min(start, feature_size)))
                segment_cough.append((max(0, start), min(end, feature_size)))
                last = end
        # Everything after the last labelled span counts as silence.
        segment_silent.append((max(last, 0), feature_size))

        for segments, chunks in ((segment_silent, feature_silent),
                                 (segment_cough, feature_cough)):
            for start, end in segments:
                if end - start < chunk_sample:
                    continue
                # +1 keeps the last full chunk; without it a span of exactly
                # k chunks produced only k - 1 of them.
                for x in range(start, end - chunk_sample + 1, chunk_sample):
                    chunks.append(feature[x: x + chunk_sample])

    feature_silent = np.stack(feature_silent, axis=0)
    feature_cough = np.stack(feature_cough, axis=0)

    feature = np.concatenate([feature_silent, feature_cough], axis=0)
    label = torch.tensor([0] * feature_silent.shape[0] + [1] * feature_cough.shape[0],
                         dtype=torch.int64)
    return feature, label