In [23]:
import numpy as np
import librosa, os, scipy
import soundfile as sf
from IPython.display import Audio
import pandas as pd
import matplotlib.pyplot as plt
np.seterr(divide = 'ignore') 
years = np.arange(1950,2023)
path = './analyze/fft_loudest/avg'
all_year_fft_M = np.loadtxt(path + '/all_year_fft_M.csv', delimiter=",")
all_year_fft_S = np.loadtxt(path + '/all_year_fft_S.csv', delimiter=",")

ZCR_year_avg = pd.read_csv('./analyze/zcr/loudest/zcr_year_avg.csv')
dyn_year = np.loadtxt('./analyze/dynamic_year.txt')
year_avg_rms = np.loadtxt('./analyze/RMS_sm.txt',delimiter=",", dtype=float);
n_fft = 2048

match_eq_rms = 10 ** (-10/20)

In [62]:
def get_song_path(year,rank, data_dir = './data_wav_norm'):
    if year > 2022 or year < 1950: 
        return None
    if rank < 1 or rank > 30:
        return None
    df = pd.read_excel('songs_charts.xlsx', sheet_name=str(year))
    f = df[df['rank'] == rank]['filename'].item()
    return (data_dir + '/' + str(year) + '/' + f + '.wav')


def gen_noise(length,level = None,noise_type = 'white', ch = 0):
    length = int(length)
    if noise_type == 'white':
        y = np.random.standard_normal(size=length)
        y = y / np.max(np.abs(y))
    else:
         y = tape_noise(length,ch)
    if level != None:
        rms_y = np.sqrt(np.mean(y**2))
        if level > 0:
            y = y * level / rms_y
    return y


def tape_noise(length, ch = 0):
    n, _ = sf.read('noise.wav')
    n = n[:,ch]
    rms_n =  np.sqrt(np.mean(n**2))
    while len(n) < length:
        add_sample = min(length - len(n), len(n))
        n= np.hstack([n,n[:add_sample]])
    return n


def find_ref_fft(year, path = './analyze/fft_loudest/avg', get_dB = False, Side = False):
    d = path + '/' + str(year) + '.csv'
    Z = np.loadtxt(d,delimiter=",", dtype=float);
    if Side:
        return Z[:,1]
    if get_dB:
        Z = Z[:,2]
    else:
        Z = Z[:,0]
    return Z

def get_loudest(y, Fs, dur):
    if len(y) <= int(dur*Fs):
        l_t = 0
        rms = np.sqrt(np.mean( y**2))
        return l_t,rms
    song_dur = int( len(y)/Fs )
    if song_dur < 10:
        return 0
    max_rms = 0
    l_t = 0
    num_sample = int(dur * Fs)
    for t in range(song_dur - 10):
        start = int(t * Fs)
        stop = int((t+dur) * Fs)
        rms = np.sqrt(np.mean( y[start:stop]**2))
        if rms > max_rms:
            l_t = t
            max_rms = rms
    return l_t,rms


def get_segment(freqs,segments):
    ids = np.zeros(len(segments))
    for i in range(len(segments)):
        ids[i] = sum(freqs < segments[i])
    return ids.astype(int)

def smooth_band(y,step = 5):
    y = np.array(y)
    if len(y) < 2*step or step < 1: 
        return y
    step = int(step)
    length = len(y)
    y_smooth = np.copy(y)
    for i in range(0, step):
        y_smooth[i] = np.mean(y[:i+step])
    for i in range(step,length - step):
        y_smooth[i] = np.mean(y[i-step:i+step])
    for i in range(length - step,length):
        y_smooth[i] = np.mean(y[i:])
    return y_smooth

