# MerJaz

Optimally merge two audio files together!

README:

- For any wav files you want to use as inputs, you will need to place them in a folder called 'SoundInputs' in the same directory as this notebook and they will appear in the widget selectors.

- Ensure that you have a Python environment with the imports listed below installed. You should then be able to run each cell from top to bottom, and the widgets to play with will appear at the bottom.

- I would recommend not pressing the 'Press here to Merj the audio!' button again until the previous merge has finished computing and its spectrograms have appeared. The ipywidgets don't seem to handle overlapping runs well, for some reason.

In [1]:
import librosa
import librosa.display
from matplotlib import pyplot as plt
import numpy as np
import sklearn.metrics.pairwise as smp
import IPython.display as ipd
from IPython.display import display
from IPython.display import clear_output
import ipywidgets as widgets
import os
#from scipy.io.wavfile import write as writeWav

#Any warnings about "Import requested from: 'numba.decorators'" are (probably) fine to ignore.

Import requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit
Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit


In [2]:
def getAndTrimSound(file_1, file_2):
    """Load both audio files with librosa and pass each through trimSound.

    file_1 is trimmed with takeStart=False and file_2 with takeStart=True.

    Returns: (audio of file_1, audio of file_2, sample rate reported when loading file_2).
    """
    print('Loading Audio..')

    firstAudio, firstRate = librosa.load(file_1)
    firstAudio = trimSound(firstAudio, firstRate, takeStart=False)

    secondAudio, secondRate = librosa.load(file_2)
    secondAudio = trimSound(secondAudio, secondRate, takeStart=True)

    return firstAudio, secondAudio, secondRate

def trimSound(snd, sr, takeStart=True, timeInEachFileToKeep=30, timeFromTheEnd=2):
    """If the audio is too long, trim it down to a manageable number of seconds.

    Audio is only trimmed when it is longer than timeInEachFileToKeep + timeFromTheEnd
    seconds; anything shorter is returned unchanged.

    snd: 1D sequence of audio samples (anything supporting len() and slicing).
    sr: sample rate in samples per second.
    takeStart: if True keep audio from the start of the file, skipping the first
               timeFromTheEnd seconds. If False keep audio from the end of the file,
               skipping the last timeFromTheEnd seconds (often just quiet).
    timeInEachFileToKeep: number of seconds to keep.
    timeFromTheEnd: number of seconds to skip at the start/end edge.

    Returns: the trimmed (or untouched) audio.
    """
    keepSamples = int(round(timeInEachFileToKeep*sr))
    edgeSamples = int(round(timeFromTheEnd*sr))
    totalSamples = len(snd)

    if totalSamples <= keepSamples + edgeSamples:
        return snd

    print('Trimming audio to {}s as it\'s a bit long and will take ages to process otherwise..'.format(timeInEachFileToKeep))

    #Explicit non-negative indices are used so that an edge of 0 samples works too
    #(a slice ending at -0 would be empty).
    if takeStart:
        #Skip the edge at the start, then keep the following section.
        return snd[edgeSamples:edgeSamples+keepSamples]

    #Keep the section that finishes just before the edge at the end.
    return snd[totalSamples-(keepSamples+edgeSamples):totalSamples-edgeSamples]

def plotSpectros(cqt1, cqt2):
    """Plot two CQT spectrograms one above the other in a single figure.

    cqt1: spectrogram drawn in the top panel.
    cqt2: spectrogram drawn in the bottom panel.
    """
    plt.figure(figsize=(14,6))
    #suptitle labels the whole figure. Calling plt.title before any subplot exists
    #would implicitly create an extra full-size Axes underneath the two panels.
    plt.suptitle('Original spectrograms')
    plt.subplot(2, 1, 1)
    librosa.display.specshow(cqt1, y_axis='cqt_note')
    plt.subplot(2, 1, 2)
    librosa.display.specshow(cqt2, y_axis='cqt_note')

In [3]:
def _unitNormFrames(cqt):
    """Return cqt as a (frames, bins) float array with every frame scaled to unit L2 norm."""
    frames = np.swapaxes(np.asarray(cqt, dtype=float), 0, 1)
    norms = np.linalg.norm(frames, axis=1, keepdims=True)
    #All-zero frames keep a similarity of 0 rather than dividing by zero.
    norms[norms == 0] = 1.0
    return frames/norms


def _sectionSimilarity(unitA, unitB, centreA, centreB, halfSize):
    """Sum of the frame-by-frame cosine similarities of two equally sized sections."""
    sectionA = unitA[centreA-halfSize:centreA+halfSize]
    sectionB = unitB[centreB-halfSize:centreB+halfSize]
    #Rows are already unit length, so the row-wise dot product is the cosine similarity.
    return float(np.sum(sectionA*sectionB))


