In [21]:
import numpy as np
import librosa 
from scipy.io import wavfile
from scipy import signal
import IPython.display as ipd
from copy import copy, deepcopy
import samplerate
import matplotlib.pyplot as plt

import soundfile as sf
import random as rd
import wave
import os
import math

def butter_lowpass(cutoff: float, fs: int, order: int=3) -> tuple:
    """Design a digital Butterworth low-pass filter.

    Params
    ------
    cutoff: cutoff frequency in Hz
    fs: sampling frequency in Hz
    order: order of the Butterworth filter

    Returns
    -------
    (b, a): numerator and denominator polynomials of the IIR filter.

    """
    # Supplying `fs` makes SciPy interpret `cutoff` in Hz.
    numerator, denominator = signal.butter(
        N=order, Wn=cutoff, btype='low', analog=False, fs=fs
    )

    return numerator, denominator


def butter_lowpass_filter(data: np.array, cutoff: float, fs: int, order: int=3) -> np.array:
    """Low-pass filter audio data with a Butterworth filter (anti-aliasing step).

    Params
    ------
    data: audio samples to filter
    cutoff: cutoff frequency in Hz
    fs: sampling frequency in Hz
    order: order of the Butterworth filter

    Returns
    -------
    y: lowpass filtered audio data
    """
    coefficients = butter_lowpass(cutoff, fs, order=order)

    # filtfilt applies the filter forward and then backward over the data.
    return signal.filtfilt(*coefficients, data)


def frameize(x: np.array, N: int, H_a: int, hfilt: np.array) -> list:
    """Cut an audio signal into overlapping, windowed frames.

    Frame ``i`` is ``hfilt * x[i*H_a : i*H_a + N]``. Only complete frames of
    ``N`` samples are produced; trailing samples that cannot fill a whole
    frame are dropped.

    Params
    ------
    x: audio array
    N: segment (frame) size in samples
    H_a: analysis hop size in samples
    hfilt: windowing filter, must contain exactly N samples

    Returns
    -------
    frames: list of windowed segments, each of length N (empty if x is
        shorter than N)

    Raises
    ------
    ValueError: if N or H_a is not positive, or hfilt does not have N samples
    """
    if N <= 0 or H_a <= 0:
        raise ValueError("N and H_a must be positive, got N=%r, H_a=%r" % (N, H_a))
    if len(hfilt) != N:
        raise ValueError(
            "hfilt must have N=%d samples, got %d" % (N, len(hfilt))
        )

    n_samples = len(x)
    if n_samples < N:
        return []

    # Count the complete frames up front instead of waiting for the window
    # multiplication to fail: a one-sample tail would otherwise broadcast
    # against the window and yield a spurious extra frame.
    n_frames = (n_samples - N) // H_a + 1

    return [hfilt * x[start:start + N] for start in range(0, n_frames * H_a, H_a)]


def find_hfilt_norm(hfilt: np.array, H_s: int, delta: int=0) -> np.array:
    """Compute the overlap-add normalization array for a window.

    The result is the window plus every copy of itself shifted left and
    right by multiples of ``H_s + delta`` that still overlaps it. Dividing a
    windowed frame by this array compensates for the summed window weight of
    overlapping frames.

    Params
    ------
    hfilt: filter window used for our purpose
    H_s: synthesis hop size
    delta: small shift for synchronization

    Returns
    -------
    hf_norm: normalization filter array (a new array; hfilt is not modified)
    """
    window = np.asarray(hfilt)
    hf_norm = window.copy()
    N = len(window)
    shift = H_s + delta

    if shift < 0 or shift >= N:
        # Neighbouring frames do not overlap this one at all.
        return hf_norm

    if shift == 0:
        # All frames coincide, so the true overlap count is not knowable
        # here; count one neighbour on each side.
        neighbours = 1
    else:
        # Largest k such that a window shifted by k*shift still overlaps.
        # For hops below N/2 this is more than one neighbour per side.
        neighbours = (N - 1) // shift

    for k in range(1, neighbours + 1):
        offset = k * shift
        # add right superposed
        hf_norm[offset:] += window[:N - offset]
        # add left superposed
        hf_norm[:N - offset] += window[offset:]

    return hf_norm

    