def smooth(y, seg = [0,100,500],steps = [3,10,20]):
    y = np.array(y)
    y_smooth = np.copy(y)
    l = len(y)
    seg = np.append(seg, len(y))
    for i in range(len(seg)-1):
        y_smooth[seg[i]:seg[i+1]] = smooth_band(y[seg[i]:seg[i+1]], steps[i])
    for i in range(len(seg)):
        if seg[i] > 0 and seg[i] < l:
            y_smooth[seg[i]-steps[i]:seg[i]+steps[i] ] = smooth_band(y_smooth[seg[i]-steps[i]:seg[i]+steps[i]],step = steps[i])
    return y_smooth

def moving_avg(y, box_pts):
    box_pts = int(box_pts)
    if box_pts < 1:
        return y
    box = np.ones(box_pts)/box_pts
    y_smooth = np.convolve(y, box, mode='same')
    return y_smooth

def zcr_mono(y):
    num_total = len(y)
    zero_cross = 0
    for i in range(num_total):
        if y[i] == 0:
            zero_cross += 1
        elif y[i] < 0:
            if y[i-1]> 0:
                zero_cross += 1
        else:
            if y[i-1]< 0:
                zero_cross += 1
    return zero_cross/num_total

def zcr(y):
    if len(np.shape(y)) <= 1:
        return zcr_mono(y)
    l = y.shape[1]
    ZCR_sum = 0
    for i in range(l):
        ZCR_sum += zcr_mono(y[:,i])
    return ZCR_sum/l
    
def match_zcr(y, zcr_ref, in_place = False, MS = False, lt = [], noise_type = 0):
    '''
    Add noise to audio to match zcr(has to be mono)
    '''
    zcr_y = zcr(y)
    if zcr_ref <= zcr_y:
        return y
    print('ref zcr:',zcr_ref, ', input zcr: ',zcr_y,', noise added')
    coef = (zcr_ref - zcr_y) * 3.03030303
    if len(lt) >=2:
        rms_y = np.sqrt(np.mean( y[lt[0]:lt[1]]**2))
    else:
        rms_y = np.sqrt(np.mean( y**2))
    if len(np.shape(y)) == 1:
        
        noise = gen_noise(len(y), noise_type = 'tape')
        if in_place:
            y += coef * rms_y * noise
            return y
        else:
            return y + coef * rms_y * noise
    l = np.shape(y)[1]
        
    if in_place:
        for i in range(l):
            y[:,i] += coef * rms_y * gen_noise(len(y), noise_type = 'tape')
        return y
    else:
        out = np.copy(y)
        for i in range(l):
            num_sample = y.shape[0]
            out[:,i] += coef * rms_y * gen_noise(num_sample,noise_type = 'tape')
    return out

def rms_sample(x, tav = 0.001):
    '''
    find rms by sample
    '''
    if len( np.shape(x) )>1:
        l = x.shape[1]
        x_rms = np.zeros([len(x),l])
    else:
        x_rms = np.zeros(len(x))
    x_rms[0] = tav * x[0]**2
    for i in range(1, len(x)):
        x_rms[i] = (1-tav) * x_rms[i-1] + tav * x[i]**2;
    return x_rms

def rms_sample(x,  get_db = False, tav = 0.0001):
    if len(np.shape(x))>1:
        l = x.shape[1]
        x_rms = np.zeros([len(x),l])
        y = np.zeros([len(x),l]) 
    else:
        x_rms = np.zeros(len(x))
        y = np.zeros(len(x))
    x_rms[0] = tav * x[0]**2
    for i in range(1, len(x)):
        x_rms[i] = (1-tav) * x_rms[i-1] + tav * x[i]**2;
    if get_db:
        x_rms[x_rms == 0] = 0.000001
        return 20 * np.log10(x_rms)
    return x_rms