def getOptimalCrossoverPoints(cqtA, cqtB, fadeSize=60):
    """Finds the indices (in hop lengths) of the two input cqts that most closely match each other.

    cqtA, cqtB: 2D real-valued spectrograms indexed as [frequency bin, frame].
    fadeSize: section width in frames. We don't look for a match within fadeSize//2
              of either end of a spectrogram, otherwise we can't fade nicely.

    Sections of 2*(fadeSize//2) frames are compared by summing the cosine similarity of
    corresponding frames (frame i of the A section against frame i of the B section).
    A coarse search with a hop of fadeSize//4 frames is followed by a finer search
    around the coarse winner. The coarse winner is kept unless the finer search finds
    something strictly better.

    Returns: bestCentreA, bestCentreB. Ideal centres of spectrogram match points with width of fadeSize.
    Raises: ValueError if fadeSize is smaller than 2 or either spectrogram has too few
            frames to hold a single section.
    """
    print('Computing optimal crossover points..')

    halfSize = fadeSize//2
    unitA = _unitNormFrames(cqtA)
    unitB = _unitNormFrames(cqtB)

    #Valid section centres satisfy halfSize <= centre < end.
    endA = unitA.shape[0]-halfSize
    endB = unitB.shape[0]-halfSize
    if halfSize < 1 or endA <= halfSize or endB <= halfSize:
        raise ValueError('Spectrograms are too short to search with a fadeSize of ' + str(fadeSize) + ' frames')

    #Hop between this and the next section. Never allowed to reach 0, which would loop forever.
    hop = max(1, fadeSize//4)

    bestSimilarity = -np.inf
    bestCentreA = -1
    bestCentreB = -1
    num_of_comparisons = 0

    #First search, with a coarse hop size.
    for centreA in range(halfSize, endA, hop):
        for centreB in range(halfSize, endB, hop):
            similarity = _sectionSimilarity(unitA, unitB, centreA, centreB, halfSize)
            num_of_comparisons += 1
            #Record if we've found a new best matching position.
            if similarity > bestSimilarity:
                bestSimilarity = similarity
                bestCentreA = centreA
                bestCentreB = centreB

    #Do another search in a close-by area but more in depth to find a better match if possible.
    fineHop = max(1, hop//10)
    coarseCentreA = bestCentreA
    coarseCentreB = bestCentreB

    for centreA in range(coarseCentreA-hop, coarseCentreA+hop, fineHop):
        if not (halfSize <= centreA < endA):
            continue
        for centreB in range(coarseCentreB-hop, coarseCentreB+hop, fineHop):
            if not (halfSize <= centreB < endB):
                continue
            similarity = _sectionSimilarity(unitA, unitB, centreA, centreB, halfSize)
            num_of_comparisons += 1
            if similarity > bestSimilarity:
                bestSimilarity = similarity
                bestCentreA = centreA
                bestCentreB = centreB

    print('Number of spectrogram\'s compared:', num_of_comparisons)

    return bestCentreA, bestCentreB


In [4]:
def plotMergeSections(stft1, stft2, top_dB):
    """Show the two merge-section spectrograms side by side in one figure.

    stft1, stft2: complex spectrograms; their magnitudes are converted to dB
                  relative to each spectrogram's own maximum.
    top_dB: passed to librosa.amplitude_to_db as top_db.
    """
    plt.figure(figsize=(16,6))
    for panel, section in enumerate((stft1, stft2), start=1):
        plt.subplot(1, 2, panel)
        section_dB = librosa.amplitude_to_db(np.abs(section), ref=np.max, top_db=top_dB)
        librosa.display.specshow(section_dB, y_axis='log')
        plt.title('Merge section spectrograms')

In [5]:
def plotStitchedSection(stft_stitched, top_dB):
    """Show the stitched spectrogram in a figure of its own.

    stft_stitched: complex spectrogram; its magnitude is converted to dB relative to its maximum.
    top_dB: passed to librosa.amplitude_to_db as top_db.
    """
    stitched_dB = librosa.amplitude_to_db(np.abs(stft_stitched), ref=np.max, top_db=top_dB)
    plt.figure(figsize=(16,6))
    plt.title('Stitched Spectrograms')
    librosa.display.specshow(stitched_dB, y_axis='log')
    #plt.colorbar(format='%+2.0f dB')

In [6]:
def _bestSwitchFrames(differences, firstFrame, lastFrame):
    """For every row, return the frame in [firstFrame, lastFrame) with the smallest absolute difference."""
    return np.argmin(np.abs(differences[:, firstFrame:lastFrame]), axis=1)+firstFrame


def _switchAt(before, after, switchFrames):
    """Per row, take `before` up to (not including) that row's switch frame and `after` from it onwards."""
    frameIndex = np.arange(before.shape[1])[np.newaxis, :]
    return np.where(frameIndex < switchFrames[:, np.newaxis], before, after)


def stitchTogetherFreqTransforms(stft1, stft2, method=0, bin_merge_length=31):
    """Finds the optimum point for each frequency bin to flip from stft1 to stft2.

    Every frequency bin (row) switches instantaneously from stft1 to stft2 at its own
    best frame.

    stft1, stft2: complex spectrograms of identical shape, indexed as [frequency bin, frame].

    bin_merge_length: odd number of frames. It only limits where a switch may happen:
                      the first bin_merge_length//2 frames and the last
                      bin_merge_length//2 + 1 frames are never chosen.

    method: 'minimumPhaseDiff' - switches at the minimum phase difference of each bin.
            'minimumMagnitudeDiff' - switches at the minimum magnitude difference of each bin.
            'minimumPhaseAndMagnitudeDiff' - switches the phase and magnitude at their respective minima
                                                independent of each other.
            'minimumComplexDiff' - switches at the minimum euclidean distance of each bin.
            The default value 0 is not a valid method, so a method must always be passed.

    returns: new_stft - a complex spectrogram of the same size as the inputs that is the inputs 'optimally' merged.
    raises: ValueError for an even bin_merge_length, an unknown method, mismatched
            input shapes, or inputs with too few frames to search.
    """

    print('Stitching together audio..')

    mergeLength = bin_merge_length
    if mergeLength & 0x1 != 1:
        raise ValueError('Merge length should be odd!')

    validMethods = ('minimumPhaseDiff', 'minimumMagnitudeDiff',
                    'minimumPhaseAndMagnitudeDiff', 'minimumComplexDiff')
    if method not in validMethods:
        raise ValueError('Not selected an appropriate method for stitching')

    stft1 = np.asarray(stft1)
    stft2 = np.asarray(stft2)
    if stft1.ndim != 2 or stft1.shape != stft2.shape:
        raise ValueError('stft1 and stft2 must be 2D spectrograms of the same shape')

    #Frames that may be chosen as a switch point: firstFrame <= t < lastFrame.
    firstFrame = mergeLength//2
    lastFrame = stft1.shape[1]-(firstFrame+1)
    if lastFrame <= firstFrame:
        raise ValueError('Spectrograms have too few frames for a merge length of ' + str(mergeLength))

    if method == 'minimumComplexDiff':
        #Difference is euclidean distance between two complex numbers.
        switchFrames = _bestSwitchFrames(stft1-stft2, firstFrame, lastFrame)
        return _switchAt(stft1, stft2, switchFrames).astype(complex)

    phases1 = np.angle(stft1)
    phases2 = np.angle(stft2)
    magnitudes1 = np.abs(stft1)
    magnitudes2 = np.abs(stft2)

    #Wrap the phase difference into [-pi, pi) so that phases either side of the
    #+/-pi boundary count as close together rather than almost 2*pi apart.
    phaseDiffMatrix = np.mod(phases1-phases2+np.pi, 2*np.pi)-np.pi
    magnitudeDiffMatrix = magnitudes1-magnitudes2

    #The time at which the phase / magnitude has minimum difference, per frequency bin.
    phaseBestT = _bestSwitchFrames(phaseDiffMatrix, firstFrame, lastFrame)
    magnitudeBestT = _bestSwitchFrames(magnitudeDiffMatrix, firstFrame, lastFrame)

    #The single-criterion methods flip phase and magnitude together at that criterion's time.
    if method == 'minimumPhaseDiff':
        magnitudeBestT = phaseBestT
    elif method == 'minimumMagnitudeDiff':
        phaseBestT = magnitudeBestT

    newPhases = _switchAt(phases1, phases2, phaseBestT)
    newMagnitudes = _switchAt(magnitudes1, magnitudes2, magnitudeBestT)

    #Combine the phases and magnitudes into complex numbers again.
    return (np.exp(newPhases*1j)*newMagnitudes).astype(complex)


In [7]:
def MerjTwoFiles(file_1_name, file_2_name, fade_size=1, method='minimumPhaseDiff', n_fft=4096, plotSpectrograms=True):
    """
    Perform CQT to find optimally matching points.
    Perform STFT on and then merge the two audio files at those optimally matching points.
    Prepend and Append the unchanged frequency spectrum of the audio files and Inverse-STFT.

    file_1_name, file_2_name: paths of the audio files; the result starts with file 1 and ends with file 2.
    fade_size: duration of the merge section in seconds.
    method: stitching method name passed on to stitchTogetherFreqTransforms.
    n_fft: STFT size. Increase this to increase the number of frequency bins.
    plotSpectrograms: if True, plot the merge sections and the stitched section.

    Returns: mergedAudio, sr - the merged audio samples and their sample rate.
    Raises: ValueError if no crossover point could be found.
    """

    snd1,snd2,sr = getAndTrimSound(file_1_name, file_2_name)

    #Hop length used when calculating the STFTs and CQTs. The smaller the hop the better the resolution of matches.
    hopL = 64
    top_dB = 80.0

    #Calculate CQTs
    cqt1 = librosa.cqt(snd1, sr=sr, hop_length=hopL)
    cqt2 = librosa.cqt(snd2, sr=sr, hop_length=hopL)

    #Calculate STFTs
    stft1 = librosa.core.stft(snd1, hop_length=hopL, n_fft=n_fft)
    stft2 = librosa.core.stft(snd2, hop_length=hopL, n_fft=n_fft)

    #Calculate the dB level of the CQT as this is more perceptually useful.
    cqt1_Magn = librosa.amplitude_to_db(np.abs(cqt1), ref=1.0, top_db=top_dB)
    cqt2_Magn = librosa.amplitude_to_db(np.abs(cqt2), ref=1.0, top_db=top_dB)

    #Fade time converted from seconds to frames.
    fadeTimeInFrames = round(fade_size*sr/hopL)
    halfFade = fadeTimeInFrames//2

    #best location in hop lengths!
    snd1BestLoc,snd2BestLoc = getOptimalCrossoverPoints(cqt1_Magn,cqt2_Magn,fadeSize=fadeTimeInFrames)
    #A negative location would silently slice the wrong part of the STFTs below.
    if snd1BestLoc < 0 or snd2BestLoc < 0:
        raise ValueError('Could not find a crossover point; the audio may be too short for the chosen fade duration')
    print('Best File 1 time:', '{:.2f}'.format(snd1BestLoc*hopL/sr) + 's')
    print('Best File 2 time:', '{:.2f}'.format(snd2BestLoc*hopL/sr) + 's')

    #Get the sections out of the STFTs that we are going to do the merge in.
    stft1_MergeSection = stft1[:,snd1BestLoc-halfFade:snd1BestLoc+halfFade]
    stft2_MergeSection = stft2[:,snd2BestLoc-halfFade:snd2BestLoc+halfFade]
    if plotSpectrograms:
        plotMergeSections(stft1_MergeSection, stft2_MergeSection, top_dB)

    #Merge the two sections together.
    stitched_merge_transform = stitchTogetherFreqTransforms(stft1_MergeSection, stft2_MergeSection, method=method,
                                                            bin_merge_length=1)
    if plotSpectrograms:
        plotStitchedSection(stitched_merge_transform, top_dB)

    #Stick on the unchanged stfts either side of the merged section.
    #The merge section ends just before frame snd2BestLoc+halfFade, so file 2 resumes
    #exactly there; starting one frame earlier would repeat a frame.
    unchangedStft1 = stft1[:,0:snd1BestLoc-halfFade]
    unchangedStft2 = stft2[:,snd2BestLoc+halfFade:]
    entireStitchedFrequencySpectrum = np.concatenate([unchangedStft1, stitched_merge_transform, unchangedStft2], axis=1)

    #Inverse STFT to get back audio
    mergedAudio = librosa.core.istft(entireStitchedFrequencySpectrum, hop_length=hopL)

    return mergedAudio, sr


In [8]:
def process(writeFile=False):
    """Merj the two files currently selected in the widgets and display an audio player.

    Reads the module-level widgets (file_1_selector, file_2_selector, fadeSizeSlider,
    method_selector, fftsize_selector) and sounds_directory.

    writeFile: if True, also write the merged audio to 'Output.wav'.

    Raises: ValueError if either selected file is not a wav file.
    """
    file1 = os.path.join(sounds_directory, file_1_selector.value)
    file2 = os.path.join(sounds_directory, file_2_selector.value)
    fade_size = fadeSizeSlider.value
    #Maps the labels shown in the dropdown onto the method names used by the stitching code.
    labelToMethod = {'Minimise Phase Differences':'minimumPhaseDiff',
                     'Minimise Magnitude Differences':'minimumMagnitudeDiff',
                     'Minimise Phase and Magnitude Differences':'minimumPhaseAndMagnitudeDiff',
                     'Minimise Complex Differences':'minimumComplexDiff'}
    method = labelToMethod[method_selector.value]
    n_fft = fftsize_selector.value

    print('Merjing ' + file_1_selector.value + ' into ' + file_2_selector.value + ' with fade duration ' + str(fade_size)+'s')

    for path in (file1, file2):
        if not path.endswith('.wav'):
            raise ValueError(path + ' is not a wav file.')

    merjed_audio,sr = MerjTwoFiles(file1, file2, fade_size=fade_size, method=method, n_fft=n_fft)

    display(ipd.Audio(merjed_audio,rate=sr))

    if writeFile:
        #Imported here because the module-level import of writeWav is commented out.
        from scipy.io.wavfile import write as writeWav
        writeWav('Output.wav',sr,merjed_audio)

In [9]:
#Change this folder if you want to change where input sounds are kept.
sounds_directory = 'SoundInputs/'

#Sorted names of the wav files in the folder. A new list is built rather than removing
#entries from the list being looped over, which would skip the entry after each removal.
soundsList = sorted(soundFileName for soundFileName in os.listdir(sounds_directory)
                    if soundFileName.endswith('.wav'))

In [10]:
#Default selections: the first and second wav file. Guarded so that this cell still
#runs (with empty or repeated selections) when fewer than two wav files were found.
default_file_1 = soundsList[0] if soundsList else None
default_file_2 = soundsList[1] if len(soundsList) > 1 else default_file_1
if len(soundsList) < 2:
    print('Warning: fewer than two .wav files found in ' + sounds_directory)

#File that the merged audio starts with.
file_1_selector = widgets.Dropdown(
    options=soundsList,
    value=default_file_1,
    description='File1:',
    disabled=False,
)
display(file_1_selector)


#File that the merged audio ends with.
file_2_selector = widgets.Dropdown(
    options=soundsList,
    value=default_file_2,
    description='File2:',
    disabled=False,
)
display(file_2_selector)

#Shared styling so that long descriptions are not cut off.
style = {'description_width': 'initial'}
layout = widgets.Layout(width='auto', height='40px')

#Labels shown here are translated into stitching method names inside process().
method_selector = widgets.Dropdown(
    options=['Minimise Phase Differences', 'Minimise Magnitude Differences',
    'Minimise Phase and Magnitude Differences', 'Minimise Complex Differences'],
    value='Minimise Phase Differences',
    description='Merjing method',
    disabled=False,
    style=style,
    layout=layout
)
display(method_selector)


#Duration of the merge section in seconds.
fadeSizeSlider = widgets.FloatSlider(
    value=1.0,
    min=0.2,
    max=5.1,
    step=0.1,
    description='Duration of merj:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
    style=style
)
display(fadeSizeSlider)

#STFT size used when merging.
fftsize_selector = widgets.Dropdown(
    options=[128, 256, 512, 1024, 2048, 4096, 8192, 16384],
    value=4096,
    description='FFT Size',
    disabled=False,
    style=style
)
display(fftsize_selector)


button = widgets.Button(description="Press here to Merj the audio!", layout=layout)
output = widgets.Output()
display(button, output)


def on_button_clicked(b):
    """Button callback: clear the cell output, redraw every control and run the merge.

    b: the clicked button (unused).
    """
    clear_output(wait=True)
    #The controls are redrawn in their original order because the output was just cleared.
    for control in (file_1_selector, file_2_selector, method_selector,
                    fadeSizeSlider, fftsize_selector):
        display(control)
    display(button, output)
    process()


#Run the merge whenever the button is pressed.
button.on_click(on_button_clicked)

Dropdown(description='File1:', options=('0_5sChopin.wav', '0_8sChopin.wav', '5_10sChopin.wav', 'Beat1.wav', 'B…

Dropdown(description='File2:', index=1, options=('0_5sChopin.wav', '0_8sChopin.wav', '5_10sChopin.wav', 'Beat1…

Dropdown(description='Merjing method', layout=Layout(height='40px', width='auto'), options=('Minimise Phase Di…

FloatSlider(value=1.0, continuous_update=False, description='Duration of merj:', max=5.1, min=0.2, readout_for…

Dropdown(description='FFT Size', index=5, options=(128, 256, 512, 1024, 2048, 4096, 8192, 16384), style=Descri…

Button(description='Press here to Merj the audio!', layout=Layout(height='40px', width='auto'), style=ButtonSt…

Output()