def scale_time(x: np.array, N: int, H_a: int,
                 hfilt: np.array, alpha: float) -> np.array:
    """Scale time of audio sample by given ratio.

    The signal is cut into windowed frames taken every ``H_a`` samples, and
    the frames are overlap-added every ``H_s = round(H_a * alpha)`` samples
    after being divided by the window normalization array.

    Params
    ------
    x: audio data
    N: segment size
    H_a: analysis hop size
    hfilt: windowing filter
    alpha: time-scaling factor

    Returns
    -------
    out_x: time-scaled data of length ``len(frames) * H_s + N``
    """
    # put into frames
    frames = frameize(x, N, H_a, hfilt)

    H_s = int(np.round(H_a*alpha))
    out_x = np.zeros(len(frames)*H_s+N)

    # The normalization depends only on the window and the synthesis hop,
    # so compute it once instead of once per frame.
    hfilt_norm = np.asarray(find_hfilt_norm(hfilt, H_s))
    # Samples where the normalization is zero are left at zero rather than
    # producing NaN from 0/0.
    nonzero = hfilt_norm != 0

    # time-scaling
    for i, frame in enumerate(frames):
        start = i*H_s
        scaled = np.divide(frame, hfilt_norm,
                           out=np.zeros_like(hfilt_norm, dtype=float),
                           where=nonzero)
        out_x[start:start+N] += scaled

    return out_x

def synthesize_pitch(x: np.array, sr: int, N: int, H_a: int,
                      hfilt: np.array, alpha: float) -> np.array:
    """Synthesize sound sample into new one with different pitch using PSOLA algorithm.

    The audio is first time-scaled by ``alpha`` with overlap-add, then
    resampled by the ratio ``1/alpha``, and finally peak-normalized.

    Params
    ------
    x: audio data
    sr: sampling rate
    N: segment size
    H_a: analysis hop size
    hfilt: windowing filter
    alpha: pitch factor

    Returns
    -------
    syn_x: synthesized data, scaled so its largest absolute sample is 1
        (left unscaled when the result is empty or entirely zero)
    """
    syn_x = scale_time(x, N, H_a, hfilt, alpha)

    # apply anti-aliasing: for alpha >= 1 the resampling below reduces the
    # sample count, so band-limit to 60% of the reduced Nyquist frequency
    # sr / (2 * alpha) first.
    if alpha >= 1: syn_x = butter_lowpass_filter(syn_x, sr/2*(1/alpha)*0.6, fs=sr, order=3)

    # resampling
    syn_x = samplerate.resample(syn_x, 1/alpha, 'sinc_best')

    # Peak-normalize, but never divide by zero: a silent or empty result
    # would otherwise turn into NaN or raise.
    peak = np.max(np.abs(syn_x)) if len(syn_x) else 0.0
    if peak > 0:
        syn_x = syn_x/peak

    return syn_x

In [22]:


# Load the input recording. librosa.load is called with its default
# arguments here, so the audio may be resampled to librosa's default rate
# (see the librosa.load documentation); `sr` is the rate actually returned.
file_name = "example.wav"
audio, sr = librosa.load(file_name)

# --- Split the recording into segments ---
# Number of samples per segment: 1 second of audio. Replace 1 by any number
# of seconds.
buffer = 1 * sr

samples_total = len(audio)
samples_wrote = 0
counter = 1

while samples_wrote < samples_total:

    # The final segment may be shorter than a full buffer.
    if buffer > (samples_total - samples_wrote):
        buffer = samples_total - samples_wrote

    block = audio[samples_wrote : (samples_wrote + buffer)]
    out_filename = "split_" + str(counter) + "_" + file_name

    # Write this segment to its own file.
    sf.write(out_filename, block, sr)
    counter += 1
    samples_wrote += buffer

