In [136]:
%pip install librosa numpy scipy ssspy soundfile openai pyht dotenv 

Note: you may need to restart the kernel to use updated packages.


In [137]:
import librosa

# Read the starting recording (expected to contain 32 channels).
# sr=None keeps the file's native sample rate; mono=False keeps the channels separate.
start_audio, start_sr = librosa.load("audio.wav", mono=False, sr=None)

In [138]:
# These channels work best so far, but the best channels should eventually be chosen programmatically.
# We tried picking the channels with the largest amplitude, the largest spectral centroid, the largest SNR, etc.,
# but these four channels, found by trial and error, still gave the best results.
# The slice keeps the first four entries along the leading axis of the loaded recording.
four_channels = start_audio[:4]

In [139]:
# Resample to 16 kHz before any further processing: it is the standard
# sample rate for many algorithms, so the results will be better.
# NOTE: this cell overwrites `four_channels`, so running it twice in a row
# resamples data that is already at 16 kHz; re-run the channel-selection cell first.
standard_sr = 16000
four_channels = librosa.resample(
    four_channels,
    orig_sr=start_sr,
    target_sr=standard_sr,
)

In [140]:
import scipy.signal as ss
import numpy as np
from ssspy.bss.fdica import NaturalGradFDICA
from ssspy.bss.ica import NaturalGradICA

# with help of ssspy library we implement Frequency Domain Independent Component Analysis (FDICA)
# and normal Time Domain Independent Component Analysis (ICA)
# combined one after the other these algorithms give the best results
# this approach is based on scientific paper that proposed this method - Multi Stage Independent Component Analysis (MSICA)

# to make this technique even more robust and reliable, we follow another paper published by the same authors
# that proposes to apply whitening before ICA and then dewhiten the data after

# whitening means that we transform the data so that the covariance matrix is the identity matrix
# meaning that the data is uncorrelated and has unit variance

# hyperparameters for these models were chosen by trial and error (not enough time and computational resources)
# but in the future a grid search should be done to find the best hyperparameters,
# which should make the results even better


def fdica(audio, n_fft=4096, hop_length=2048, n_iter=500):
    """Separate a multi-channel waveform with natural-gradient FDICA.

    The input is transformed with a Hann-window STFT, passed through
    ``NaturalGradFDICA`` for ``n_iter`` iterations, and the separated
    spectrogram is transformed back with the inverse STFT.

    Args:
        audio: Waveform array handed to ``scipy.signal.stft``
            (presumably shaped (channels, samples) -- confirm against caller).
        n_fft: STFT segment length in samples.
        hop_length: Hop between segments; the overlap is ``n_fft - hop_length``.
        n_iter: Number of FDICA iterations.

    Returns:
        The waveform produced by ``scipy.signal.istft`` for the separated
        spectrogram.
    """
    # The forward and inverse transforms must share the same framing.
    stft_kwargs = {
        "window": "hann",
        "nperseg": n_fft,
        "noverlap": n_fft - hop_length,
    }

    def abs_contrast(y):
        return 2 * np.abs(y)

    def unit_modulus_score(y):
        # Floor the magnitude so that zero-valued bins do not divide by zero.
        magnitude = np.maximum(np.abs(y), 1e-10)
        return y / magnitude

    separator = NaturalGradFDICA(
        step_size=1e-1,
        contrast_fn=abs_contrast,
        score_fn=unit_modulus_score,
        is_holonomic=True,
    )

    # stft returns (frequencies, times, spectrogram); only the spectrogram is used.
    spectrogram_mix = ss.stft(audio, **stft_kwargs)[2]
    spectrogram_est = separator(spectrogram_mix, n_iter)

    # istft returns (times, waveform); only the waveform is used.
    waveform_est = ss.istft(spectrogram_est, **stft_kwargs)[1]
    return waveform_est


