<a href="https://colab.research.google.com/github/HernanDL/Noise-Cancellation-Using-GenAI/blob/main/TCN_Urban_Noise_Cancellation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TCN-based Urban Noise Prediction and Phase Cancellation

This Colab notebook implements a Temporal Convolutional Network (TCN) for predicting and canceling urban noise patterns. It generates synthetic urban noise, trains a TCN model on this data, and evaluates its performance on unseen uploaded WAV files.

## Table of Contents:
1. **Introduction**
2. **Importing Libraries**
3. **Define TCN Model Class**
4. **Synthetic Urban Noise Generation**
5. **Data Preparation**
6. **Building the TCN Model**
7. **Training the Model**
8. **Evaluating the Model**
9. **Saving and Loading the Model**
10. **Testing on Uploaded WAV File**
11. **Conclusion**

---

## 1. Introduction:
Urban noise includes sounds such as those from machinery, engines, and constant background hums. This noise spans a wide frequency range, typically from **20 Hz** (low hums) to **20,000 Hz** (high-pitched sounds). Traditional noise cancellation techniques may struggle with such a diverse spectrum, so we explore a TCN model that uses historical context to predict and invert these noises for better cancellation.

In this notebook, we'll:
- Generate synthetic urban-like noise patterns.
- Train a TCN model to predict and invert these noise patterns.
- Test the model on both synthetic data and real-world WAV files.
- Visualize the model's predictions and listen to the results.

## 2. Importing Libraries:
The necessary libraries for the TCN model, audio handling, and plotting are imported.


## 3. Define TCN Model Class:
The `WaveformPredictorTCN` class manages the generation of synthetic noise, the training process, and predictions. It includes:
- Parameters for waveform generation, model architecture, and training.
- Methods to generate urban-like noise, prepare training data, build the TCN model, and plot results.


## 4. Synthetic Urban Noise Generation:
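Synthetic noise is generated by summing 3 to 9 sine waves with random frequencies between 20 Hz and `max_freq` (10,000 Hz by default) and random amplitudes between 0.2 and 1.0, then normalizing the result to [-1, 1]. This simulates tonal urban noise patterns (e.g., engines, machinery). Each call to `generate_waveform` creates a different pattern to encourage model generalization.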
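The generation step can be sketched as a standalone function. This is a minimal sketch, not the class method itself: the seeded `rng` argument and the function signature are assumptions added here so that runs are reproducible.

```python
import numpy as np

def generate_waveform(rng, duration=1.0, sample_rate=44100, max_freq=10000):
    """Sum 3 to 9 random sine waves and peak-normalize the result to [-1, 1]."""
    t = np.linspace(0, duration, int(sample_rate * duration), endpoint=False)
    n_components = rng.integers(3, 10)               # upper bound is exclusive: 3..9
    freqs = rng.uniform(20, max_freq, n_components)  # frequencies in Hz
    amps = rng.uniform(0.2, 1.0, n_components)       # per-component amplitudes
    # Broadcast (n_components, 1) against (n_samples,) and sum over components.
    waveform = (amps[:, None] * np.sin(2 * np.pi * freqs[:, None] * t)).sum(axis=0)
    return waveform / np.max(np.abs(waveform))

rng = np.random.default_rng(0)  # hypothetical seed, for reproducibility only
wave = generate_waveform(rng)
```

Passing the same seed reproduces the same waveform, which can help when comparing training runs.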


## 5. Data Preparation:
Data is prepared by generating multiple waveforms and sliding a window of `sequence_length` samples over each one. Every window is an input, and its target is the negated value of the sample that immediately follows the window. Training on these inverted targets lets the TCN model learn to predict a signal that could cancel out the original noise.
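The windowing described above can also be written without an explicit Python loop. The sketch below assumes NumPy 1.20 or later for `sliding_window_view`; the helper name `make_pairs` is hypothetical and not part of the notebook.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def make_pairs(waveform, sequence_length):
    """Build (window, negated next sample) pairs from a 1D waveform."""
    # All windows of length sequence_length: shape (N - L + 1, L).
    windows = sliding_window_view(waveform, sequence_length)
    X = windows[:-1]                 # the last window has no following sample
    y = -waveform[sequence_length:]  # inverted sample that follows each window
    return X[..., np.newaxis], y     # add the channel axis expected by Conv1D

waveform = np.arange(6, dtype=float)  # toy signal: 0, 1, 2, 3, 4, 5
X, y = make_pairs(waveform, sequence_length=3)
```

For the toy signal, the first window is `[0, 1, 2]` and its target is `-3`, which matches the pairing produced by the loop in `prepare_data`.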


## 6. Building the TCN Model:
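The TCN model is built from three causal `Conv1D` layers (64 filters, kernel size 3) with dilation rates of 1, 2, and 4, each followed by batch normalization. Increasing the dilation rate widens the span of past samples each layer can see without adding parameters. The convolutional output is flattened and passed to a single dense unit that predicts the next inverted sample.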
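As a rough check on how far back the convolutional stack looks, the receptive field of stacked dilated convolutions can be computed directly. This is a small sketch with a hypothetical helper name; it uses the standard relation that each layer adds `(kernel_size - 1) * dilation` samples.

```python
def receptive_field(kernel_size, dilation_rates):
    """Number of input samples that influence one output step of the conv stack."""
    return 1 + sum((kernel_size - 1) * d for d in dilation_rates)

# Kernel size and dilation rates taken from build_model in this notebook.
rf = receptive_field(kernel_size=3, dilation_rates=[1, 2, 4])
print(rf)  # 15
```

Each output step of the last convolution therefore depends on 15 input samples. In this notebook the dense layer operates on the flattened output of all 100 time steps, so the final prediction can still draw on the whole input window.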


## 7. Training the Model:
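The model is trained on synthetic waveforms with the Adam optimizer and a low learning rate (0.0001) for stability. An early-stopping callback monitors the training loss with a patience of 5 epochs and restores the best weights. Because it monitors training loss rather than validation loss, it guards against stalled training more than against overfitting, and with the default `epochs = 1` it never triggers.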


## 8. Evaluating the Model:
After training, the model is evaluated on a newly generated synthetic waveform, and the first 50 ms of the results are plotted:
- **Input Waveform**: The original test waveform.
- **Predicted Waveform**: The TCN model’s prediction of the inverted signal.
- **Combined Waveform**: The sum of the input and predicted waveforms, which would be zero under perfect phase cancellation.
- **Combined vs Input**: The combined and input waveforms overlaid for comparison.
- **Residual Noise (dB)**: The magnitude of the combined waveform in decibels, computed as `20 * log10(|combined| + 1e-10)`.
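
The residual plot shows a per-sample level. A single summary number can also be useful, so the sketch below adds an RMS-based attenuation figure alongside the per-sample formula. The `attenuation_db` helper and the 0.9 scaling of the anti-noise are illustrative assumptions, not part of the notebook.

```python
import numpy as np

def residual_db(combined, eps=1e-10):
    """Per-sample residual level in dB, as used for the residual plot."""
    return 20 * np.log10(np.abs(combined) + eps)

def attenuation_db(noise, combined, eps=1e-10):
    """RMS level of the residual relative to the input (negative means quieter)."""
    rms_noise = np.sqrt(np.mean(np.square(noise)))
    rms_combined = np.sqrt(np.mean(np.square(combined)))
    return 20 * np.log10((rms_combined + eps) / (rms_noise + eps))

t = np.linspace(0, 1, 1000, endpoint=False)
noise = np.sin(2 * np.pi * 5 * t)
anti_noise = -0.9 * noise        # hypothetical imperfect prediction
combined = noise + anti_noise    # 10% of the original amplitude remains
level = attenuation_db(noise, combined)
```

With 10% of the amplitude remaining, `level` comes out at roughly -20 dB.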


## 9. Saving and Loading the Model:
The trained model is saved for future use, allowing users to reload the model without retraining. This is helpful for testing the model with new inputs like WAV files.


## 10. Testing on Uploaded WAV File:
This section allows users to upload a WAV file, make predictions using the loaded TCN model, and visualize the results.


## 11. Conclusion:
The TCN model offers a way to model and predict complex noise patterns over time. Its dilated causal convolutions capture longer-range context, which helps it predict the inverted samples needed for noise cancellation. Training can be slow, especially on CPU and with long sequences: one epoch over the default dataset is 137,500 batches at roughly 30 ms per step in the run shown below. In exchange, the approach is flexible enough to handle diverse and non-stationary signals like urban noise.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio, display
import tensorflow as tf
import librosa
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from google.colab import files

class WaveformPredictorTCN:
    # Parameters for easy tuning
    num_training_waves = 100  # Number of waves to use for training
    duration = 1.0           # Duration in seconds
    sample_rate = 44100      # Samples per second
    epochs = 1               # Number of training epochs
    learning_rate = 0.0001   # Learning rate for the optimizer
    sequence_length = 100    # Number of previous samples to consider for prediction
    max_freq = 10000         # Maximum frequency for urban noise generation
    batch_size = 32          # Batch size for training

    def __init__(self):
        self.model = self.build_model()
        self.device = self.set_device()

    def set_device(self):
        if tf.config.list_physical_devices('GPU'):
            print("Using GPU for training.")
            return 'GPU'
        else:
            print("Using CPU for training.")
            return 'CPU'

    def generate_waveform(self):
        t = np.linspace(0, self.duration, int(self.sample_rate * self.duration), endpoint=False)
        # Generate random urban noise-like signal by combining sine waves with varying amplitudes
        frequencies = np.random.uniform(20, self.max_freq, np.random.randint(3, 10))  # Random number of waves
        waveform = np.sum([np.random.uniform(0.2, 1.0) * np.sin(2 * np.pi * f * t) for f in frequencies], axis=0)
        return waveform / np.max(np.abs(waveform))  # Normalize to [-1, 1]

    def prepare_data(self):
        input_waveforms = np.array([self.generate_waveform() for _ in range(self.num_training_waves)])

        X, y = [], []
        for waveform in input_waveforms:
            for i in range(len(waveform) - self.sequence_length):
                X.append(waveform[i:i + self.sequence_length])
                y.append(-waveform[i + self.sequence_length])  # Predict the inverted phase

        X = np.array(X).reshape(-1, self.sequence_length, 1)
        y = np.array(y)
        return X, y

    def build_model(self):
        input_layer = layers.Input(shape=(self.sequence_length, 1))
        # First TCN block
        tcn = layers.Conv1D(filters=64, kernel_size=3, padding='causal', activation='relu', dilation_rate=1)(input_layer)
        tcn = layers.BatchNormalization()(tcn)
        tcn = layers.Conv1D(filters=64, kernel_size=3, padding='causal', activation='relu', dilation_rate=2)(tcn)
        tcn = layers.BatchNormalization()(tcn)

        # Second TCN block with increased dilation
        tcn = layers.Conv1D(filters=64, kernel_size=3, padding='causal', activation='relu', dilation_rate=4)(tcn)
        tcn = layers.BatchNormalization()(tcn)

        # Flatten and dense layers for output
        flatten = layers.Flatten()(tcn)
        output_layer = layers.Dense(1)(flatten)

        model = models.Model(inputs=input_layer, outputs=output_layer)
        model.compile(optimizer=Adam(learning_rate=self.learning_rate), loss='mean_squared_error')
        return model

    def train(self):
        X, y = self.prepare_data()
        early_stopping = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
        history = self.model.fit(X, y, epochs=self.epochs, batch_size=self.batch_size, verbose=1, callbacks=[early_stopping])
        return history

    def predict(self, test_waveform):
        X_test = []
        for i in range(len(test_waveform) - self.sequence_length):
            X_test.append(test_waveform[i:i + self.sequence_length])
        X_test = np.array(X_test).reshape(len(X_test), self.sequence_length, 1)
        predicted_waveform = self.model.predict(X_test)
        return predicted_waveform.flatten()  # Ensure it returns a 1D array

    def plot_results(self, test_waveform, predicted_waveform, combined_waveform):
        time_axis = np.linspace(0, self.duration, int(self.sample_rate * self.duration), endpoint=False)
        zoom_duration = 0.05  # 50 ms
        zoom_samples = int(self.sample_rate * zoom_duration)

        plt.figure(figsize=(15, 10))

        plt.subplot(5, 1, 1)
        plt.title('Input Waveform')
        plt.plot(time_axis[:zoom_samples], test_waveform[:zoom_samples], color='blue')
        plt.xlim(0, zoom_duration)
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')

        plt.subplot(5, 1, 2)
        plt.title('Predicted Waveform')
        plt.plot(time_axis[self.sequence_length:self.sequence_length + zoom_samples], predicted_waveform[:zoom_samples], color='orange')
        plt.xlim(0, zoom_duration)
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')

        plt.subplot(5, 1, 3)
        plt.title('Combined Waveform (Input + Predicted)')
        plt.plot(time_axis[self.sequence_length:self.sequence_length + zoom_samples], combined_waveform[:zoom_samples], color='green')
        plt.xlim(0, zoom_duration)
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')

        plt.subplot(5, 1, 4)
        plt.title('Combined vs Input')
        plt.plot(time_axis[self.sequence_length:self.sequence_length + zoom_samples], combined_waveform[:zoom_samples], color='orange', label='Combined')
        plt.plot(time_axis[self.sequence_length:self.sequence_length + zoom_samples], test_waveform[self.sequence_length:self.sequence_length + zoom_samples], color='red', label='Input')
        plt.xlim(0, zoom_duration)
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')
        plt.legend()

        residuals = combined_waveform[:zoom_samples]
        residuals_db = 20 * np.log10(np.abs(residuals) + 1e-10)

        plt.subplot(5, 1, 5)
        plt.title('Residual Noise (dB)')
        plt.plot(time_axis[self.sequence_length:self.sequence_length + zoom_samples], residuals_db, color='red')
        plt.xlabel('Time (s)')
        plt.ylabel('Residual (dB)')
        plt.ylim(-100, 0)

        plt.tight_layout()
        plt.show()

    def save_model(self, path='waveform_model.h5'):
        self.model.save(path)
        print(f"Model saved to {path}")

    def load_model(self, path='waveform_model.h5'):
        self.model = models.load_model(path)
        print(f"Model loaded from {path}")

    def test_with_wav(self, wav_path):
        test_waveform, _ = librosa.load(wav_path, sr=self.sample_rate)
        predicted_waveform = self.predict(test_waveform)
        combined_waveform = test_waveform[self.sequence_length:] + predicted_waveform

        # Plot results
        self.plot_results(test_waveform, predicted_waveform, combined_waveform)

        # Provide audio playback
        print("Test Input Waveform:")
        display(Audio(test_waveform, rate=self.sample_rate))
        print("Predicted Inverted Waveform:")
        display(Audio(predicted_waveform, rate=self.sample_rate))
        print("Combined Waveform:")
        display(Audio(combined_waveform, rate=self.sample_rate))

    def model_summary(self):
        self.model.summary()

# Usage
predictor_tcn = WaveformPredictorTCN()
history = predictor_tcn.train()

# Generate a new test waveform for prediction
test_waveform = predictor_tcn.generate_waveform()
predicted_waveform = predictor_tcn.predict(test_waveform)

# Calculate combined waveform
combined_waveform = test_waveform[predictor_tcn.sequence_length:] + predicted_waveform

# Show model summary
predictor_tcn.model_summary()

# Plot results
predictor_tcn.plot_results(test_waveform, predicted_waveform, combined_waveform)

# Provide audio playback
print("Test Input Waveform:")
display(Audio(test_waveform, rate=predictor_tcn.sample_rate))
print("Predicted Inverted Waveform:")
display(Audio(predicted_waveform, rate=predictor_tcn.sample_rate))
print("Combined Waveform:")
display(Audio(combined_waveform, rate=predictor_tcn.sample_rate))

# Save the model for later use
predictor_tcn.save_model('waveform_model.h5')


Using CPU for training.
 32553/137500 ━━━━━━━━━━━━━━━━━━━━ 53:15 30ms/step - loss: 0.0880

In [None]:
# Test with a local WAV file
predictor_tcn.load_model('waveform_model.h5')

# Upload WAV file
uploaded = files.upload()
wav_filename = list(uploaded.keys())[0]

predictor_tcn.test_with_wav(wav_filename)