# Классификатор акустических событий

In [1]:
from scipy.io import wavfile
from scipy.signal import hamming
from scipy.fftpack import fft

from functools import reduce

import os
import pandas as pd
import numpy as np

from sklearn.model_selection import cross_val_score, ShuffleSplit
from sklearn.ensemble import RandomForestClassifier

### Функции для построения признаков:

In [2]:
def read_signal(filename):
    '''Считать аудифайл'''
    
    _, data = wavfile.read(filename)
    
    return (data - data.mean())

In [3]:
def check_chunk(data, threshold, k):
    ''' 
    Проверить, что доля абсолютных отсчётов больших порога превышает k.
    Т.е. доля отсчётов, которые возможно принадлежат событию.
    '''
    
    count = reduce(lambda acum, x : acum + (x > threshold), np.abs(data)) + 1
    
    return count > len(data) * k

In [4]:
def get_transform_chunk(data, threshold, k, n, window):
    ''' Умножение на окно и получение спектра для отрезка'''
    
    if check_chunk(data, threshold, k):
        data *= window(n)
        
        return (True, np.abs(fft(data, n))[1 : int(n/2)])
    else:   
        return (False, None)

In [5]:
def get_transform(data, threshold, k, n, window, m):
    '''
    Получение среднего спектра сигнала по отрезкам где предположительно нет шума
    
    data - сигнал
    threshold - порог для отсечения шумового отрезка
    k - минимальная доля нешумовых отсчётов в отрезке 
    n - количество точек для Фурье
    window - применяемое окно
    m - минимальная доля отрезков, на которых был посчитан спектр
    '''
    
    result = []
    i = 0
    count = 0
    
    while i < data.size and i + n < data.size:
        tmp = get_transform_chunk(data[i:i + n], threshold, k, n, window)
        i += n
        
        # Если отрезок не шумовой добавли в список спектр данного отрезка
        if tmp[0]:
            count += 1
            result.append(tmp[1])
    
    # Если доля отрезков, на которых был посчитан спектр больше m,
    # то вернуть среднее всех отрезков(средний спектр по всем отрезкам)
    
    # Иначе пройти по всему сигналу и посчитать спектр на каждом отрезке
    
    if count > (data.size / n) * m:
        return sum(map(lambda x : x / count, result))
    else:
        result = []
        i = 0
        count = 0
        
        while i < data.size and i + n < data.size:
            tmp = get_transform_chunk(data[i:i + n], threshold, 0, n, window)
            i += n
        
        if tmp[0]:
            count += 1
            result.append(tmp[1])
            
        return sum(map(lambda x : x / count, result))

### Извлечение меток классов обучающей выборки

In [6]:
meta = pd.read_csv('./meta/meta.txt', sep='\t', header=None, index_col=0)

meta.drop([1, 2, 3], axis=1, inplace=True)

meta.index.name = 'name'
meta.columns = ['class']

In [7]:
name_to_class = {'background' : 1, 'bags' : 2, 'door' : 3, 'keyboard' : 4, 'knocking_door' : 5, 'ring' : 6, 'speech' : 7, 'tool' : 8}

class_to_name = {value : key for key, value in name_to_class.items()}

In [8]:
y_train = np.zeros(11307)

for index, file in enumerate(os.listdir('./audio')):
    y_train[index] = name_to_class[meta['class'][file]]

### Извлечение признаков обучающей выборки

In [9]:
X_train = np.zeros((11307, 511))

for index, file in enumerate(os.listdir('./audio')):
    data = read_signal('./audio' + '/' + file) 
    X_train[index] = get_transform(data, np.median(np.abs(data)), 0.2, 1024, hamming, 0.1)

### Кросс-валидация

In [10]:
cv = ShuffleSplit(n_splits=3, test_size=0.2, random_state=141)

model_to_set = RandomForestClassifier(criterion='entropy', max_depth=50, n_estimators=500, random_state=141, n_jobs=-1)

cross_val_score(model_to_set, X_train, y_train, cv=cv)

array([0.96905393, 0.96905393, 0.96816976])

### Обучение классификатора

In [11]:
clf = model_to_set
clf.fit(X_train, y_train)

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='entropy',
            max_depth=50, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=500, n_jobs=-1,
            oob_score=False, random_state=141, verbose=0, warm_start=False)

### Проверка на тест сете

In [16]:
result = open('result.txt', 'w')

In [17]:
for file in os.listdir('./test'):
    
    data = read_signal('./test' + '/' + file) 
    data = get_transform(data, np.median(np.abs(data)), 0.2, 1024, hamming, 0.1)
    
    predicted_probs = clf.predict_proba(data.reshape((1, 511)))[0]
    max_predicted_proba = max(predicted_probs)
    predicted_class = class_to_name[int(clf.predict(data.reshape((1, 511)))[0])]
  
    result.write(file + '\t' + str(max_predicted_proba) + '\t' + predicted_class + '\n')
    
#     if file.find('unknown') == -1:
#     # Для открытой задачи
#         result.write(file + '\t' + str(max_predicted_proba) + '\t' + predicted_class + '\n')
#     elif max_predicted_proba > 0.65:
#     # Если классификатор достаточно уверен
#         result.write(file + '\t' + str(max_predicted_proba) + '\t' + predicted_class + '\n')
#     elif max_predicted_proba > 0.4 and sorted(np.abs(predicted_probs - max_predicted_proba))[1] >= 0.35:
#     # Если классификатор уверен не очень сильно, но остальные вероятности намного меньше
#         result.write(file + '\t' + str(max_predicted_proba) + '\t' + predicted_class + '\n')
#     else:
#     # Классификатор слабо уверен или есть несколько равных не очень больших вероятностей
#         result.write(file + '\t' + str(predicted_probs) + '\t' + 'unknown' + '\n')

In [18]:
result.close()