def ica(audio, n_iter=500):
    """Separate a multi-channel waveform with time-domain natural-gradient ICA.

    Uses the softplus function as contrast and the logistic sigmoid as score
    function. Both are evaluated through ``np.logaddexp`` so that samples with
    a large magnitude do not overflow ``np.exp``.

    Args:
        audio: Waveform array passed to ``NaturalGradICA``
            (presumably shaped (channels, samples) -- confirm against caller).
        n_iter: Number of ICA iterations (defaults to the previously
            hard-coded 500).

    Returns:
        The separated waveform returned by ``NaturalGradICA``.
    """

    def contrast_fn(x):
        # softplus: log(1 + exp(x)) == logaddexp(0, x), without overflow for large x
        return np.logaddexp(0, x)

    def score_fn(x):
        # sigmoid: 1 / (1 + exp(-x)) == exp(-softplus(-x)), without overflow for very negative x
        return np.exp(-np.logaddexp(0, -x))

    separator = NaturalGradICA(
        contrast_fn=contrast_fn, score_fn=score_fn, is_holonomic=True
    )

    waveform_est = separator(audio, n_iter=n_iter)
    return waveform_est

def prewhiten(audio, eps=1e-12):
    """Whiten multi-channel data so that its covariance becomes the identity.

    The whitening matrix is ``D^{-1/2} E^T`` where ``E`` and ``D`` are the
    eigenvectors and eigenvalues of ``np.cov(audio)``.

    Args:
        audio: 2-D array with one row per channel (the layout ``np.cov``
            treats as variables-by-observations).
        eps: Lower bound applied to the eigenvalues. A silent or duplicated
            channel yields a zero (or slightly negative, from round-off)
            eigenvalue, which would otherwise produce inf/NaN output.

    Returns:
        The whitened data, same shape as ``audio``.
    """
    # Covariance between channels
    cov_matrix = np.cov(audio)

    # Eigen-decomposition of the symmetric covariance matrix
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

    # Keep the inverse square root finite for rank-deficient input
    eigenvalues = np.maximum(eigenvalues, eps)

    # (E / sqrt(d)).T scales each eigenvector column, i.e. D^{-1/2} E^T
    whitening_matrix = (eigenvectors / np.sqrt(eigenvalues)).T

    return np.dot(whitening_matrix, audio)

def dewhitening(whitened_data, original_cov_matrix):
    """Undo a whitening transform derived from ``original_cov_matrix``.

    Whitening with the eigen-decomposition ``C = E D E^T`` multiplies the data
    by ``D^{-1/2} E^T``; the inverse of that matrix is ``E D^{1/2}``. (The
    product ``D^{1/2} E`` is not the inverse and does not restore the data.)

    Args:
        whitened_data: 2-D array with one row per channel.
        original_cov_matrix: Covariance matrix the whitening transform was
            computed from.

    Returns:
        The de-whitened data, same shape as ``whitened_data``.
    """
    # Eigen-decomposition of the symmetric covariance matrix
    eigenvalues, eigenvectors = np.linalg.eigh(original_cov_matrix)

    # Round-off can make a zero eigenvalue slightly negative; avoid sqrt(negative) -> NaN
    scales = np.sqrt(np.maximum(eigenvalues, 0.0))

    # Broadcasting scales each eigenvector column: E @ diag(sqrt(d))
    dewhitening_matrix = eigenvectors * scales

    return np.dot(dewhitening_matrix, whitened_data)

In [141]:
# Run our MSICA-with-whitening pipeline:
# FDICA -> whitening -> time-domain ICA -> de-whitening with the covariance of the FDICA output.
four_channels_fdica = fdica(audio=four_channels)
four_channels_wh = prewhiten(audio=four_channels_fdica)
four_channels_ica = ica(audio=four_channels_wh)
cleaned_four_channels = dewhitening(
    whitened_data=four_channels_ica,
    original_cov_matrix=np.cov(four_channels_fdica),
)

In [142]:
# Now, after blind source separation, we have 4 separated channels
# Our job is to determine which channel is the most promising one (the one in which the speaker signal is the clearest)
# To achieve that we calculate the SNR (Signal-to-Noise Ratio) for each channel
# It is a naive SNR, because we don't have a reference (clean) audio, but it should give us a good estimate
# We are making the assumption that the noise is Gaussian distributed

