In [131]:
import librosa
import numpy as np
import noisereduce as nr
import webrtcvad
from scipy.signal import find_peaks
from sklearn.preprocessing import StandardScaler
from scipy.signal import find_peaks
from scipy.io.wavfile import write
from audiomentations import Compose, AddGaussianNoise, PitchShift, TimeStretch, Shift, ClippingDistortion, Gain
import numpy as np
import audiomentations 
from joblib import Parallel, delayed

def normalize_audio(audio_segment, target_dBFS=-20.0):
    """
    Bring an audio segment to a target loudness by applying a constant gain.

    Args:
    - audio_segment: Object exposing a `dBFS` attribute and an `apply_gain(dB)`
      method (presumably a pydub AudioSegment -- TODO confirm against callers).
    - target_dBFS (float): Desired level in dBFS (default: -20.0).

    Returns:
    - normalized_audio: Whatever `audio_segment.apply_gain(...)` returns, or
      `audio_segment` itself when its level is -inf dBFS (digital silence).
    """
    # Current level in dBFS (decibels relative to full scale)
    current_dBFS = audio_segment.dBFS

    # A level of -inf would require an infinite gain to reach the target;
    # there is nothing to amplify, so hand the segment back untouched.
    if current_dBFS == float("-inf"):
        return audio_segment

    # Gain (in dB) needed to move from the current level to the target level
    return audio_segment.apply_gain(target_dBFS - current_dBFS)


def augment_audio(audio_signal, sr):
    """
    Run an audio signal through a randomized augmentation chain.

    The chain is rebuilt on every call and consists, in order, of Gaussian
    noise, pitch shift, time stretch, clipping distortion and gain. Every
    transform is configured with p=0.3.

    Args:
    - audio_signal (numpy.ndarray): 1D array representing the audio signal.
    - sr (int): Sampling rate of the audio signal.

    Returns:
    - augmented_signal (numpy.ndarray): Augmented audio signal.
    """
    # Probability shared by every transform in the chain
    apply_probability = 0.3

    transforms = [
        AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=apply_probability),
        PitchShift(min_semitones=-4, max_semitones=4, p=apply_probability),
        TimeStretch(min_rate=0.8, max_rate=1.2, p=apply_probability),
        ClippingDistortion(max_percentile_threshold=95, p=apply_probability),
        Gain(min_gain_in_db=-6, max_gain_in_db=6, p=apply_probability),
    ]

    # Compose the transforms and apply them to the signal in one go
    return Compose(transforms)(samples=audio_signal, sample_rate=sr)

def remove_clipping_artifacts(audio_signal, threshold=0.95):
    """
    Remove clipping artifacts from the audio signal.

    Samples whose magnitude is at least `threshold` times the signal's peak
    magnitude are treated as clipped and replaced by linear interpolation
    between the nearest unclipped neighbours. Clipped samples at either end
    of the signal take the value of the nearest unclipped sample.

    Args:
    - audio_signal (numpy.ndarray): 1D array representing the audio signal.
    - threshold (float): Fraction of the peak magnitude at or above which a
      sample counts as clipped (default: 0.95).

    Returns:
    - processed_signal (numpy.ndarray): Copy of the signal with clipped samples
      interpolated. Floating-point inputs keep their dtype; other dtypes are
      converted to float64. The input array is never modified.
    """
    processed_signal = np.array(audio_signal, copy=True)
    if not np.issubdtype(processed_signal.dtype, np.floating):
        # Interpolated values are generally non-integer
        processed_signal = processed_signal.astype(np.float64)

    # Nothing to repair in an empty signal
    if processed_signal.size == 0:
        return processed_signal

    magnitude = np.abs(processed_signal)
    max_abs_value = np.max(magnitude)

    # An all-zero signal has no peak to measure clipping against
    if max_abs_value == 0:
        return processed_signal

    clipping_mask = magnitude >= threshold * max_abs_value

    # With no unclipped sample left there is nothing to interpolate from;
    # fall back to silence, which is what the previous implementation produced.
    if clipping_mask.all():
        processed_signal[:] = 0
        return processed_signal

    # Rebuild the clipped samples from their unclipped neighbours
    positions = np.arange(processed_signal.size)
    processed_signal[clipping_mask] = np.interp(
        positions[clipping_mask],
        positions[~clipping_mask],
        processed_signal[~clipping_mask],
    )

    return processed_signal


def reduce_noise(audio_data, sample_rate, stationaryTF, chunk_seconds=30.0):
    """
    Denoise an audio signal chunk by chunk with the noisereduce library.

    The signal is cut into consecutive chunks of `chunk_seconds` seconds, each
    chunk is passed to `nr.reduce_noise`, and the results are concatenated in
    order.

    Args:
    - audio_data (numpy.ndarray): Audio samples; chunking is along the first axis.
    - sample_rate (int): Sampling rate of `audio_data` in Hz.
    - stationaryTF (bool): Forwarded as the `stationary` argument of `nr.reduce_noise`.
    - chunk_seconds (float): Chunk duration in seconds (default: 30.0).

    Returns:
    - reduced_noise (numpy.ndarray): Concatenation of the denoised chunks, or an
      empty copy of the input when it contains no samples.
    """
    samples = np.asarray(audio_data)

    # np.concatenate cannot join an empty list, so handle "no audio" up front
    if samples.size == 0:
        return samples.copy()

    # Chunk size in samples. The previous fixed value of 30000 samples was
    # labelled "30 seconds" but is under two seconds of audio at 16 kHz.
    chunk_length = max(1, int(chunk_seconds * sample_rate))

    reduced_chunks = [
        nr.reduce_noise(
            y=samples[start:start + chunk_length],
            sr=sample_rate,
            stationary=stationaryTF,
        )
        for start in range(0, len(samples), chunk_length)
    ]

    return np.concatenate(reduced_chunks)

def vad_energy_based(audio_data, threshold=0.00003):
    """
    Keep only the samples whose energy exceeds a threshold.

    The energy of a sample is its squared value (summed over any trailing
    axes when `audio_data` has more than one dimension). Samples above the
    threshold are kept in their original order; everything else is dropped.

    Args:
    - audio_data (numpy.ndarray): Audio samples, indexed by sample along the first axis.
    - threshold (float): Minimum energy for a sample to count as active (default: 0.00003).

    Returns:
    - active_voice_array (numpy.ndarray): The active samples concatenated in
      order, or an empty array if no sample is active.
    """
    samples = np.asarray(audio_data)

    # Per-sample energy, computed for the whole signal at once instead of in a
    # Python loop (the loop is prohibitively slow on multi-million-sample audio)
    energy = np.square(samples)
    if energy.ndim > 1:
        energy = energy.sum(axis=tuple(range(1, energy.ndim)))

    is_active = energy > threshold

    # Return an empty array if no active samples are found
    if not is_active.any():
        return np.array([])

    # Selecting by mask is equivalent to concatenating every active run in order
    return samples[is_active]




In [115]:
# Load the WAV audio file
# `y` (samples) and `sr` (sampling rate) become notebook-wide globals that the
# later cells and generateOutputAudioFile read.
audio_file = 'output_file.wav'
# sr=None: keep the file's native sampling rate instead of librosa's default resampling
y, sr = librosa.load(audio_file, sr=None)

def generateOutputAudioFile(outputDestination, inputArray, sample_rate=None):
    """
    Write a floating-point audio array to a 16-bit PCM WAV file.

    Args:
    - outputDestination (str): Path of the WAV file to write.
    - inputArray (numpy.ndarray): Audio samples, nominally in [-1.0, 1.0].
      Values outside that range are clamped to it.
    - sample_rate (int, optional): Sampling rate stored in the WAV file.
      When omitted, the notebook-level `sr` is used.
    """
    rate = sr if sample_rate is None else sample_rate

    # Clamp before scaling: anything beyond full scale would otherwise wrap
    # around when cast to int16 and come out as loud distortion.
    clamped = np.clip(np.asarray(inputArray), -1.0, 1.0)

    # Scale to the 16-bit PCM range
    scaled_array = (clamped * 32767).astype(np.int16)

    # Write the array to a WAV file
    write(outputDestination, rate, scaled_array)

# Sanity check: shape of the loaded sample array
print(y.shape)


(2980160,)


In [116]:

# Resample the audio to a common sampling rate (16 kHz) - Preprocessing 1 Resampling
target_sr = 16000
print(sr)
# Use the named constant so the target rate is defined in exactly one place
y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
print(y_resampled.shape)

# NOTE(review): generateOutputAudioFile stamps the file with the notebook-level
# `sr`; that matches the resampled data only when the source is already at
# target_sr (it is for this recording: `sr` prints 16000).
generateOutputAudioFile('pre_processed_audio/pre_processing_1_output.wav', y_resampled)

16000
(2980160,)


In [117]:
# Perform noise reduction using the NoiseReduce library - Preprocessing 4 Noise Reduction
# y_resampled is sampled at target_sr, so that is the rate the denoiser must be given
# (the source rate `sr` is only correct when the file already was at target_sr).
y_denoised = reduce_noise(y_resampled, target_sr, True)

print(y_denoised.shape)

# Make-up gain after denoising: +10 dB expressed as a linear amplitude factor
gain_db = 10
factor = 10 ** (gain_db / 20)

# Multiply each element of the array by the factor
y_denoised = y_denoised * factor
generateOutputAudioFile('pre_processed_audio/pre_processing_3_output.wav', y_denoised)

(2980160,)


In [118]:
from spleeter.separator import Separator

# Load Spleeter separator
separator = Separator('spleeter:2stems')  # 'spleeter:2stems' separates into vocal and accompaniment
print(y_denoised.shape)

# Spleeter is given a 2-D array with a single column. It is built under its own
# name so that y_resampled is not overwritten and the earlier cells stay re-runnable.
# NOTE(review): the pretrained Spleeter models are presumably trained on
# 44.1 kHz audio while this signal is at 16 kHz -- confirm separation quality.
spleeter_input = y_denoised.reshape(-1, 1)

# Separate audio sources
separated_sources = separator.separate(spleeter_input)
y_musicremoved = separated_sources['vocals']

# Keep only the first channel of the separated vocals
y_musicremoved = y_musicremoved[:, 0]
print(y_musicremoved.shape)
generateOutputAudioFile('pre_processed_audio/pre_processing_2_output.wav', y_musicremoved)

(2980160,)
INFO:tensorflow:Using config: {'_model_dir': 'pretrained_models\\2stems', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': gpu_options {
  per_process_gpu_memory_fraction: 0.7
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Apply unet for vocals_spectrogram
INFO:tensorflow:Apply unet for accompaniment_spectrogram
INFO:te

In [119]:
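# Perform noise reduction using the NoiseReduce library - Preprocessing 4 Noise Reduction
# NOTE: every line below in this cell is commented out (an earlier experiment
# kept for reference), so nothing in this cell executes.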

# print(y_denoised.shape)

# factor = 10**(10 / 20)
    
    # Multiply each element of the array by the factor
# y_musicremoved = y_musicremoved * factor
# y_denoised = reduce_noise( y_musicremoved, sr, True) 
# generateOutputAudioFile('pre_processed_audio/pre_processing_3_output.wav', y_denoised)
# from pydub import AudioSegment
# from pydub.silence import split_on_silence
# # Split audio into segments based on silence
# audio = AudioSegment.from_wav('pre_processed_audio/pre_processing_4_output.wav')
# segments = split_on_silence(audio, min_silence_len=100, silence_thresh=-90)

# # Modify video segments
# modified_video_clips = []
# # Trim segments to a fixed duration (e.g., 5 seconds)
# fixed_duration = 5000  # 5 seconds in milliseconds

# # Create new segments with fixed duration
# trimmed_segments = [segment[:fixed_duration] for segment in segments]

# # Export trimmed segments
# concatenated_audio = trimmed_segments[0]
# for segment in trimmed_segments[1:]:
#     concatenated_audio += segment

# # Export concatenated audio
# concatenated_audio.export("concatenated_audio.wav", format="wav")




In [120]:
# Remove leading/trailing silence - Preprocessing 3 Silence Removal
# The call passes top_db=8 (an earlier note here mentioned -40 dB, which is not
# what the code uses). The second return value of trim is discarded.
y_trimmed, _ = librosa.effects.trim(y_musicremoved, top_db=8)
print(y_trimmed.shape)

# Saved as output #6 even though the step is labelled "Preprocessing 3"
generateOutputAudioFile('pre_processed_audio/pre_processing_6_output.wav', y_trimmed)

# Normalize the audio to ensure consistent amplitude levels - Preprocessing 2 Normalization




(2754560,)


In [121]:


# Normalize the trimmed audio - Preprocessing 2 Normalization
# norm=np.inf selects max-abs (peak) normalization in librosa.util.normalize
y_normalized = librosa.util.normalize(y_trimmed, norm=np.inf)

print(y_normalized.shape)
generateOutputAudioFile('pre_processed_audio/pre_processing_5_output.wav', y_normalized)

(2754560,)


In [122]:

# Preprocessing 5 Voice Activity Detection (VAD) -- currently bypassed.
# The VAD calls below are commented out; the only live statement passes the
# normalized audio straight through under the name the next step expects.
# # speech_segments = perform_vad(y_denoised, sr) - Preprocessing 5 Voice Activity Detection (VAD):
# active_voice_array = vad_energy_based(y_denoised)
# print("Active Voice Array:", active_voice_array.shape)
# y_preprocessed5 = active_voice_array
# # y_preprocessed5 = np.concatenate(speech_segments)
# Plain assignment: y_preprocessed5 refers to the same array object as y_normalized (no copy)
y_preprocessed5 = y_normalized
# print(y_preprocessed5.shape)

# generateOutputAudioFile('pre_processed_audio/pre_processing_5_output.wav', y_preprocessed5)


In [123]:

# Dynamic range compression - Preprocessing 6
# The previous version used find_peaks(..., height=1) on a peak-normalized
# signal, which selects only isolated samples sitting exactly at full scale;
# halving those single samples leaves the dynamic range untouched and adds
# sample-level discontinuities. Instead, every sample above a threshold has
# the part of its magnitude that exceeds the threshold scaled down.
compression_threshold = 0.5  # linear amplitude above which gain reduction starts
compression_factor = 0.5     # slope applied to the magnitude above the threshold

magnitude = np.abs(y_preprocessed5)
over_threshold = magnitude > compression_threshold

y_compressed = np.copy(y_preprocessed5)

# Sign is restored after compressing the magnitude so the waveform keeps its polarity
y_compressed[over_threshold] = np.sign(y_preprocessed5[over_threshold]) * (
    compression_threshold
    + (magnitude[over_threshold] - compression_threshold) * compression_factor
)
print(y_compressed.shape)
# Now y_compressed contains the audio with dynamic range compression applied

generateOutputAudioFile('pre_processed_audio/pre_processing_7_output.wav', y_compressed)




(2754560,)


In [142]:
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, LSTM, Dense, Attention, Dropout

class Tacotron(tf.keras.Model):
    """
    Encoder/decoder model: an `Encoder` processes the source input and a
    `Decoder` combines the encoder output with the target input.
    """

    def __init__(self, num_encoder_layers, num_decoder_layers, encoder_units, decoder_units, attention_units, dropout_rate):
        """
        Args:
        - num_encoder_layers (int): Number of layers in the encoder.
        - num_decoder_layers (int): Number of layers in the decoder.
        - encoder_units (int): Unit count passed to the encoder.
        - decoder_units (int): Unit count passed to the decoder.
        - attention_units (int): Attention size passed to the decoder.
        - dropout_rate (float): Dropout rate shared by encoder and decoder.
        """
        super().__init__()
        self.encoder = Encoder(
            num_layers=num_encoder_layers,
            units=encoder_units,
            dropout_rate=dropout_rate,
        )
        self.decoder = Decoder(
            num_layers=num_decoder_layers,
            units=decoder_units,
            attention_units=attention_units,
            dropout_rate=dropout_rate,
        )

    def call(self, inputs, training=False):
        """
        Args:
        - inputs: Pair `(source_audio, target_audio)`.
        - training (bool): Forwarded to the encoder and the decoder.

        Returns:
        - The decoder output.
        """
        source_audio, target_audio = inputs
        encoded = self.encoder(source_audio, training=training)
        return self.decoder([encoded, target_audio], training=training)

class Encoder(tf.keras.layers.Layer):
    """
    Stack of 1-D convolutions (kernel size 5, stride 1, 'same' padding, ReLU),
    each followed by dropout.
    """

    def __init__(self, num_layers, units, dropout_rate):
        """
        Args:
        - num_layers (int): Number of convolution + dropout pairs.
        - units (int): Number of filters in every convolution.
        - dropout_rate (float): Rate of every dropout layer.
        """
        super().__init__()
        self.num_layers = num_layers
        self.units = units
        self.dropout_rate = dropout_rate

        # All convolutions are created before any dropout layer
        convolutions = []
        for _ in range(num_layers):
            convolutions.append(
                Conv1D(filters=units, kernel_size=5, strides=1, padding='same', activation='relu')
            )
        self.conv_layers = convolutions

        dropouts = []
        for _ in range(num_layers):
            dropouts.append(Dropout(dropout_rate))
        self.dropout_layers = dropouts

    def call(self, inputs, training=False):
        """
        Args:
        - inputs: Input tensor, passed to the first convolution.
        - training (bool): Forwarded to each dropout layer.

        Returns:
        - Output of the last convolution + dropout pair.
        """
        hidden = inputs
        # Walk the convolution/dropout pairs in order
        for convolution, dropout in zip(self.conv_layers, self.dropout_layers):
            hidden = dropout(convolution(hidden), training=training)
        return hidden

class Decoder(tf.keras.layers.Layer):
    """
    Attention-based decoder: the target sequence attends over the encoder
    output, the resulting context is concatenated with the target features,
    projected by a dense layer and passed through a stack of LSTMs.
    """

    def __init__(self, num_layers, units, attention_units, dropout_rate):
        """
        Args:
        - num_layers (int): Number of stacked LSTM layers.
        - units (int): Width of the dense projection and of every LSTM.
        - attention_units (int): Size of the space queries and keys are
          projected into before attention scores are computed.
        - dropout_rate (float): Dropout rate applied after the dense projection.
        """
        super().__init__()
        self.num_layers = num_layers
        self.units = units
        self.attention_units = attention_units
        # Target and encoder features generally differ in size, while
        # dot-product attention needs queries and keys of equal size, so both
        # are projected to attention_units first.
        self.query_projection = Dense(attention_units)
        self.key_projection = Dense(attention_units)
        self.attention = Attention()
        self.lstm_layers = [LSTM(units, return_sequences=True, return_state=True)
                            for _ in range(num_layers)]
        self.fc = Dense(units)
        self.dropout = Dropout(dropout_rate)

    def call(self, inputs, training=False):
        """
        Args:
        - inputs: Pair `[encoder_output, target_audio]`; both are assumed to be
          (batch, time, features) tensors -- TODO confirm against callers.
        - training (bool): Forwarded to the attention and dropout layers.

        Returns:
        - Sequence output of the last LSTM layer.
        """
        encoder_output, target_audio = inputs

        # Keras Attention takes [query, value, key]: the target queries the encoder output
        query = self.query_projection(target_audio)
        key = self.key_projection(encoder_output)
        context_vector = self.attention([query, encoder_output, key], training=training)

        # The context already has one vector per target step, so it can be
        # concatenated with the target features directly along the last axis.
        target_features = tf.cast(target_audio, context_vector.dtype)
        x = tf.concat([context_vector, target_features], axis=-1)
        x = self.fc(x)
        x = self.dropout(x, training=training)

        # return_state=True makes each LSTM return (sequence, h, c); only the sequence is chained
        for lstm_layer in self.lstm_layers:
            x, _, _ = lstm_layer(x)
        return x


In [None]:
import torch
import numpy as np
import librosa
import soundfile as sf

# Define hyperparameters
num_encoder_layers = 3
num_decoder_layers = 2
encoder_units = 256
decoder_units = 512
attention_units = 256
dropout_rate = 0.1
batch_size = 32

# Load Tacotron model
model = Tacotron(num_encoder_layers, num_decoder_layers, encoder_units, decoder_units, attention_units, dropout_rate)
# NOTE(review): '.pth' is conventionally a PyTorch checkpoint extension, while
# `model` is a Keras model -- confirm the file is in a format Keras can load,
# and whether the model needs to be built (called once) before loading.
model.load_weights('model_new.pth') #Works upon training of the model, takes some time

# Placeholder model input. It gets its own name so the preprocessed
# `y_compressed` from the earlier cells is not overwritten, and a fixed seed
# so the cell produces the same output on every run.
rng = np.random.default_rng(0)
model_input = rng.standard_normal((1, 10, 128)).astype(np.float32)  # Example data, replace with your data

# Generate audio from numpy array
generated_audio = model([model_input, model_input], training=False)

# Convert generated audio tensor to numpy array
generated_audio_np = generated_audio.numpy()

# Define audio parameters
sampling_rate = 16000  # Update with your desired sampling rate
output_file = "generated_audio.wav"

# Save generated audio as WAV file
# NOTE(review): generated_audio_np[0] is 2-D (time steps x decoder units);
# soundfile will presumably treat the second axis as channels -- confirm this
# is the intended output, or convert the features to a waveform first.
sf.write(output_file, generated_audio_np[0], sampling_rate)

print("Generated audio saved as", output_file)