def get_rms(y,hop_length = 500, frame_length = 4410, get_db = False, tav = 0):
    if tav > 0:
        return rms_sample(y,tav = tav)
    hop_length = int(hop_length)
    frame_length = int(frame_length)
    if hop_length < 1:
        hop_length = int(frame_length/2)
    if frame_length <1:
        frame_length = 4410
    n_frames = int(np.ceil( (len(y)-frame_length)/hop_length ) )
        
    if n_frames <= 1:
        rms = np.sqrt(np.mean( y**2, axis = 0))
        if get_db:
            return 20 * np.log10(rms)
        return rms
    if len(y.shape) > 1:
        rms = np.zeros( [n_frames, y.shape[1]] )
    else:
        rms = np.zeros(n_frames)
    for i in range(n_frames-1):
        start = int(i * hop_length)
        stop = start + frame_length
        rms[i] = np.sqrt(np.mean( y[start:stop]**2, axis = 0))
    rms[-1] = np.sqrt(np.mean( y[int(n_frames * hop_length):] **2, axis = 0))
    if get_db:
        return 20 * np.log10(rms)
    return rms

def get_density(rms, is_db = False):  
    if is_db:
        rms_db = np.copy(rms)
    if len(rms.shape) > 1:
        l = rms.shape[1]
        d = np.zeros([101,l])
        for i in range(l):
            if is_db:
                d[:,i] = density_helper(rms_db[:,i])
            else:
                rms0 = rms[:,i]
                rms0 = rms0[rms0 > 0]
                rms_db = 20 * np.log10(rms0)
                d[:,i] = density_helper(rms_db)
        return d
    else:
        rms = rms[rms > 0]
        rms_db = 20 * np.log10(rms)
        return density_helper(rms_db)

def density_helper(rms_db):
    d = np.zeros(101)
    r = np.copy(rms_db)
    
    r = np.floor(rms_db).astype(int)
    r = r[r >= -100]
    for i in range(len(r)):
        d[r[i] + 100] += 1
    return d / np.sum(d)

def cumden(d):
    cd = np.cumsum(d,axis = 0)
    if len(d.shape) > 1:
        for i in range(d.shape[1]):
            cd[:,i] /= cd[-1,i]
        return cd
    return cd/cd[-1]

def get_peak(d):
    pk = np.argmax(d,axis = 0)
    return pk - 100, d[pk]

def getpoint(cd, p = 0.95):
    l = len(cd.shape)
    if l == 1:
        p2 = np.where(cd > 0.95)[0][0] 
        p1 = p2 - 1
        return p1 + (p - cd[p1])/(cd[p2] - cd[p1])
    p2 = [np.where(cd[:,i] > 0.95)[0][0] for i in range(l)]
    p1 = [p2[i] - 1 for i in range(l)]
    p0 = [0 for i in range(l)]
    for i in range(l):
        p0[i] = p1[i] + (p - cd[p1[i],i])/(cd[p2[i],i] - cd[p1[i],i])
    return p0

def analyze_dyn(y,hop_length = 441, frame_length = 4410, get_db = False, rms = [], d = [], p = [0.99], fix = True, tav = 0):
    if len(rms) == 0:
        r = get_rms(y,hop_length, frame_length, get_db,tav = tav)
    else:
        r = np.copy(rms)
    if len(d) == 0:
        d = get_density(r, get_db) 
    if len(np.shape(d)) > 1:
        d = np.mean(d, axis = 1)
    cd = cumden(d)
    peak , peak_d = get_peak(d)
    pks = get_peak(d)
    if fix:
        peak = check_den(r, peak)
    p0 = np.array([getpoint(cd, p[i]) for i in range(len(p))]) - 100
    return peak, p0 - peak, peak_d

def check_den(r, peak):
    med = np.median(20 * np.log10(r))
    dev = peak - med
    if dev >= 1:
        peak -= 1 + 0.5 * (dev-1)
    elif dev > 0: 
        peak = med
    return peak