def get_most_promising_channel(channels):
    """Return the channel with the highest naive (reference-free) SNR estimate.

    The noise level of each channel is estimated as ``median(|x|) / 0.6745``
    (a median-based estimate that assumes Gaussian noise), and the SNR is the
    mean power divided by the squared noise level, in dB.

    Args:
        channels: Iterable/array of 1-D signals, one per channel.

    Returns:
        ``channels[i]`` for the channel ``i`` with the largest SNR estimate.

    Raises:
        ValueError: If ``channels`` contains no channel.
    """

    def calculate_naive_snr(signal):
        signal = np.asarray(signal, dtype=float)

        # Mean power of the signal
        signal_power = np.mean(signal ** 2)
        if not np.isfinite(signal_power) or signal_power <= 0:
            # A silent (or empty/invalid) channel would evaluate to 0/0 = NaN,
            # and np.argmax treats NaN as the maximum; rank it last instead.
            return -np.inf

        # Median-based noise estimate, assuming Gaussian-distributed noise
        noise_floor = np.median(np.abs(signal)) / 0.6745

        # Floor the noise power so that a sparse signal (median of zero)
        # does not divide by zero.
        noise_power = max(noise_floor ** 2, 1e-20)

        return 10 * np.log10(signal_power / noise_power)

    snrs = [calculate_naive_snr(chan) for chan in channels]
    if not snrs:
        raise ValueError("channels must contain at least one channel")

    best_channel_idx = int(np.argmax(snrs))
    return channels[best_channel_idx]

In [143]:
# extract the most promising channel (the one with the highest naive SNR estimate)
best_channel = get_most_promising_channel(cleaned_four_channels)

In [144]:
# displaying current results
# `Audio` is not a builtin: import it (together with `display`) explicitly so that
# this cell also works after Restart Kernel -> Run All.
from IPython.display import Audio, display

display(Audio(best_channel, rate=standard_sr))

In [145]:
# at this stage we have extracted the most promising channel
# the speaker's voice there is much clearer than in the starting audio
# so in theory this could be our final enhancement result (maybe with added spectral subtraction in order to remove noise even further)

# however, living through a boom of generative AI, it would be a shame not to use its fascinating properties for our use case
# of course, we are going to use pre-trained models, because training such models is completely out of our reach (both in terms of time and computational resources)
# because the audio is already quite good, we can generate a transcription with quite high accuracy using OpenAI's Whisper model
# then, by using a sample of the speaker's voice and the transcription, we can generate a new noiseless audio with the speaker's voice
# of course some data is lost during this transformation, but the overall result is in our opinion very good and promising enough to investigate even further


# let's load the secrets for the APIs that we are going to use to transcribe and then generate audio

import os
from dotenv import load_dotenv
# Called with no arguments; the credentials are read back later in the notebook with os.getenv.
load_dotenv()

True

In [146]:
from openai import OpenAI

# Create the OpenAI client, authenticated with the key stored in OPENAI_API_KEY.
client = OpenAI(
    api_key=os.environ.get('OPENAI_API_KEY'),
)

In [147]:
# The OpenAI API takes a file (not a numpy array) as input, so the best channel
# is written to a temporary WAV file on disk first.
import soundfile as sf

tmp_name = "tmp.wav"
sf.write(file=tmp_name, data=best_channel, samplerate=standard_sr)

In [148]:
# let's get our transcription
# The file is opened in a `with` block so the handle is closed as soon as the request returns.
with open(tmp_name, "rb") as audio_file:
    transcription = client.audio.transcriptions.create(
        model="whisper-1",
        file=audio_file,
    ).text

transcription

"For my last holiday, I did a trip to Poland and a trip to Scotland. So I started with a trip to Scotland to visit my friend's friend's wedding. So this was a guy called Adam, and he used to work at EdiPond as well. And he moved back to Poland to be with his girlfriend, who then very quickly became his fiancée. And he told us..."

In [149]:
# Credentials for the audio-generation service, read from the environment.
pyht_client_id = os.environ.get('PYHT_CLIENT_ID')
pyht_secret = os.environ.get('PYHT_SECRET')

In [150]:
# we need a sample of the clear speaker voice to generate the audio
# we are using the assumption that the audio at the beginning of the file is the best sample of the speaker's voice

# therefore, to extract a tiny sample of the speaker's voice, we use onset detection
# onsets are the points in the audio where the signal changes (usually a new note starts or a speaker starts speaking)
# we compute the onset strengths and take the biggest one as the point where the noise starts
# then we cut the audio at this point, which leaves us with a sample of the speaker's voice

