# Information-Maximization Approach to Blind Separation and Blind Deconvolution

## Constants and Functions

### Imports / Libraries

In [None]:
# Set this flag to True to use GPU (CuPy) if available, or False to use CPU (NumPy/Scipy)
USE_GPU = False

# -------------------------------
# Import libraries and set backend alias
# -------------------------------
import numpy as np      # for CPU-based I/O and plotting helper conversions
import os
import pandas as pd
import matplotlib.pyplot as plt
import soundfile as sf
import time as tick

# Try to import CuPy
try:
    import cupy as cp
    gpu_available = True
except ImportError:
    gpu_available = False

# Decide which backend to use for array operations:
if USE_GPU and gpu_available:
    xp = cp
else:
    import numpy as xp  # if GPU not used or not available, use NumPy
    gpu_available = False  # force CPU mode

# For signal processing, try to use CuPy’s versions if available.
# If not, fall back to SciPy’s functions.
if USE_GPU and gpu_available:
    try:
        from cupyx.scipy.signal import firwin, freqz
        try:
            from cupyx.scipy.signal import lfilter
            has_cupyx = True
        except ImportError:
            # Fall back to SciPy's lfilter if CuPy's is not available.
            from scipy.signal import lfilter
            has_cupyx = False
    except ImportError:
        from scipy.signal import lfilter, firwin, freqz
        has_cupyx = False
else:
    from scipy.signal import lfilter, firwin, freqz
    has_cupyx = False

if USE_GPU and gpu_available:
    try:
        sliding_window_view = xp.lib.stride_tricks.sliding_window_view
    except AttributeError:
        # Fallback implementation for 1D arrays.
        def sliding_window_view(x, window_shape):
            n = x.shape[0] - window_shape + 1
            return xp.stack([x[i:i+window_shape] for i in range(n)])
else:
    from numpy.lib.stride_tricks import sliding_window_view

# Helper to convert arrays to CPU for plotting and I/O.
if USE_GPU and gpu_available:
    to_cpu = cp.asnumpy
else:
    to_cpu = lambda x: x

### Initialize and Load

In [None]:
num_trials = 10
x_speech_cpu, fs_speech = sf.read("speech.WAV")
x_speech = xp.asarray(x_speech_cpu.astype(np.float64))

# Example parameters
noise_variances = [0.1, 0.3, 1.0]
M_candidates = [64, 128, 256, 512]
L_candidates = [4, 8, 16, 32, 64]
eta_candidates = [3**(-i) for i in range(2,8)]
est_cdf_candidates = ['sigmoid', 'tanh']

print("Speech sampling rate:", fs_speech)
plt.figure()
plt.plot(to_cpu(x_speech))
plt.title("Speech Input")
plt.show()

### Create Low-Pass Filter

In [None]:
fc = 1000   # Cutoff frequency in Hz
numtaps = 501  # Filter order (should be odd)

# Design FIR filter (firwin always returns a NumPy array; we then convert to xp array)
fir_coeff_cpu = firwin(numtaps, fc, fs=fs_speech, pass_zero=True)
fir_coeff = xp.asarray(fir_coeff_cpu)

# Compute frequency response (using the chosen signal processing functions)
w, h = freqz(fir_coeff_cpu, worN=8000)
plt.figure(figsize=(8, 4))
plt.plot(to_cpu(w * fs_speech / (2 * np.pi)),
         20 * np.log10((np.abs(h))), 'b')
plt.title('FIR Lowpass Filter Frequency Response')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Gain (dB)')
plt.grid()
plt.show()

### Helper Memory-less Monotonic Mapping Functions

In [None]:
def sigmoid(x):
    return 1 / (1 + xp.exp(-x))

def sigmoid_derivative(x):
    s = sigmoid(x)
    return s * (1 - s)

def tanh(x):
    return xp.tanh(x)

def tanh_derivative(x):
    return 1 - xp.tanh(x)**2

def gaussian(x):
    return xp.exp(-x**2)

def gaussian_derivative(x):
    return -2 * x * xp.exp(-x**2)

### Signal Loading and Processing