def compress_helper(x,x_rms,th,AT,REL,rt):
    G = np.min([np.zeros(len(x)),(th - 10 * np.log10(x_rms))], axis = 0)
    comp_on = (G < 0)
    comp_time = comp_on.astype(float)
    comp_time[comp_time == 0] = -1 / REL
    comp_time[comp_time > 0] = 1/AT
    comp_ratio = np.copy(comp_time)
    if comp_ratio[0] < 0:
        comp_ratio[0] = 0
    for i in range(1, len(comp_time)):
        comp_ratio[i] += comp_ratio[i-1]
        if comp_ratio[i] > 1:
            comp_ratio[i] = 1
        elif comp_ratio[i] < 0:
            comp_ratio[i] = 0
    ratio = 1 + (rt - 1) * comp_ratio
    gain = 10 ** ((ratio - 1)/ratio * G/20)
    return gain

def compressor(x,Fs,th,rt,input_gain = 1., is_db = False,at = 0.01, rel = 0.003, tav = 0.01, in_place = False,x_rms = [], as_limiter = False,plot = True, savefig = False):
    if is_db:
        input_gain = 10 ** (input_gain / 20)
    if input_gain <= 0:
        input_gain = 1
    y = input_gain * x
    if rt == 1:
        y /= np.max(np.abs(y))
        return y
    AT = int(at * Fs)
    REL = int(rel * Fs)
    l = 1

    if len(np.shape(x)) > 1:
        l = x.shape[1]
    if len(x_rms) == 0:
        x_rms = rms_sample(y,  get_db = False, tav = tav)
    if plot:
        plt.gcf().set_visible(plot)
        plt.figure(figsize=(20,5))
        plt.xlabel('time[s]')
        plt.ylabel('gain')
        plt.ylim([-0.2,1.2])
    t = np.arange(len(y))/Fs
    
    if l == 1:
        gain = compress_helper(y,x_rms,th,AT,REL,rt)
        plt.plot(t,gain)
        y = gain * y
    else:
        y = np.copy(x)
        for i in range(l):
            gain = compress_helper(y[:,i],x_rms[:,i],th,AT,REL,rt)
            y[:,i] = gain * y[:,i]
            if plot:
                plt.plot(t,gain, alpha = 0.7)
        if plot:
            plt.legend(['L','R'])
    if savefig:
        if plot:
            plt.savefig('./analyze/compressor gain.png')
    if as_limiter:
        y = hard_clip(y)
    y /= np.max(np.abs(y))
    if in_place:
        x = y
    return y
    
def get_image(M = [],S = [],y = []):
    if len(y) == 0:
        rms_m = np.sqrt(np.mean(M**2))
        rms_s = np.sqrt(np.mean(S**2))
    else:
        if len(np.shape(y)) <= 1:
            return 0
        m = np.mean(y,axis=-1)
        s = 0.5 * (y[:,0] - y[:,1])
        rms_m = np.sqrt(np.mean(m**2))
        rms_s = np.sqrt(np.mean(s**2))
    if rms_m == 0 and rms_s == 0:
        return -1
    return rms_s/(rms_s + rms_m)  

def match_imager(y,ref,in_place = False):
    if len(np.shape(y)) <= 1:
        return y
    M = np.mean(y,axis=-1)
    if ref == 0:
        return np.array([M,M]).T
    S = (y[:,0] - y[:,1]) / 2
    rms_m = np.sqrt(np.mean(M**2))
    rms_s = np.sqrt(np.mean(S**2))
    target = 1/ref - 1
    if target <= 0:
        return np.array([S,-S]).T
    ratio = np.sqrt(target/rms_m*rms_s)
    M *= ratio
    S /= ratio
    y_im = np.copy(y)
    y_im[:,0] = (M+S)/2
    y_im[:,1] = (M-S)/2
    if in_place:
        y = y_im
    return y_im

def get_comp_ratio(width,ref):
    return ref/width

def hard_clip(y,th = 1):
    y[y > th] = th
    y[y < -th] = -th
    return y

def match_rms_gain(y, ref):
    rms_y = np.sqrt(np.mean( y**2))
    return ref/rms_y

def mag2db_point(x):
    if x == 0: 
        return float('-inf')
    return 20 * np.log10(x)