def cut_audio_fn(audio, sr, hop_length=512):
    """Return the part of ``audio`` that precedes its strongest onset.

    The onset-strength envelope is computed with librosa, the frame with the
    largest strength is converted to a time, and everything before that time
    is returned.

    Args:
        audio: 1-D waveform.
        sr: Sample rate of ``audio`` in Hz.
        hop_length: Hop (in samples) between envelope frames. The same value
            is used to compute the envelope and to convert the frame index
            back to time, so the two can no longer drift apart.

    Returns:
        ``audio[:cut]`` where ``cut`` is the sample index of the strongest
        onset. The result is empty when the strongest onset falls in frame 0.
    """
    onset_strengths = librosa.onset.onset_strength(
        y=audio, sr=sr, hop_length=hop_length)
    biggest_strength_idx = np.argmax(onset_strengths)
    onset_time = librosa.frames_to_time(
        biggest_strength_idx, sr=sr, hop_length=hop_length)
    cut_audio = audio[:int(onset_time * sr)]

    return cut_audio


In [151]:
# Cut the speaker sample out of the best channel and play it back as a quick check.
cut_audio = cut_audio_fn(audio=best_channel, sr=standard_sr)

display(Audio(data=cut_audio, rate=standard_sr))

In [152]:
# The voice-cloning API also takes a file as input, so the cut sample is saved
# to disk temporarily (this overwrites the previous contents of the temporary file).
sf.write(file=tmp_name, data=cut_audio, samplerate=standard_sr)

In [153]:
# Base URL and headers shared by our HTTP requests to the API.
import requests

base_url = "https://api.play.ht/api/v2/cloned-voices"

# Credentials are sent with every request through these headers.
request_headers = {
    "accept": "application/json",
    "AUTHORIZATION": pyht_secret,
    "X-USER-ID": pyht_client_id,
}

In [154]:
# Because we are using the basic tier of the API, we can only have one cloned voice at a time,
# so the previous cloned voice has to be deleted before a new one is created.
def clean_previous_cloned_voices(timeout=30):
    """Delete every cloned voice returned by the cloned-voices endpoint.

    Args:
        timeout: Seconds to wait for each HTTP request before giving up.

    Raises:
        requests.HTTPError: If listing the voices or deleting one of them
            returns an error status. Without this check an error payload
            would be iterated as if it were the list of voices.
    """
    response = requests.get(base_url, headers=request_headers, timeout=timeout)
    response.raise_for_status()
    cloned_voices = response.json()

    for cloned_voice in cloned_voices:
        body = { "voice_id": cloned_voice["id"] }
        delete_response = requests.delete(
            base_url, headers=request_headers, data=body, timeout=timeout)
        # Surface a failed delete here rather than as a confusing failure
        # when the new voice is created.
        delete_response.raise_for_status()

clean_previous_cloned_voices()

In [155]:
# now we can create a new cloned voice by providing the cut audio sample

# Now we need to give a name to our voice actor
payload = { "voice_name": "Bob" }

# The sample is opened in a `with` block so the file handle is closed after the upload.
with open(tmp_name, "rb") as sample_file:
    files = { "sample_file": (tmp_name, sample_file, "audio/wav") }
    response = requests.post(f"{base_url}/instant", data=payload, files=files, headers=request_headers)

# Fail with the HTTP error itself instead of a KeyError on a missing "id".
response.raise_for_status()

# in response we get the id of the cloned voice
voice_id = response.json()["id"]


In [156]:
# now we can generate the final audio with the speaker voice and the transcription

from pyht import Client, TTSOptions, Format

# Initialize the PlayHT API with our credentials.
# A dedicated name is used on purpose: rebinding `client` would overwrite the OpenAI
# client created earlier and break the transcription cell when it is re-run.
pyht_client = Client(pyht_client_id, pyht_secret)

options = TTSOptions(
    # voice ID of our cloned voice
    voice=voice_id,
    sample_rate=standard_sr,
    format=Format.FORMAT_WAV,
    # playback rate of the generated speech; we found that 0.75 results in a fairly normal human speaking speed
    speed=0.75,
)

# We stream the final response directly to the file
with open('final_output.wav', 'wb') as audio_file:
    for chunk in pyht_client.tts(text=transcription, voice_engine="PlayHT2.0-turbo", options=options):
        audio_file.write(chunk)

In [157]:
# For demonstration purposes, load the generated file at its native
# sample rate (sr=None) and play it back.
final_audio, sr = librosa.load("final_output.wav", sr=None)
display(Audio(data=final_audio, rate=sr))