# Note: counter is one past the index of the last segment written.
print("counter: ")
print(counter)

# --- Pitch-shift every segment ---
N = 1024 # segment (frame) size in samples
H_a = int(N*0.6) # analysis hop size (a fraction of N between 0.5 and 1)
hfilt = np.hanning(N) # window type

file_list = []

alpha_max = 1.65
alpha_min = 0.35
alpha_before = 0

for i in range (1,counter):
    file_name_2 = "split_" + str(i) + "_" + file_name
    data, sr = librosa.load(file_name_2, sr=None)

    # Choose alpha as a bounded random walk: the first segment draws a value
    # in 0.4 ~ 1.61, later segments move at most 0.2 away from the previous
    # alpha. Candidates outside [alpha_min, alpha_max] or inside the
    # (0.96, 1.04) band around 1 are rejected and redrawn. The band check is
    # applied to the first segment as well, so that it cannot be written out
    # almost unchanged.
    # (Alternatives tried by the author: an independent random alpha per
    # segment, or a deterministic alpha such as sin(pi * i / 180).)
    while True:
        if i == 1:
            candidate = rd.randrange(400,1610)/1000
        else:
            candidate = alpha_before + rd.randrange(-200,200)/1000
        in_range = alpha_min <= candidate <= alpha_max
        near_unity = 0.96 < candidate < 1.04
        if in_range and not near_unity:
            break
    alpha = candidate
    alpha_before = alpha

    deid_data = synthesize_pitch(data, sr, N, H_a, hfilt, alpha)
    # The processed audio overwrites the segment file it was read from.
    out_filename_deid = "split_" + str(i) + "_" + file_name
    sf.write(out_filename_deid, deid_data, sr)
    file_list.append(out_filename_deid)

print("완료1")
print(file_list)

# --- Concatenate the processed segments ---
outfile = "output.wav"
data_2 = []
for infile in file_list:
    # Context managers guarantee the handle is closed even if reading fails.
    with wave.open(infile, 'rb') as w:
        data_2.append( [w.getparams(), w.readframes(w.getnframes())] )

# Nothing to write when the input produced no segments.
if data_2:
    with wave.open(outfile, 'wb') as output:
        # All segments were written with the same settings, so the first
        # segment's parameters describe the whole output.
        output.setparams(data_2[0][0])
        for _params, segment_frames in data_2:
            output.writeframes(segment_frames)

print("완료2")

# --- Delete the temporary segment files ---
for i in range (1,counter):
    file_name_3 = "split_" + str(i) + "_" + file_name
    os.remove(file_name_3)

print("완료3")

counter: 
202
완료1
['split_1_1122.wav', 'split_2_1122.wav', 'split_3_1122.wav', 'split_4_1122.wav', 'split_5_1122.wav', 'split_6_1122.wav', 'split_7_1122.wav', 'split_8_1122.wav', 'split_9_1122.wav', 'split_10_1122.wav', 'split_11_1122.wav', 'split_12_1122.wav', 'split_13_1122.wav', 'split_14_1122.wav', 'split_15_1122.wav', 'split_16_1122.wav', 'split_17_1122.wav', 'split_18_1122.wav', 'split_19_1122.wav', 'split_20_1122.wav', 'split_21_1122.wav', 'split_22_1122.wav', 'split_23_1122.wav', 'split_24_1122.wav', 'split_25_1122.wav', 'split_26_1122.wav', 'split_27_1122.wav', 'split_28_1122.wav', 'split_29_1122.wav', 'split_30_1122.wav', 'split_31_1122.wav', 'split_32_1122.wav', 'split_33_1122.wav', 'split_34_1122.wav', 'split_35_1122.wav', 'split_36_1122.wav', 'split_37_1122.wav', 'split_38_1122.wav', 'split_39_1122.wav', 'split_40_1122.wav', 'split_41_1122.wav', 'split_42_1122.wav', 'split_43_1122.wav', 'split_44_1122.wav', 'split_45_1122.wav', 'split_46_1122.wav', 'split_47_1122.wav', 'sp