In [1]:
from audio_utils import fade
import os
import shutil
import librosa
import soundfile as sf
import json
import tqdm
import numpy as np
import lightning_module
import mos
import torch  
import torchaudio
from copy import deepcopy
import random

In [2]:
# Linear fade whose length is derived from the shorter of the two neighbouring segments
def method_1(audio: np.ndarray, center_fade: float, fade_len: float, *samples: int):
    """Apply ``fade`` at every splice point, sized relative to its neighbours.

    For each splice, the shorter of the two adjacent segments (previous splice
    or start of ``audio`` up to the splice, and the splice up to the next splice
    or end of ``audio``) is divided by ``fade_len``; half of that value is passed
    to ``fade`` as the length on each side of the splice.

    Args:
        audio: Sample buffer. Slices of it are handed to ``fade``; presumably
            ``fade`` edits them in place (its return value is ignored here) --
            confirm in ``audio_utils``.
        center_fade: Forwarded to ``fade`` between the two ``1.0`` arguments.
        fade_len: Divisor applied to the shorter neighbouring segment length.
        *samples: Splice positions as sample indices.

    Returns:
        The same ``audio`` object that was passed in.
    """
    last = len(samples) - 1
    for ind, sample in enumerate(samples):
        # A ``None`` bound lets the slice run to the start/end of the audio, so a
        # lone splice (no neighbouring splices) is handled as well. Previously
        # that case assigned nothing to ``fade_duration`` and raised
        # UnboundLocalError.
        prev_bound = samples[ind - 1] if ind > 0 else None
        next_bound = samples[ind + 1] if ind < last else None
        shortest = min(len(audio[prev_bound:sample]), len(audio[sample:next_bound]))
        fade_duration = int(shortest / fade_len)
        duration = int(fade_duration // 2)
        fade(audio[:sample], audio[sample:], duration, duration, 1.0, center_fade, 1.0)
    return audio


# Linear fade with a fixed length given in seconds
def method_2(audio: np.ndarray, sr: int, center_fade: float, fade_duration: float, *samples: int):
    """Apply ``fade`` with a fixed window at the inner splice points.

    Args:
        audio: Sample buffer; slices of it are handed to ``fade`` (presumably
            edited in place -- confirm in ``audio_utils``).
        sr: Sample rate used to convert ``fade_duration`` to samples.
        center_fade: Forwarded to ``fade`` between the two ``1.0`` arguments.
        fade_duration: Total fade length in seconds; half goes to each side.
        *samples: Splice positions as sample indices.

    Returns:
        The same ``audio`` object that was passed in.
    """
    half_window = int((fade_duration * sr) // 2)
    # NOTE(review): the first and last entries of ``samples`` are skipped here,
    # unlike method_1 which fades at every splice -- confirm this is intended.
    inner_splices = samples[1:-1]
    for splice in inner_splices:
        before, after = audio[:splice], audio[splice:]
        fade(before, after, half_window, half_window, 1.0, center_fade, 1.0)
    return audio


# Exponential fade whose length is derived from the shorter neighbouring segment, with a fade power
def method_3(audio: np.ndarray, center_fade: float, fade_len: float, fade_power: float, *samples: int):
    """Apply ``fade`` with ``exp=fade_power`` at every splice, sized by its neighbours.

    For each splice, the shorter of the two adjacent segments (previous splice
    or start of ``audio`` up to the splice, and the splice up to the next splice
    or end of ``audio``) is divided by ``fade_len``; half of that value is passed
    to ``fade`` as the length on each side of the splice.

    Args:
        audio: Sample buffer. Slices of it are handed to ``fade``; presumably
            ``fade`` edits them in place (its return value is ignored here) --
            confirm in ``audio_utils``.
        center_fade: Forwarded to ``fade`` between the two ``1.0`` arguments.
        fade_len: Divisor applied to the shorter neighbouring segment length.
        fade_power: Forwarded to ``fade`` as its ``exp`` keyword argument.
        *samples: Splice positions as sample indices.

    Returns:
        The same ``audio`` object that was passed in.
    """
    last = len(samples) - 1
    for ind, sample in enumerate(samples):
        # A ``None`` bound lets the slice run to the start/end of the audio, so a
        # lone splice (no neighbouring splices) is handled as well. Previously
        # that case assigned nothing to ``fade_duration`` and raised
        # UnboundLocalError.
        prev_bound = samples[ind - 1] if ind > 0 else None
        next_bound = samples[ind + 1] if ind < last else None
        shortest = min(len(audio[prev_bound:sample]), len(audio[sample:next_bound]))
        fade_duration = int(shortest / fade_len)
        duration = int(fade_duration // 2)
        fade(audio[:sample], audio[sample:], duration, duration, 1.0, center_fade, 1.0, exp=fade_power)
    return audio


# Exponential fade with a fixed length given in seconds and a fade power
def method_4(audio: np.ndarray, sr: int, center_fade: float, fade_duration: float, fade_power: float, *samples: int):
    """Apply ``fade`` with ``exp=fade_power`` and a fixed window at the inner splices.

    Args:
        audio: Sample buffer; slices of it are handed to ``fade`` (presumably
            edited in place -- confirm in ``audio_utils``).
        sr: Sample rate used to convert ``fade_duration`` to samples.
        center_fade: Forwarded to ``fade`` between the two ``1.0`` arguments.
        fade_duration: Total fade length in seconds; half goes to each side.
        fade_power: Forwarded to ``fade`` as its ``exp`` keyword argument.
        *samples: Splice positions as sample indices.

    Returns:
        The same ``audio`` object that was passed in.
    """
    half_window = int((fade_duration * sr) // 2)
    # NOTE(review): the first and last entries of ``samples`` are skipped here,
    # unlike method_3 which fades at every splice -- confirm this is intended.
    inner_splices = samples[1:-1]
    for splice in inner_splices:
        before, after = audio[:splice], audio[splice:]
        fade(before, after, half_window, half_window, 1.0, center_fade, 1.0, exp=fade_power)
    return audio

In [3]:
# Load the per-recording metadata from the JSON log; later cells read the
# 'myPath' and 'splices' keys of each entry.
# The encoding is given explicitly so decoding does not depend on the platform's
# locale default (JSON text is UTF-8).
with open('data/log.json', 'r', encoding='utf-8') as logs:
    data = json.load(logs)

In [4]:
# Restore the model from its checkpoint and call .eval() on it; `model` is what
# the `mos` scoring helpers receive in the cells below.
model = lightning_module.BaselineLightningModule.load_from_checkpoint("epoch=3-step=7459.ckpt").eval()



In [5]:
def clear_folder(folder):
    """Delete every file, symlink and sub-directory directly inside ``folder``.

    The folder itself is kept. Removal is best-effort: an entry that cannot be
    deleted is reported and skipped instead of aborting the clean-up.

    Args:
        folder: Path of the directory to empty. It must exist, otherwise
            ``os.listdir`` raises.
    """
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except OSError as error:
            # Keep going, but make the failure visible: a stale file left here
            # would silently end up in later directory-level MOS scores.
            print(f'Could not remove {file_path}: {error}')

In [6]:
# Empty the output directory of every method before new files are exported
for method_dir in ('data/method_1', 'data/method_2', 'data/method_3', 'data/method_4'):
    clear_folder(method_dir)

In [6]:
# Smooth one recording with the selected method
def smooth(audio: torch.Tensor, sr: int, audio_data: dict, method, params: tuple):
    """Run one of the four fade methods on a copy of ``audio``.

    Args:
        audio: Input tensor; it is deep-copied and its dimension 0 is squeezed
            before conversion to a NumPy array, so the caller's tensor is untouched.
        sr: Sample rate, forwarded to ``method_2`` / ``method_4`` only.
        audio_data: Metadata entry; its ``'splices'`` value is unpacked as the
            splice positions.
        method: ``method_1``, ``method_2`` or ``method_3``; any other value is
            treated as ``method_4``.
        params: Tuple whose layout depends on ``method``:
            ``method_1``: ``(center_fade, fade_len)``;
            ``method_2``: ``(center_fade, fade_duration)``;
            ``method_3``: ``(center_fade, fade_len, fade_power)``;
            otherwise: ``(center_fade, fade_duration, fade_power)``.

    Returns:
        ``torch.FloatTensor`` built from the method's result, with dimension 0
        restored via ``unsqueeze(0)``.
    """
    waveform = deepcopy(audio).squeeze(0)
    splice_points = audio_data['splices']

    def as_batch(array):
        # Wrap the processed samples back into a float tensor with a leading axis.
        return torch.FloatTensor(array).unsqueeze(0)

    if method == method_1:
        center_fade, fade_len = params
        return as_batch(method_1(np.array(waveform), center_fade, fade_len, *splice_points))
    if method == method_2:
        center_fade, fade_duration = params
        return as_batch(method_2(np.array(waveform), sr, center_fade, fade_duration, *splice_points))
    if method == method_3:
        center_fade, fade_len, fade_power = params
        return as_batch(method_3(np.array(waveform), center_fade, fade_len, fade_power, *splice_points))
    # Everything else is routed to method_4.
    center_fade, fade_duration, fade_power = params
    return as_batch(method_4(np.array(waveform), sr, center_fade, fade_duration, fade_power, *splice_points))

In [7]:
# Preload every recording
def load_audio(data: dict) -> None:
    """Store ``torchaudio.load(entry['myPath'])`` under ``entry['data']`` for each entry.

    ``data`` is modified in place. Despite the ``dict`` annotation, the entries
    are visited by position, so a list of dict entries is what works here.
    """
    for entry in data:
        entry['data'] = torchaudio.load(entry['myPath'])


# Load every file once up front; the cells below read the result from entry['data']
load_audio(data)

In [8]:
def evaluate_1(x1, x2):
    """Score ``method_1`` over every entry of the global ``data``.

    Args:
        x1: Passed to ``smooth`` as ``center_fade``.
        x2: Passed to ``smooth`` as ``fade_len``.

    Returns:
        Whatever ``mos.cals_mos_collection`` returns for the list of
        ``(smoothed, sr)`` pairs and the global ``model``.
    """
    params = (x1, x2)
    scored_inputs = []
    for record in data:
        waveform, rate = record['data']
        scored_inputs.append((smooth(waveform, rate, record, method_1, params), rate))
    return mos.cals_mos_collection(scored_inputs, model)

In [9]:
def evaluate_2(x1, x2):
    """Score ``method_2`` over every entry of the global ``data``.

    Args:
        x1: Passed to ``smooth`` as ``center_fade``.
        x2: Passed to ``smooth`` as ``fade_duration``.

    Returns:
        Whatever ``mos.cals_mos_collection`` returns for the list of
        ``(smoothed, sr)`` pairs and the global ``model``.
    """
    params = (x1, x2)
    scored_inputs = []
    for record in data:
        waveform, rate = record['data']
        scored_inputs.append((smooth(waveform, rate, record, method_2, params), rate))
    return mos.cals_mos_collection(scored_inputs, model)

In [10]:
def evaluate_3(x1, x2, x3):
    """Score ``method_3`` over every entry of the global ``data``.

    Args:
        x1: Passed to ``smooth`` as ``center_fade``.
        x2: Passed to ``smooth`` as ``fade_len``.
        x3: Passed to ``smooth`` as ``fade_power``.

    Returns:
        Whatever ``mos.cals_mos_collection`` returns for the list of
        ``(smoothed, sr)`` pairs and the global ``model``.
    """
    params = (x1, x2, x3)
    scored_inputs = []
    for record in data:
        waveform, rate = record['data']
        scored_inputs.append((smooth(waveform, rate, record, method_3, params), rate))
    return mos.cals_mos_collection(scored_inputs, model)

In [11]:
def evaluate_4(x1, x2, x3):
    """Score ``method_4`` over every entry of the global ``data``.

    Args:
        x1: Passed to ``smooth`` as ``center_fade``.
        x2: Passed to ``smooth`` as ``fade_duration``.
        x3: Passed to ``smooth`` as ``fade_power``.

    Returns:
        Whatever ``mos.cals_mos_collection`` returns for the list of
        ``(smoothed, sr)`` pairs and the global ``model``.
    """
    params = (x1, x2, x3)
    scored_inputs = []
    for record in data:
        waveform, rate = record['data']
        scored_inputs.append((smooth(waveform, rate, record, method_4, params), rate))
    return mos.cals_mos_collection(scored_inputs, model)

In [12]:
# Genetic algorithm
def crossover(x1, x2):
    """Uniform crossover of two parents.

    For every position of ``x1`` a coin flip (``random.random() < 0.5``) decides
    whether the children keep their own parent's gene or swap genes.

    Returns:
        ``[child1, child2]`` as two new lists of length ``len(x1)``.
    """
    first, second = [], []
    for idx, gene_a in enumerate(x1):
        gene_b = x2[idx]
        keep_own = random.random() < 0.5
        first.append(gene_a if keep_own else gene_b)
        second.append(gene_b if keep_own else gene_a)
    return [first, second]


def replacing_mutation(bounds):
    """Build a brand-new chromosome with one uniform draw per ``(low, high)`` bound."""
    fresh = []
    for bound in bounds:
        fresh.append(random.uniform(*bound))
    return fresh


def invert_mutation(chromosome, bounds):
    """Mirror one randomly chosen gene inside its bounds.

    The selected gene ``c`` with bounds ``(low, high)`` becomes
    ``low + (high - c)``, i.e. its reflection about the middle of the interval,
    so a gene that starts inside the bounds stays inside them.

    The previous implementation applied two subtractions that cancelled out to
    ``c + low``: the value was not mirrored, could leave the bounds, and did not
    change at all when ``low == 0``.

    Args:
        chromosome: Sequence of gene values.
        bounds: One ``(low, high)`` pair per gene.

    Returns:
        A new list with one gene mirrored. The input is left untouched so that
        other references to the same individual (e.g. a remembered best
        solution) are not altered. An empty chromosome yields an empty list.
    """
    mutated = list(chromosome)
    if not mutated:
        return mutated
    i = random.randint(0, len(mutated) - 1)
    low, high = bounds[i][0], bounds[i][1]
    mutated[i] = low + (high - mutated[i])
    return mutated


def mutation(chromosome, bounds):
    """Mutate an individual: a coin flip picks the operator.

    With ``random.random() < 0.5`` the chromosome is replaced by a fresh random
    one (``replacing_mutation``); otherwise ``invert_mutation`` is applied to it.
    """
    use_replacement = random.random() < 0.5
    if not use_replacement:
        return invert_mutation(chromosome, bounds)
    return replacing_mutation(bounds)


def genetic_algorithm(bounds, n_generations, n_individuals, crossover_rate, mutation_rate, evaluate, elitism=True):
    """Maximise ``evaluate`` with a simple genetic algorithm.

    Each generation: evaluate the whole population, remember the best
    individual, cross over neighbouring pairs, mutate, and (with elitism)
    restore the best individual if the one now occupying its slot scores worse.

    Args:
        bounds: One ``(low, high)`` pair per parameter; the initial population
            is drawn uniformly from them.
        n_generations: Number of generations to run (at least 1).
        n_individuals: Population size (at least 1). An odd size is allowed; the
            last individual is then never crossed over.
        crossover_rate: Probability that a neighbouring pair is crossed over.
        mutation_rate: Probability that an individual is mutated.
        evaluate: Callable taking the parameters as positional arguments and
            returning the score to maximise.
        elitism: Whether to protect the best individual of each generation.

    Returns:
        ``(best_params, best_value)``: the best individual that was actually
        evaluated during the run, together with the score measured for it.

    Raises:
        ValueError: If ``n_generations`` or ``n_individuals`` is below 1.

    Side effects:
        Updates the module-level ``mx`` with ``(best_params, best_value)``
        whenever the run's best score is at least as good as the stored one, so
        the best result so far survives an interrupted run. ``mx`` is created
        if it does not exist yet.
    """
    global mx
    if n_generations < 1 or n_individuals < 1:
        raise ValueError('n_generations and n_individuals must both be at least 1')
    if 'mx' not in globals():
        mx = (None, float('-inf'))

    population = [[random.uniform(*bound) for bound in bounds]
                  for _ in range(n_individuals)]

    best_solution, best_value = None, None

    for _ in tqdm.tqdm(range(n_generations)):
        values = [evaluate(*x) for x in population]

        best_idx = int(np.argmax(values))
        # Copy: variation operators may edit an individual in place, which must
        # not change the remembered best parameters.
        best_params = list(population[best_idx])
        if best_value is None or values[best_idx] > best_value:
            best_solution, best_value = list(best_params), values[best_idx]
            mx = max((best_solution, best_value), mx, key=lambda pair: pair[1])

        # Cross over neighbouring pairs; stopping at len - 1 leaves the last
        # individual of an odd-sized population unpaired instead of indexing
        # past the end.
        for i in range(0, len(population) - 1, 2):
            if random.random() < crossover_rate:
                population[i], population[i + 1] = crossover(population[i], population[i + 1])

        for i in range(len(population)):
            if random.random() < mutation_rate:
                population[i] = mutation(population[i], bounds)

        # Elitism: only re-evaluate when the best slot actually changed, since
        # an evaluation is expensive.
        if elitism and population[best_idx] != best_params:
            changed_value = evaluate(*population[best_idx])
            if changed_value < values[best_idx]:
                population[best_idx] = list(best_params)
            elif changed_value > best_value:
                best_solution, best_value = list(population[best_idx]), changed_value
                mx = max((best_solution, best_value), mx, key=lambda pair: pair[1])

    return best_solution, best_value

In [13]:
bounds = [(0, 0.3), (1, 4)]  # search ranges for (center_fade, fade_len)
n_generations = 5  # number of generations
n_individuals = 30  # population size
crossover_rate = 0.8
mutation_rate = 0.1
elitism = True
evaluate = evaluate_1

# Best (params, score) seen so far; genetic_algorithm updates this global
mx = ((0, 0), 0)

res = genetic_algorithm(
    bounds=bounds,
    n_generations=n_generations,
    n_individuals=n_individuals,
    crossover_rate=crossover_rate,
    mutation_rate=mutation_rate,
    evaluate=evaluate,
    elitism=elitism,
)
print(res)
print(mx)

100%|██████████| 5/5 [1:18:53<00:00, 946.63s/it]


([0.05448562424331913, 3.5415098357336645], 3.98019945195743)
([0.05448562424331913, 3.5415098357336645], 3.98019945195743)


In [17]:
bounds = [(0, 0.3), (0.01, 0.3)]  # search ranges for (center_fade, fade_duration)
n_generations = 5  # number of generations
n_individuals = 30  # population size
crossover_rate = 0.8
mutation_rate = 0.1
elitism = True
evaluate = evaluate_2

# Best (params, score) seen so far; genetic_algorithm updates this global
mx = ((0, 0), 0)

res = genetic_algorithm(
    bounds=bounds,
    n_generations=n_generations,
    n_individuals=n_individuals,
    crossover_rate=crossover_rate,
    mutation_rate=mutation_rate,
    evaluate=evaluate,
    elitism=elitism,
)
print(res)
print(mx)

100%|██████████| 5/5 [1:17:52<00:00, 934.53s/it]

([0.03302215527954826, 0.1203492166983655], 3.953359306710107)
([0.03302215527954826, 0.1203492166983655], 3.953359306710107)





In [13]:
bounds = [(0, 0.3), (1, 4), (0.8, 2)]  # search ranges for (center_fade, fade_len, fade_power)
n_generations = 5  # number of generations
n_individuals = 30  # population size
crossover_rate = 0.8
mutation_rate = 0.1
elitism = True
evaluate = evaluate_3

# Best (params, score) seen so far; genetic_algorithm updates this global
mx = ((0, 0), 0)

res = genetic_algorithm(
    bounds=bounds,
    n_generations=n_generations,
    n_individuals=n_individuals,
    crossover_rate=crossover_rate,
    mutation_rate=mutation_rate,
    evaluate=evaluate,
    elitism=elitism,
)
print(res)
print(mx)

100%|██████████| 5/5 [52:31<00:00, 630.40s/it]

([0.010267935706531438, 3.9299669247736997, 1.0884270958335511], 3.9875548252037594)
([0.010267935706531438, 3.9299669247736997, 1.0884270958335511], 3.9875548252037594)





In [14]:
bounds = [(0, 0.3), (0.01, 0.3), (0.8, 2)]  # search ranges for (center_fade, fade_duration, fade_power)
n_generations = 5  # number of generations
n_individuals = 30  # population size
crossover_rate = 0.8
mutation_rate = 0.1
elitism = True
evaluate = evaluate_4

# Best (params, score) seen so far; genetic_algorithm updates this global
mx = ((0, 0), 0)

res = genetic_algorithm(
    bounds=bounds,
    n_generations=n_generations,
    n_individuals=n_individuals,
    crossover_rate=crossover_rate,
    mutation_rate=mutation_rate,
    evaluate=evaluate,
    elitism=elitism,
)
print(res)
print(mx)

100%|██████████| 5/5 [51:34<00:00, 618.81s/it]

([0.011676794622551811, 0.07207032424047766, 1.6602603391084787], 3.9556299929107936)
([0.011676794622551811, 0.07207032424047766, 1.6602603391084787], 3.9556299929107936)





**Ищем лучшие параметры для method_1**

In [None]:
# Grid search over fade_len for method_1, with center_fade fixed at 0.0
best_fade_len_1 = (0, 0)  # (MOS, fade_len)
for elem in tqdm.tqdm(range(150, 400 + 1, 10)):
    fade_len = elem / 100
    # evaluate_1 already runs method_1 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (evaluate_1(0.0, fade_len), fade_len)
    best_fade_len_1 = max(best_fade_len_1, cur_mos, key=lambda x: x[0])
print(f'BEST FADE DURATION: {best_fade_len_1}')

In [None]:
# Grid search over center_fade for method_1, using the best fade_len found above
best_center_fade_1 = (0, 0)  # (MOS, center_fade)
for elem in tqdm.tqdm(range(0, 300 + 1, 10)):
    center_fade = elem / 1000
    # evaluate_1 already runs method_1 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (evaluate_1(center_fade, best_fade_len_1[1]), center_fade)
    best_center_fade_1 = max(best_center_fade_1, cur_mos, key=lambda x: x[0])
# The label previously said "FADE DURATION" although this search is over center_fade.
print(f'BEST CENTER FADE: {best_center_fade_1}')

**Ищем лучшие параметры для method_2**

In [None]:
# Grid search over fade_duration (seconds) for method_2, with center_fade fixed at 0.0
best_fade_duration_2 = (0, 0)  # (MOS, fade_duration)
for elem in tqdm.tqdm(range(25, 500 + 1, 25)):
    dur = elem / 1000
    # evaluate_2 already runs method_2 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (evaluate_2(0.0, dur), dur)
    best_fade_duration_2 = max(best_fade_duration_2, cur_mos, key=lambda x: x[0])
print(f'BEST FADE DURATION: {best_fade_duration_2}')

In [None]:
# Grid search over center_fade for method_2, using the best fade_duration found above
best_center_fade_2 = (0, 0)  # (MOS, center_fade)
for elem in tqdm.tqdm(range(0, 300 + 1, 10)):
    center_fade = elem / 1000
    # evaluate_2 already runs method_2 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (evaluate_2(center_fade, best_fade_duration_2[1]), center_fade)
    best_center_fade_2 = max(best_center_fade_2, cur_mos, key=lambda x: x[0])
print(f'BEST CENTER FADE: {best_center_fade_2}')

**Ищем лучшие параметры для method_3**

In [None]:
# Joint grid search over fade_len and fade_power for method_3, with center_fade fixed at 0.0
best_fade_power_and_duration_3 = (0, 0, 0)  # (MOS, fade_len, fade_power)
for len_elem in tqdm.tqdm(range(150, 400 + 1, 10)):
    fade_len = len_elem / 100
    for power_elem in range(800, 2000 + 1, 50):
        power = power_elem / 1000
        # evaluate_3 already runs method_3 over every recording and scores the
        # result, so the per-recording loop is not repeated here.
        cur_mos = (evaluate_3(0.0, fade_len, power), fade_len, power)
        best_fade_power_and_duration_3 = max(best_fade_power_and_duration_3, cur_mos, key=lambda x: x[0])
print(f'MAX MOS DURATION: {best_fade_power_and_duration_3}')

In [None]:
# Grid search over center_fade for method_3, using the best fade_len / fade_power found above
best_center_fade_3 = (0, 0)  # (MOS, center_fade)
for elem in tqdm.tqdm(range(0, 300 + 1, 10)):
    center_fade = elem / 1000
    # evaluate_3 already runs method_3 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (
        evaluate_3(center_fade, best_fade_power_and_duration_3[1], best_fade_power_and_duration_3[2]),
        center_fade,
    )
    best_center_fade_3 = max(best_center_fade_3, cur_mos, key=lambda x: x[0])
print(f'BEST CENTER FADE: {best_center_fade_3}')

**Ищем лучшие параметры для method_4**

In [None]:
# Joint grid search over fade_duration (seconds) and fade_power for method_4, with center_fade fixed at 0.0
best_fade_power_and_duration_4 = (0, 0, 0)  # (MOS, fade_duration, fade_power)
for dur_elem in tqdm.tqdm(range(25, 500 + 1, 25)):
    fade_dur = dur_elem / 1000
    for power_elem in range(800, 2000 + 1, 50):
        power = power_elem / 1000
        # evaluate_4 already runs method_4 over every recording and scores the
        # result, so the per-recording loop is not repeated here.
        cur_mos = (evaluate_4(0.0, fade_dur, power), fade_dur, power)
        best_fade_power_and_duration_4 = max(best_fade_power_and_duration_4, cur_mos, key=lambda x: x[0])
print(f'MAX MOS DURATION: {best_fade_power_and_duration_4}')

In [None]:
# Grid search over center_fade for method_4, using the best fade_duration / fade_power found above
best_center_fade_4 = (0, 0)  # (MOS, center_fade)
for elem in tqdm.tqdm(range(0, 300 + 1, 10)):
    center_fade = elem / 1000
    # evaluate_4 already runs method_4 over every recording and scores the
    # result, so the per-recording loop is not repeated here.
    cur_mos = (
        evaluate_4(center_fade, best_fade_power_and_duration_4[1], best_fade_power_and_duration_4[2]),
        center_fade,
    )
    best_center_fade_4 = max(best_center_fade_4, cur_mos, key=lambda x: x[0])
print(f'BEST CENTER FADE: {best_center_fade_4}')

In [None]:
# Ульта
# max_mos_stats4 = (0, 0, 0, 0)
# for dur in tqdm.tqdm(range(5, 315 + 1, 10)):
#     dur /= 1000
#     for cf in range(10, 35 + 1, 5):
#         cf /= 100
#         for power in range(500, 2000 + 1, 100):
#             power /= 1000
#             moses = []
#             for number, audio_data in enumerate(data):
#                 audio, sr = torchaudio.load(audio_data['myPath'])
#                 audio = audio.squeeze(0)
#                 samples = audio_data['splices']
#         
#                 smoothed_4 = method_4(np.array(audio), sr, cf, dur, power, *samples)
#                 smoothed_4 = torch.FloatTensor(smoothed_4).unsqueeze(0)
#                 moses.append(mos.calc_mos_raw(smoothed_4, sr, model))
#             mos = (np.mean(moses), dur, cf, power)
#             max_mos_stats4 = max(max_mos_stats4, mos, key=lambda x: x[0])
# print(f'MAX MOS STATS ON METHOD_4: {max_mos_stats4}')

In [19]:
# Export the smoothed recordings to data/method_<N>/smoothed<i>.wav.
# Only method_2 is enabled at the moment; the other methods are toggled by
# (un)commenting the matching `smoothed_N` and `res.append` lines below.
# NOTE: this cell rebinds `res` (also used for the genetic-algorithm result above).
for number, audio_data in enumerate(tqdm.tqdm(data)):
    audio, sr = audio_data['data']
    samples = audio_data['splices']

    # Variant using the grid-search results.
    # NOTE(review): `smooth` unpacks params with center_fade first, but these
    # calls put the fade length/duration values first -- fix the order before
    # re-enabling them.
    # smoothed_1 = smooth(audio, sr, audio_data, method_1, (best_fade_len_1[1], best_center_fade_1[1]))
    # smoothed_2 = smooth(audio, sr, audio_data, method_2, (best_fade_duration_2[1], best_center_fade_2[1]))
    # smoothed_3 = smooth(audio, sr, audio_data, method_3, (*best_fade_power_and_duration_3[1:], best_center_fade_3[1]))
    # smoothed_4 = smooth(audio, sr, audio_data, method_4, (*best_fade_power_and_duration_4[1:], best_center_fade_4[1]))

    # Variant using hard-coded parameters; the method_2 values are the ones
    # printed by the genetic-algorithm run for evaluate_2 above.
    # smoothed_1 = smooth(audio, sr, audio_data, method_1, (0.1371507441632939, 1.734866809913312)).squeeze(0)
    smoothed_2 = smooth(audio, sr, audio_data, method_2, (0.03302215527954826, 0.1203492166983655)).squeeze(0)
    # smoothed_3 = smooth(audio, sr, audio_data, method_3, (0.24, 2.3, 0.8)).squeeze(0)
    # smoothed_4 = smooth(audio, sr, audio_data, method_4, (0.11, 0.275, 0.8)).squeeze(0)

    # (smoothed tensor, method number) pairs to write for this recording
    res = []
    # res.append((smoothed_1, 1))
    res.append((smoothed_2, 2))
    # res.append((smoothed_3, 3))
    # res.append((smoothed_4, 4))
    # Here `method` is the smoothed tensor, not one of the method_N functions.
    for method, num in res:
        path = f'data/method_{num}/smoothed{number}.wav'
        sf.write(path, np.array(method), int(sr))

100%|██████████| 70/70 [00:02<00:00, 24.22it/s]


In [27]:
# Print the result of mos.calc_mos_dir for each method's output directory
for method_dir in ('data/method_1', 'data/method_2', 'data/method_3', 'data/method_4'):
    print(mos.calc_mos_dir(method_dir, model))

4.075688129663467
nan
nan
nan


In [None]:
# Print the result of mos.calc_mos_dir for the data/long_trios directory
print(mos.calc_mos_dir('data/long_trios', model))

In [None]:
# Print the result of mos.calc_mos_dir for the data/buckets directory
print(mos.calc_mos_dir('data/buckets', model))