In [None]:
def unknown_plant(x, fir_coeff=None):
    """
    Applies an unknown plant operation:
      - If fir_coeff is provided, filters x using the FIR filter.
      - Otherwise, applies a recursive difference equation.
    """
    N = x.shape[0]
    if fir_coeff is not None:
        # If using GPU with cupyx support, call lfilter directly on xp arrays.
        if USE_GPU and gpu_available and has_cupyx:
            y = lfilter(fir_coeff, 1, x)
        else:
            # Otherwise, convert inputs to NumPy arrays for SciPy's lfilter
            x_cpu = to_cpu(x)
            fir_cpu = to_cpu(fir_coeff)
            y_cpu = lfilter(fir_cpu, 1, x_cpu)
            y = xp.asarray(y_cpu)  # Convert the result back to the chosen backend
    else:
        y = xp.zeros(N, dtype=x.dtype)
        for n in range(N):
            if n == 0:
                y[n] = x[n]
            elif n < 10:
                y[n] = y[n-1] + x[n]
            else:
                y[n] = y[n-1] + x[n] - x[n-10]
    return y

def add_noise(y, noise_var=0.1, seed=0):
    xp.random.seed(seed)
    noise = xp.sqrt(noise_var) * xp.random.randn(y.shape[0])
    return y + noise

### Filter Measurements

In [None]:
def get_w_true(M, FIR_coeffs=None):
    if FIR_coeffs is None:
        w_true = xp.ones(10)
    else:
        w_true = FIR_coeffs
    if M <= 10:
        return w_true
    else:
        return xp.concatenate((w_true, xp.zeros(M-10)))

def compute_weighted_snr(w_est, FIR_coeffs=None):
    M = w_est.shape[0]
    if FIR_coeffs is not None:
        w_true = get_w_true(M, FIR_coeffs)
    else:
        w_true = get_w_true(M)
    if w_true.shape[0] > w_est.shape[0]:
        w_est = xp.concatenate((w_est, xp.zeros(w_true.shape[0]-w_est.shape[0])))
    num = xp.dot(w_true, w_true)
    diff = w_true - w_est
    den = xp.dot(diff, diff)
    if den < 1e-15:
        return 999.0
    return 10.0 * xp.log10(num / den)

### Helper Plotting Functions

In [None]:
CSV_FILENAME = "all_results.csv"
PLOT_DIR = "plots"
if not os.path.exists(PLOT_DIR):
    os.makedirs(PLOT_DIR)
def plot_and_save(sig_true, sig_pred, figname, title="Comparison"):
    plt.figure(figsize=(8, 4))
    plt.plot(to_cpu(sig_true), label="Actual Output", alpha=0.7)
    plt.plot(to_cpu(sig_pred), label="Predicted Output", alpha=0.7)
    plt.title(title)
    plt.xlabel("Sample index")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(PLOT_DIR, figname), dpi=150)
def plot_comparison(d, x, y, title="", dir='/comparison'):
    length = min(d.shape[0], x.shape[0], y.shape[0])
    plt.figure(figsize=(8,4))
    plt.plot(to_cpu(d[:length]), label="True Signal", alpha=0.7)
    plt.plot(to_cpu(x[:length]), label="Noisy Observed Input", alpha=0.7)
    plt.plot(to_cpu(y[:length]), label="Predicted Input", alpha=0.7)
    plt.title(title)
    plt.xlabel("Sample index")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.tight_layout()
    plt.show()
    plt.savefig(os.path.join(PLOT_DIR+dir, title + ".png"), dpi=150)
def plot_comparison_fft(d, x, y, title="", dir='/comparison_fft'):
    length = min(d.shape[0], x.shape[0], y.shape[0])
    plt.figure(figsize=(8,4))
    xticks = np.fft.fftfreq(length, 1/fs_speech)
    plt.xticks(xticks)
    plt.plot(np.abs(np.fft.fft(to_cpu(d), length)), label="True Signal", alpha=0.7)
    plt.plot(np.abs(np.fft.fft(to_cpu(x), length)), label="Noisy Observed Input", alpha=0.7)
    plt.plot(np.abs(np.fft.fft(to_cpu(y), length)), label="Predicted Input", alpha=0.7)
    plt.title(title)
    plt.xlabel("Sample index")
    plt.ylabel("Amplitude")
    plt.legend()
    plt.tight_layout()
    plt.show()
    plt.savefig(os.path.join(PLOT_DIR+dir, title + ".png"), dpi=150)