def db2mag_point(x):
    return 10 ** (x/20)

def mag2db(x):
    if isinstance(x,float) or isinstance(x,int) or isinstance(x, np.int64):
        return mag2db_point(x)
    sh = np.shape(x)
    y = np.copy(x).reshape(np.prod(sh))
    return np.array( [mag2db_point(y[i]) for i in range(len(y))]).reshape(sh)

def db2mag(x):
    if isinstance(x,float) or isinstance(x,int) or isinstance(x, np.int64):
        return db2mag_point(x)
    sh = np.shape(x)
    y = np.copy(x).reshape(np.prod(sh))
    return np.array( [db2mag_point(y[i]) for i in range(len(y))]).reshape(sh)

def get_filter(fc = 16000,fstop = 20000, sr = 44100, passband_ripple = 3, stopband_atten = 10):
    fc = 15000 
    fstop = 16000  
    passband_ripple = 3
    stopband_atten = 10
    order = scipy.signal.buttord(fc, fstop, passband_ripple, stopband_atten, fs=sr)
    b, a = scipy.signal.butter(order[0], fc, fs=sr)
    return b, a

def matchEQ(x, Z_ref, Z_ref_S,zcr_ref, sr = 44100, l_t = None, dur = 10):
    if len(np.shape(x)) == 1:
        y = np.vstack([x,x]).T
    else:
        y = np.copy(x)
    n_fft = 2048
    match_eq_rms = 10 ** (-10/20)
    if len(y) <= dur * sr:
        start = 0
        stop = len(y)
        rms_y = np.sqrt(np.mean( y**2))
    else:
        l_t, rms_y = get_loudest(y, sr, dur)
        start = int(l_t*sr)
        stop =  int((l_t+dur)*sr)
    
    rms_y = np.mean(y[start:stop] ** 2)
    y /= rms_y
    y *= match_eq_rms
    y = match_zcr(y, zcr_ref)
#     print(zcr(y))
    M = 0.5 * (y[:,0] + y[:,1])
    S = 0.5 * (y[:,0] - y[:,1])
    
    Z_m = librosa.stft(M[start:stop], n_fft=2048, hop_length=None, win_length=None, window='hann', center=True, dtype=None, pad_mode='constant')
    Z_avg = librosa.amplitude_to_db( np.mean(np.abs(Z_m),axis = 1) )
    Z_s = librosa.stft(S[start:stop], n_fft=2048, hop_length=None, win_length=None, window='hann', center=True, dtype=None, pad_mode='constant')
    Z_avg_s = librosa.amplitude_to_db( np.mean(np.abs(Z_s),axis = 1) )
    f = np.linspace(0, sr/2, num = 1025,endpoint = True)
    diff = Z_ref - Z_avg
    diff_s = Z_ref_S - Z_avg_s
    
    diff_sm = smooth(diff,get_segment(f, [0,500,1000,5000, 16000]),[3,5,10,50,20])
    diff_sm = smooth(diff_sm, get_segment(f, [0,500,1000,5000, 16000]), [0,1,20,50,20])
    diff_sm_s = smooth(diff_s,get_segment(f, [0,500,1000,5000, 16000]),[3,5,10,50,20])
    diff_sm_s = smooth(diff_sm_s, get_segment(f, [0,500,1000,5000, 16000]), [0,1,20,50,20])
    coef_m = 10**(np.array(diff_sm)/20)
    coef_s = 10**(np.array(diff_sm_s)/20)
    y_out = applyEQ(y,coef_m,coef_s)
    return coef_m,coef_s,y_out, [start,stop]

