In [18]:
import os
import glob
import numpy as np
import torch
import torchaudio
from pydub import AudioSegment
from tqdm import tqdm
import csv

In [19]:
def normalize_audio(audio, target_dBFS=-20.0):
    change_in_dBFS = target_dBFS - audio.dBFS
    return audio.apply_gain(change_in_dBFS)

In [4]:
wav_paths = glob.glob('./test_samples/*/*.wav')

In [9]:
dBFS_list = list()

In [20]:
for wav_path in tqdm(wav_paths):
    new_path = wav_path.replace('test_samples', 'test_samples_ampnorm')
    os.makedirs(os.path.dirname(new_path), exist_ok=True)
    
    audio = AudioSegment.from_wav(wav_path)
    dBFS_list.append(audio.dBFS)
    normalized_audio = normalize_audio(audio, target_dBFS=-25.0)
    normalized_audio.export(new_path, format='wav')

100%|██████████| 750/750 [00:01<00:00, 554.82it/s]


In [2]:
wav_paths = glob.glob('./test_samples_ampnorm/*/*.wav')
filters = [
    {'pSNR': 5, 'pGD': [6.25, 8.75, 11.25, 13.75, 16.25]},
    {'pSNR': 15, 'pGD': [3.75, 6.25, 8.75, 11.25]},
    {'pSNR': 25, 'pGD': [1.25, 3.75, 6.25]},
    {'pSNR': 35, 'pGD': [1.25]},
    {'pSNR': 45, 'pGD': [1.25]}
]

In [4]:
for f in filters:
    pSNR_target = f['pSNR']
    pGD_targets = f['pGD']
    for pGD_target in pGD_targets:
        print(pSNR_target, pGD_target)

5 6.25
5 8.75
5 11.25
5 13.75
5 16.25
15 3.75
15 6.25
15 8.75
15 11.25
25 1.25
25 3.75
25 6.25
35 1.25
45 1.25


In [7]:
for wav_path in tqdm(wav_paths):
    wav_name = os.path.basename(wav_path)
    
    if wav_name[0] != 'p':
        snr = wav_name[:-4].split('-')[0].split('_')[1]
        gd = wav_name[:-4].split('-')[1].split('_')[1]
        
        for f in filters:
            pSNR_target = f['pSNR']
            pGD_targets = f['pGD']
            
            for pGD_target in pGD_targets:
                
                if abs(float(snr)-pSNR_target)<=1.0 and abs(float(gd)-pGD_target)<=1.0:
                    new_dir = f"./test_samples_by_system/SNR{pSNR_target}-GD{str(pGD_target).replace('.', 'd')}"
                    os.makedirs(new_dir, exist_ok=True)
                    new_wav_name = f"{wav_path.split('/')[-2]}-SNR{str(snr).replace('.', 'd')}-GD{str(gd).replace('.', 'd')}.wav"
                    !cp {wav_path} {os.path.join(new_dir, new_wav_name)}

  0%|          | 0/750 [00:00<?, ?it/s]

100%|██████████| 750/750 [02:42<00:00,  4.61it/s]


In [4]:
original_wav_paths = glob.glob('./test_samples_by_system/original/*.wav')
wav_paths = glob.glob('./test_samples_by_system/*/*.wav')
print(original_wav_paths)
print(len(wav_paths))

['./test_samples_by_system/original/p226_263.wav', './test_samples_by_system/original/p227_298.wav', './test_samples_by_system/original/p228_220.wav', './test_samples_by_system/original/p229_112.wav', './test_samples_by_system/original/p230_122.wav', './test_samples_by_system/original/p231_029.wav', './test_samples_by_system/original/p232_260.wav', './test_samples_by_system/original/p236_040.wav', './test_samples_by_system/original/p239_312.wav', './test_samples_by_system/original/p240_059.wav', './test_samples_by_system/original/p243_066.wav', './test_samples_by_system/original/p244_156.wav', './test_samples_by_system/original/p254_213.wav', './test_samples_by_system/original/p256_233.wav', './test_samples_by_system/original/p257_013.wav', './test_samples_by_system/original/p258_125.wav', './test_samples_by_system/original/p259_135.wav', './test_samples_by_system/original/p267_059.wav', './test_samples_by_system/original/p268_210.wav', './test_samples_by_system/original/p269_283.wav',

In [17]:
for wav_path in tqdm(wav_paths):
    dir_name = os.path.dirname(wav_path)
    base_name = os.path.basename(wav_path)
    utt_name = base_name[:8]

    y, sr = torchaudio.load(wav_path)
    x, sr = torchaudio.load(f'./test_samples_by_system/original/{utt_name}.wav')
    z = torch.concat([x, torch.zeros(1, sr), y], axis=1)
    
    new_dir_name = dir_name.replace('test_samples_by_system', 'test_samples_dmos')
    os.makedirs(new_dir_name, exist_ok=True)
    torchaudio.save(os.path.join(new_dir_name, base_name), z, sr)

100%|██████████| 750/750 [00:31<00:00, 23.59it/s]


In [20]:
directory = 'test_samples_dmos'
for subdir in os.listdir(directory):
    data = []
    for wav_path in os.listdir(os.path.join(directory, subdir)):
        file_path = f'/nas02/internal/d_yoon/nii-listening-test-scripts/my_listening_test_dmos/test_samples_dmos/{subdir}/{wav_path}'
        data.append({"system_name": subdir, "sample_name": wav_path[:-4], "sample_file_path": file_path})
    
    with open(f'{subdir}.csv', mode='w', newline='') as f:
        fieldnames = ["system_name", "sample_name", "sample_file_path"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        
        writer.writeheader()
        writer.writerows(data)

In [35]:
# for manifest csv
directory = 'test_samples_dmos'
data = []
for _ in range(4):
    for subdir in os.listdir(directory):
        for wav_path in os.listdir(os.path.join(directory, subdir)):
            file_path = f'/nas02/internal/d_yoon/nii-listening-test-scripts/my_listening_test_dmos/test_samples_dmos/{subdir}/{wav_path}'
            data.append({"system_name": f"{subdir}-dmos", "sample_name": wav_path[:-4]})

In [36]:
import random
random.shuffle(data)
print(len(data))

3000


In [37]:
with open(f'sample_list.csv', mode='w', newline='') as f:
    fieldnames = ["system_name", "sample_name"]
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    
    writer.writerows(data)