def plot_weight_tracks(avg_w_track, std_w_track, time_steps, M, title, xlabel="Sample index", ylabel="Weight", max_pts=500, dir='/weight_tracks'):
    plt.figure(figsize=(10, 5))
    for m in range(M):
        plt.plot(to_cpu(time_steps), to_cpu(avg_w_track[:, m]), label=f"w[{m}]")
    plt.title(f"Weight Tracks: {title}")
    plt.xlabel("Time Steps")
    plt.ylabel("Weight Value")
    plt.legend()
    plt.grid()
    plt.tight_layout()
    plt.savefig(os.path.join(PLOT_DIR+'/weight_tracks', title + ".png"), dpi=150)
    plt.show()

## Blind Deconvolution Code

### Blind Deconvolution Algorithms CPU

In [None]:
def info_max_deconvolution_gaussian_pdf(x, w, filter_length=50, learning_rate=0.01, non_linearity='sigmoid'):
    if w.shape[0] != filter_length:
        raise ValueError("Filter length must match the length of the initial filter w.")
    if x.shape[0] != filter_length:
        raise ValueError("Signal x must match the length of the initial filter w")
    if w is None or xp.allclose(w, xp.zeros(filter_length)):
        w = xp.random.randn(filter_length)
        w = w / xp.linalg.norm(w)
    v = xp.dot(x, w)
    if non_linearity == 'sigmoid':
        y = sigmoid(v)
        grad = 1/w + x * (1.0 - 2.0 * y)
        w_update = w + learning_rate * grad
    elif non_linearity == 'tanh':
        y = tanh(v)
        grad = 1/w - 2 * x * y
        w_update = w + learning_rate * grad
    else:
        raise ValueError("Non-linearity not implemented. Try 'sigmoid' or 'tanh'.")
    return w_update, y