def applyEQ(y,coef_m,coef_s):
    if len(np.shape(y)) == 1:
        y_M = y
        Z_M = librosa.stft(y_M,  n_fft=2048, hop_length=None, win_length=None, window='hann', center=True, dtype=None, pad_mode='constant')
        coef_M = np.array( [[coef_m[j] for i in range(Z_M.shape[1]) ] for j in range(Z_M.shape[0])] )
        Z_M *= coef_M
        return librosa.istft(Z_M, hop_length=None, win_length=None, n_fft=None, window='hann', center=True, dtype=None, length=len(y_M))
    else:
        y_M = 0.5*(y[:,0]+y[:,1])
        y_S = 0.5*(y[:,0]-y[:,1])
        Z_M = librosa.stft(y_M,  n_fft=2048, hop_length=None, window='hann', center=True, pad_mode='constant')
        Z_S = librosa.stft(y_S,  n_fft=2048, hop_length=None, window='hann', center=True, pad_mode='constant')
        coef_M = np.array( [[coef_m[j] for i in range(Z_M.shape[1]) ] for j in range(Z_M.shape[0])] )
        Z_M *= coef_M
        coef_S = np.array( [[coef_s[j] for i in range(Z_S.shape[1]) ] for j in range(Z_S.shape[0])] )
        Z_S *= coef_S
        M = librosa.istft(Z_M, hop_length=None, win_length=None, n_fft=None, window='hann', center=True, dtype=None, length=len(y_M))
        S = librosa.istft(Z_S, n_fft=None, window='hann', center=True, dtype=None, length=len(y_S))
        out = np.zeros([len(y),2])
        out[:,0] = 0.5*(M+S)
        out[:,1] = 0.5*(M-S)
        out = out / np.max(np.abs(out))
        return out
    
def map2years(input_dir, years, save_dir = '.'):
    y, sr = sf.read(input_dir)
    for year in years:
    _, _, y_eq,lt = matchEQ(y, all_year_fft_M[:,year - 1950], all_year_fft_S[:,year - 1950],zcr_ref = ZCR_year_avg.M[year - 1950], sr = 44100, l_t = None, dur = 10)
    pk,width,pk_d = analyze_dyn(y_eq)
    target_w = dyn_year[year - 1950]
    y_comp = compressor(y_eq,sr,pk,(width[0] )/target_w, plot = False )
    pk,width,pk_d = analyze_dyn(y_comp)
    ref = 10**(year_avg_rms[year-1950]*0.05)
    gain = match_rms_gain(y_comp, ref)
    y_lim = compressor(y_comp* gain,sr,year_avg_rms[year-1950],20,input_gain = 1,at = 0.07, rel = 0.3, tav = 0.01, as_limiter = True,plot = False, savefig = False)
    write_dir = save_dir + '/' + str(year) + '.wav'
    sf.write(write_dir,y_lim,sr)
    
def soundwalk(years, folder = './soul', olap = 0.1, length = 0):
    year = years[0]
    year_num = len(years)
    song_dir = folder + '/' + str(year) + '.wav'
    y,sr = sf.read(song_dir)
    if length == 0 or length > len(y):
        sample_num = len(y)
    else:
        sample_num = int(sr * length)
    out = np.zeros(y.shape)
    seg = np.linspace(0, sample_num, year_num + 1).astype(int)
    olap_num = int( olap * sr )
    if olap_num > 0.5 * (seg[1] - seg[0]):
        olap_num = int( 0.5 * (seg[1] - seg[0]) )
    win = np.linspace(0, 1, num = olap_num, endpoint = True)
    if len(np.shape(y)) > 1:
        win = np.vstack([win] * y.shape[1]).T
    for i in range(year_num):
        print(round(seg[i] / sr, 2),'s: year', years[i])
        song_dir = folder + '/' + str(i + 1950) + '.wav'
        y,sr = sf.read(song_dir)
        start = seg[i]
        stop = seg[i+1] + olap_num if i < year_num - 1 else seg[i+1]
        y_temp = y[start:stop]
        y_temp[-olap_num:] *= win[::-1]
        if i < year_num - 1:
            y_temp[:olap_num] *= win
        out[start:stop] += y_temp
    # avoid clipping at the end 
    out[stop:] *= 0
    return (out[:sample_num], seg)
    