In [1]:
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
import sounddevice as sd
import math
import time
import cmath
import sys
import os
import random
import tabletext

# Что происходит в этом файле

Голосовая идентификация.

А именно, к концу этого файла создается функция, которая выдает "похожесть" двух записей друг на друга относительно источника.
Чтобы записи были хорошо похожи друг на друга, лучше произносить одну и ту же фразу.

Процесс:

1. Запись голоса (чтение готового файла)
2. Выделяем несколько фрагментов, которые будем анализировать (такие, на которых заметны колебания, то есть что-то говорят
3. Выполняем спектральный анализ с помощью быстрого преобразования Фурье для выделения амплитуд в голосе (иначе говоря, раскладываем на частоты)
4. Выделяем топ пиков (самые высокие локальные максимумы, а для каждой частоты пика запоминаем амплитуду.
5. Склеиваем этот топ пиков по всем фрагментам (считаем динамическое программирование с разбиением точек на K групп, где группой считается блок из точек с разницей не более 50 герц друг от друга), оптимизируется функция $$\displaystyle \sum_{i=1}^n \frac{amp_i}{1.05^{|x_i - center(x_i)|}}$$
6. Получаем вектор, который отвечает за то, какими частотами (усредненными) характеризуется голос
7. Чтобы сравнить два метода, используем расстояние Левенштейна, адаптированное под вещественные последовательности

In [2]:
fs = 44100
PARAMETERS = {
    "sample_length": 0.10,
    "record_length": 2,
    "peaks": 25,
    "output_dim": 20,
    "exp": 1.05,
    "peak_criterion": 3,
    "delta": 0.1,
    "penalty": 100,
    'iterations': 15,
    'pts_diff': 5,
    'group': 50
}

In [3]:
def read(name):
    n_data = sf.read('auth/' + name + '_voice.wav')[0] # if use this, use n_data[0]
    sd.play(n_data, fs)
    sd.wait()
    return list(n_data)

def record(name):
    global PARAMETERS
    print('record')
    n_data = sd.rec(PARAMETERS["record_length"] * fs, samplerate=fs, channels=1)
    sd.wait()
    print('play')
    sd.play(n_data, fs)
    sd.wait()
    sf.write('auth/' + name + '_voice.wav', n_data, samplerate=fs);
    return list(n_data)

In [4]:
def normalize_result(val):
    ln = len(val)
    ampl = [0] * ln
    arg = [0] * ln
    for i in range(0, len(val)):
        ampl[i] = abs(val[i]) / ln
        arg[i] = val[i].imag
    return ampl, arg

In [5]:
'''
slow version
 result = []
 for i in range(len(A)):
     result.append(complex(0))
     for j in range(len(A)):
         result[i] += A[j] * complex(math.cos(2 * math.pi * i * j / len(A)), -math.sin(2 * math.pi * i * j / len(A)))
 return result
'''
def fft(A):
    if len(A) == 1:
        return A
    assert len(A) % 2 == 0
    n =	len(A)
    a, b = [], []
    for i in range(0, n):
        if (i % 2 == 0):
            a.append(A[i].copy())
        else:
            b.append(A[i].copy())
    a = fft(a)
    b = fft(b)
    result = [complex(1) for _ in range(n)]
    pw = complex(1)
    w = complex(math.cos(2 * math.pi / n), math.sin(2 * math.pi / n))	
    for i in range(0, n // 2):
        tmp = pw * b[i]
        result[i] = a[i] + pw * b[i]
        result[i + n // 2] = a[i] - pw * b[i]
        pw *= w
    return result

In [6]:
def hamming(index, n):
    return 0.54 - 0.46 * math.cos((2 * math.pi * index) / (n - 1))

In [7]:
def find_peaks(arr, n, top_k=PARAMETERS["peaks"]):
    global PARAMETERS
    assert n > 200
    assert top_k <= n * 0.90
    result = []
    for i in range(int(n * 0.01), int(n * 0.99)):
        ok = True
        mn = arr[i]
        for j in range(int(-n * 0.01), int(n * 0.01)):
            idx = int(i + j)
            mn = min(arr[i + j], mn)
            if arr[idx] > arr[i]:
                ok = False
                break
        if ok and mn * PARAMETERS["peak_criterion"] < arr[i]:
            result.append((arr[i] - mn * PARAMETERS["peak_criterion"], i))
    return sorted(result)[::-1][:top_k]

In [8]:
def merge_similarities(arr, k = PARAMETERS["output_dim"]):
    n = len(arr)
    result = []
    arr.sort(key=lambda x: x[1])
    dp = [[-10**10 for j in range(k + 1)] for i in range(n + 1)]
    pre = [[(i - 1, i - 1, 0) for j in range(k + 1)] for i in range(n + 1)]
    dp[0][0] = 0
    for i in range(1, n + 1):
        for j in range(k, 0, -1):
            dp[i][j] = dp[i - 1][j]
            pre[i][j] = pre[i - 1][j]
            if dp[i][j - 1] > dp[i][j]:
                dp[i][j] = dp[i][j - 1]
                pre[i][j] = pre[i][j - 1]
            l = i - 1
            while l >= 0 and arr[i - 1][1] - arr[l][1] <= PARAMETERS['group']:
                l -= 1
                assert i >= 1
            calc = 0
            x_mean = 0
            sm = 0
            for f in range(l + 1, i):
                x_mean += arr[f][0] * arr[f][1]
                sm += arr[f][0]
            x_mean /= sm
            for f in range(l + 1, i):
                calc += arr[f][0] / PARAMETERS["exp"] ** (abs(arr[f][1] - x_mean))
            if calc + dp[l][j - 1] > dp[i][j]:
                dp[i][j] = calc + dp[l][j - 1]
                pre[i][j] = (l, x_mean, -1)
    fin = [n, k]
    res = []
    while fin[1] >= 0 and fin[0] >= 0:
        pr = pre[fin[0]][fin[1]]
        if pr[2] != 0:
            res.append(pr[1])
        fin[0] = pr[0]
        fin[1] += pr[2]
    return res

In [9]:
def get_vector(name, iterations=PARAMETERS['iterations'], visualize=False):
    data = np.array(read(name))
    if visualize:
        plt.plot(data)
        plt.show()
    ln = len(data)
    new_ln = 1
    while new_ln < fs * PARAMETERS["sample_length"]:
        new_ln *= 2
    peaks = []
    for start_idx in range(iterations):
        start = random.randint(0, len(data) - new_ln)
        np_data = np.array(data)[start:start + new_ln]
        while True:
            if np_data.max() - np_data.min() > PARAMETERS["delta"]:
                break
            start = random.randint(0, len(data) - new_ln)
            np_data = np.array(data)[start:start + new_ln]
           
        np_data = fft(np_data)
        for i in range(len(np_data)):
            np_data[i] *= hamming(i, len(np_data))
        ampl, arg = normalize_result(np_data)
        
        '''
        to scale array
        '''
        def get(arr, idx):
            k = PARAMETERS["sample_length"] * 2
            if int(idx * k) == idx * k:
                return arr[int(idx * k)]
            diff = idx * k - int(idx * k)
            return arr[int(idx * k)] * diff + arr[int(idx * k) + 1] * (1 - diff)
        
        res = [get(ampl, i) for i in range(5000)]
        if visualize:
            plt.plot(res, alpha=0.5)
        peaks.extend(find_peaks(res, len(res)))
    if visualize:
        plt.show()
    return merge_similarities(peaks)

In [10]:
get_vector('kostya', 10)

[4520.335901263482,
 4373.697804941869,
 4096.172546370104,
 3916.6672946040967,
 3835.5785543150087,
 3779.5585572375294,
 3690.646137131692,
 3617.911682648064,
 3513.2002816241716,
 3413.47574463024,
 3292.6246272402154,
 3219.043558258989,
 908.9260447257794,
 811.3144107240264,
 677.7744129932518,
 457.54091752997743,
 409.9870544050689,
 311.03738657213944,
 269.1867316808703,
 144.05885265994667]

In [11]:
get_vector('dima', 10)

[3589.798003097965,
 3534.0617930459007,
 3396.056630014393,
 3279.4937401431807,
 3157.164296957356,
 3088.7029515252143,
 3018.9222028575978,
 2372.9780952123006,
 1671.343445983293,
 1441.0104702220292,
 1383.9154157598632,
 1070.4170777637912,
 932.9435932032438,
 819.8860101344906,
 736.3130240414297,
 610.150672305322,
 433.0924855926702,
 354.76216092267106,
 239.06434156326824,
 142.64433368785694]

In [12]:
def score(a, b):
    assert(len(a) == len(b))
    n = len(a)
    dp = [[10 ** 10 for i in range(n + 1)] for j in range(n + 1)]
    dp[0][0] = 0
    penalty = PARAMETERS['penalty']
    for i in range(n + 1):
        for j in range(n + 1):
            if i == 0 and j == 0:
                continue
            if i == 0:
                dp[i][j] = dp[i][j - 1] + penalty
            elif j == 0:
                dp[i][j] = dp[i - 1][j] + penalty
            else:
                dp[i][j]= min(min(dp[i - 1][j], dp[i][j - 1]) + penalty, dp[i - 1][j - 1] + abs(a[i - 1] - b[j - 1]) * PARAMETERS['pts_diff'])
    return dp[n][n]

In [13]:
score(get_vector('kostya'), get_vector('dima'))

2864.3500625804672

In [14]:
score(get_vector('kostya'), get_vector('kostya'))

1753.2502825995616

In [15]:
score(get_vector('kostya', 10), get_vector('kostya', 10))

1517.562833857663

In [16]:
score(get_vector('kostya'), get_vector('roman'))

3118.614625210423

In [17]:
score(get_vector('dima'), get_vector('roman'))

3140.055201103294

In [18]:
score(get_vector('kostya'), get_vector('kostya_2021'))

3405.298336875361

In [19]:
# record('lucy_houses')

In [20]:
# record('lucy_paint')

In [21]:
# record('kostya_houses')

In [22]:
# record('kostya_paint')

In [23]:
names = ['lucy_paint', 'lucy_houses', 'kostya_paint', 'kostya_houses']
result = []
for a in names:
    result.append([])
    for b in names:
        result[-1].append(score(get_vector(a), get_vector(b)))

In [24]:
print(tabletext.to_text(result))

┌────────────────────┬────────────────────┬────────────────────┬────────────────────┐
│  704.4412722543377 │ 2984.8245567850267 │ 3321.9721620685796 │  3600.735397773483 │
├────────────────────┼────────────────────┼────────────────────┼────────────────────┤
│  2396.964096870589 │  946.3078708385973 │ 2922.4045313887673 │ 3205.8688588424234 │
├────────────────────┼────────────────────┼────────────────────┼────────────────────┤
│ 3159.9488103414387 │  3096.049918085073 │ 1469.1304774091034 │ 2739.3625096002384 │
├────────────────────┼────────────────────┼────────────────────┼────────────────────┤
│ 3324.4298966681895 │  3452.613225093144 │  3044.874468875461 │ 1777.7080196032257 │
└────────────────────┴────────────────────┴────────────────────┴────────────────────┘


# Итог

Я не сильно подкручивал параметры, но разница все-таки есть, непохожие друг на друга голоса имеют больший score при сравнении. Актуальны только записи kostya_houses, lucy_houses, kostya_paint, lucy_paint. Предыдущие были сделаны на другой микрофон (и качеством они хуже) и теперь непонятно, как их сравнивать