def info_max_deconvolution_gaussian_pdf_simulation(x, filter_length=50, num_iterations=1000, learning_rate=0.01):
    T = x.shape[0] - filter_length + 1
    if T <= 0:
        raise ValueError("Signal x must be longer than the filter length.")
    if sliding_window_view is not None:
        X = sliding_window_view(x, window_shape=filter_length)
    else:
        X = xp.array([x[i:i+filter_length] for i in range(T)])
    w = xp.random.randn(filter_length)
    w = w / xp.linalg.norm(w)
    for it in range(num_iterations):
        y = xp.dot(X, w)
        g_y = sigmoid(y)
        error = 1.0 - 2.0 * g_y
        grad = xp.dot(X.T, error)
        w += learning_rate * grad
        w = w / xp.linalg.norm(w)
        if (it % (num_iterations // 10) == 0) or (it == num_iterations - 1):
            entropy_est = -xp.mean(xp.log(g_y * (1 - g_y) + 1e-8))
            print(f"Iteration {it}/{num_iterations}, entropy estimate: {to_cpu(entropy_est):.4f}")
    y_final = xp.dot(X, w)
    return w, y_final
def info_max_deconvolution_parzen_window_sampler(x, w, M=50, L=10, eta=0.01, est_cdf='sigmoid', kernel='gaussian'):
    if w.shape[0] != M:
        raise ValueError("M must match the length of the initial filter w.")
    if x.shape[0] != M + L:
        raise ValueError("Signal x must match the length of the initial filter (M + L) windows")
    if w is None or xp.allclose(w, xp.zeros(M)):
        w = xp.random.randn(M)
        w = w / xp.linalg.norm(w)
    if est_cdf == 'sigmoid':
        activation = sigmoid
        activation_derivative = sigmoid_derivative
    elif est_cdf == 'tanh':
        activation = tanh
        activation_derivative = tanh_derivative
    else:
        raise ValueError("Invalid est_cdf. Try 'sigmoid' or 'tanh'.")
    if kernel == 'gaussian':
        kernel_func = gaussian
        kernel_derivative_func = gaussian_derivative
    else:
        raise ValueError("Invalid kernel. Only 'gaussian' is supported.")
    
    x_fin = x[L:L+M]
    X_windows = (sliding_window_view(x, M)[:L]
                 if sliding_window_view is not None
                 else xp.array([x[i:i+M] for i in range(L)]))
    y_fin = xp.dot(x_fin, w)
    Y_fin = xp.full((L, 1), y_fin)
    Y_windows = xp.reshape(xp.dot(X_windows, w), (L, 1))
    Z_fin = activation(Y_fin)
    Z_windows = activation(Y_windows)
    xt_fin = x_fin * activation_derivative(y_fin)
    Xt_fin = xp.repeat(xt_fin[xp.newaxis, :], L, axis=0)
    Xt_windows = X_windows * activation_derivative(Y_windows)
    kernel_weight = kernel_derivative_func(Z_fin - Z_windows)
    chain_rule_kernel_weight = Xt_fin - Xt_windows
    entropy_grad = xp.mean(kernel_weight * chain_rule_kernel_weight, axis=0)
    w_new = w - eta * entropy_grad
    return w_new
def info_max_deconvolution_parzen_window_sampler_simulation(x, d, M=50, L=10, eta=0.01,
                                                            est_cdf='sigmoid', kernel='gaussian', trial=None):
    x = xp.asarray(x)
    N = x.shape[0]
    if trial is not None:
        xp.random.seed(trial)
    x = xp.pad(x, (M+L-1, 0), mode='constant')
    w = xp.random.randn(M)
    w = w / xp.linalg.norm(w)
    weight_tracks = xp.zeros((N, M))
    predictions = xp.zeros(N)
    for i in range(N):
        segment = x[i: i + M + L]
        x_fin = segment[L:L+M]
        current_prediction = xp.dot(x_fin, w)
        new_w = info_max_deconvolution_parzen_window_sampler(segment, w, M, L, eta, est_cdf, kernel)
        y = xp.dot(x_fin, new_w)
        if xp.isnan(y):
            print("y is NaN")
        predictions[i] = current_prediction
        weight_tracks[i] = new_w
        w = new_w
    return weight_tracks, predictions

### Blind Deconvolution GPU

In [None]:
def info_max_deconvolution_parzen_window_sampler_gpu(x, w, M=50, L=10, eta=0.01, est_cdf='sigmoid', kernel='gaussian'):
    if w.shape[0] != M:
        raise ValueError("M must match the length of the initial filter w.")
    if x.shape[0] != M + L:
        raise ValueError("Signal x must match the length of the initial filter M + L previous windows")
    if w is None or cp.allclose(w, cp.zeros(M)):
        w = cp.random.randn(M)
        w = w / cp.linalg.norm(w)
    if est_cdf == 'sigmoid':
        activation = sigmoid
        activation_derivative = sigmoid_derivative
    elif est_cdf == 'tanh':
        activation = tanh
        activation_derivative = tanh_derivative
    else:
        raise ValueError("Non-linearity is either not implemented, or invalid. Try 'sigmoid' or 'tanh'.")
    if kernel == 'gaussian':
        kernel_func = gaussian
        kernel_derivative_func = gaussian_derivative
    else:
        raise ValueError("Window is either not implemented, or invalid. Try 'gaussian'.")
    
    x_fin = x[L:L+M]
    X_windows = cp.lib.stride_tricks.sliding_window_view(x, M)[:L]
    y_fin = cp.dot(x_fin, w)
    Y_fin = cp.full((L, 1), y_fin)
    Y_windows = cp.reshape(cp.dot(X_windows, w), (L, 1))
    Z_fin = activation(Y_fin)
    Z_windows = activation(Y_windows)
    xt_fin = x_fin * activation_derivative(y_fin)
    Xt_fin = cp.repeat(xt_fin[cp.newaxis, :], L, axis=0)
    Xt_windows = X_windows * activation_derivative(Y_windows)
    kernel_weight = kernel_derivative_func(Z_fin - Z_windows)
    chain_rule_kernel_weight = Xt_fin - Xt_windows
    entropy_grad = cp.mean(kernel_weight * chain_rule_kernel_weight, axis=0)
    w_new = w - eta * entropy_grad
    return w_new

def info_max_deconvolution_parzen_window_sampler_simulation_gpu(x, d, M=50, L=10, eta=0.01,
                                                            est_cdf='sigmoid', kernel='gaussian', trial=None):
    x = cp.asarray(x)
    N = x.shape[0]
    if trial is not None:
        cp.random.seed(trial)
    x = cp.pad(x, (M+L-1, 0), mode='constant')
    w = cp.random.randn(M)
    w = w / cp.linalg.norm(w)
    weight_tracks = cp.zeros((N, M))
    predictions = cp.zeros(N)
    for i in range(N):
        segment = x[i: i + M + L]
        x_fin = segment[L:L+M]
        current_prediction = cp.dot(x_fin, w)
        new_w = info_max_deconvolution_parzen_window_sampler(segment, w, M, L, eta, est_cdf, kernel)
        y = cp.dot(x_fin, new_w)
        if cp.isnan(y):
            print("y is NaN")
        predictions[i] = current_prediction
        weight_tracks[i] = new_w
        w = new_w
    return weight_tracks, predictions

### Cross-Validation Grid Search for Blind Deconvolution

In [None]:
def grid_search_deconvolution(x, d, M=50, L=10, candidate_etas=None, candidate_est_cdfs=None, num_trials=10):
    if candidate_etas is None:
        candidate_etas = [0.001, 0.005, 0.01, 0.05, 0.1]
    if candidate_est_cdfs is None:
        candidate_est_cdfs = ['sigmoid', 'tanh']
    performance_dict = {}
    best_avg_nmse = xp.inf
    best_params = None
    best_trial_outputs = None

    for eta_candidate in candidate_etas:
        for est_candidate in candidate_est_cdfs:
            trial_namses = xp.zeros(num_trials)
            trial_predictions = xp.zeros((num_trials, x.shape[0]))
            trial_weights = xp.zeros((num_trials, x.shape[0], M))
            for trial in range(num_trials):
                tw, tp = info_max_deconvolution_parzen_window_sampler_simulation(
                    x, d, M=M, L=L, eta=eta_candidate, est_cdf=est_candidate, kernel='gaussian', trial=trial)
                trial_weights[trial] = tw
                trial_predictions[trial] = tp
                valid_mask = ~xp.isnan(tp)
                y_pred = tp[valid_mask]
                d_valid = d[valid_mask]
                y_pred_norm = xp.linalg.norm(y_pred)
                d_valid_norm = xp.linalg.norm(d_valid)
                if y_pred.size > 0:
                    amse = xp.mean((d_valid - y_pred*(d_valid_norm/y_pred_norm)) ** 2)
                    namse = amse / d_valid_norm
                else:
                    namse = xp.nan
                trial_namses[trial] = namse
            avg_weights = xp.nanmean(trial_weights, axis=0)
            std_weights = xp.nanstd(trial_weights, axis=0)
            avg_predictions = xp.nanmean(trial_predictions, axis=0)
            std_predictions = xp.nanstd(trial_predictions, axis=0)
            avg_namse = xp.nanmean(trial_namses)
            std_namse = xp.nanstd(trial_namses)
            performance_dict[(eta_candidate, est_candidate)] = (avg_namse, std_namse)
            print(f"Candidate eta={eta_candidate}, est_cdf={est_candidate} --> Avg NMSE: {to_cpu(avg_namse):.6f} +/- {to_cpu(std_namse):.6f}")
            if avg_namse < best_avg_nmse:
                best_avg_nmse = avg_namse
                best_params = (eta_candidate, est_candidate)
                best_trial_outputs = {
                    'avg_weights': avg_weights,
                    'avg_predictions': avg_predictions,
                    'std_weights': std_weights,
                    'std_predictions': std_predictions
                }
    print(f"\nBest parameters: eta={best_params[0]}, est_cdf={best_params[1]} with average NMSE: {to_cpu(best_avg_nmse):.6f}")
    return best_avg_nmse, best_params, best_trial_outputs, performance_dict

## Results for different M and L values

### Testing Script

In [None]:
# I was using this block to test stuff out, ignore this

### True Search

In [None]:
best_params_list = []  # to store best (eta, est_cdf) tuples for each candidate run
p1_df_deconv = pd.DataFrame(columns=['Input', 'Noise Variance', 'Model Order',
                                       'Best Eta', 'Best est_cdf', 'W-SNR', 'NMSE'])

for input_name, d in zip(["speech"], [x_speech]):
    print(f"\n=== {input_name.upper()} Input ===")
    N = d.shape[0]
    best_y_pred = None
    best_namse = xp.inf

    for nv in noise_variances:
        d_noisy = add_noise(d, noise_var=nv)
        x_noisy = unknown_plant(d_noisy, fir_coeff=fir_coeff)
        print(f"  Noise variance={nv}")
        for M in M_candidates:
            for L in L_candidates:
                time_steps = xp.arange(N)
                x_w = x_noisy[:N]
                d_w = d_noisy[:N]
                t_start = tick.perf_counter()
                best_avg_namse, best_params, best_stat_outputs, performance = grid_search_deconvolution(
                    x=x_w, d=d_w, M=M, L=L,
                    candidate_etas=eta_candidates,
                    candidate_est_cdfs=est_cdf_candidates,
                    num_trials=num_trials
                )
                t_stop = tick.perf_counter()
                best_eta, best_est_cdf = best_params
                best_params_list.append(best_params)
                y_avg = best_stat_outputs['avg_predictions']
                avg_weights = best_stat_outputs['avg_weights']
                if avg_weights[-1] is not None:
                    final_w_est = avg_weights[-1]
                    w_snr = compute_weighted_snr(final_w_est, fir_coeff)
                else:
                    w_snr = xp.nan
                print(f"M={M}, best_eta={best_eta}, best_est_cdf={best_est_cdf}, W-SNR={to_cpu(w_snr):.2f} dB, "
                      f"NMSE={to_cpu(best_avg_namse):.6f}, Time to find best params={t_stop-t_start:.2f} s")
                title_str_val = (f"{input_name.upper()}, noise={nv}, M={M}, eta={best_eta}, est_cdf={best_est_cdf}\n"
                                 f"W-SNR_val={to_cpu(w_snr):.2f} dB, NMSE={to_cpu(best_avg_namse):.6f}")
                x_w_normalized = x_w / xp.linalg.norm(x_w)
                d_w_normalized = d_w / xp.linalg.norm(d_w)
                y_avg_normalized = y_avg / xp.linalg.norm(y_avg)
                mask = ~xp.isnan(y_avg_normalized)
                plot_comparison(d_w_normalized[mask], x_w_normalized[mask], y_avg_normalized[mask], title=title_str_val)
                plot_comparison_fft(d_w_normalized[mask], x_w_normalized[mask], y_avg_normalized[mask], title=title_str_val)
                plot_weight_tracks(avg_weights, best_stat_outputs['std_weights'], time_steps, M, title_str_val)
                if best_avg_namse < best_namse:
                    print(f"  ** New best NMSE found: {to_cpu(best_avg_namse):.6f} **")
                    best_namse = best_avg_namse
                    best_y_pred = y_avg
                new_row = pd.DataFrame([[input_name, nv, M, best_eta, best_est_cdf,
                                          to_cpu(w_snr) if USE_GPU and gpu_available else w_snr,
                                          to_cpu(best_avg_namse) if USE_GPU and gpu_available else best_avg_namse]],
                                       columns=p1_df_deconv.columns)
                p1_df_deconv = pd.concat([p1_df_deconv, new_row], ignore_index=True)
    
    out_fname = f"speech_imax_pred.wav"
    nonnan_best_y_pred = best_y_pred[~xp.isnan(best_y_pred)]
    sf.write(out_fname, to_cpu(nonnan_best_y_pred).reshape(-1, 1), fs_speech)

avg_best_eta = np.mean([params[0] for params in best_params_list])
print(f"\nAverage best eta: {avg_best_eta:.6f}")
p1_df_deconv.to_csv(os.path.join(PLOT_DIR, "deconv_grid_search_results.csv"), index=False)

In [None]:
p1_df_deconv