In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from scipy.special import wofz

# Helper function: Voigt profile computation
def voigt_profile(x, sigma, gamma):
    """
    Calculate the Voigt profile at x given Gaussian sigma and Lorentzian gamma.
    """
    z = (x + 1j * gamma) / (sigma * torch.sqrt(torch.tensor(2.0)))
    return torch.real(torch.exp(-z**2) * torch.tensor(wofz(z.numpy())))


In [None]:
DIM = 256

# Define the Encoder
class HybridEncoder(nn.Module):
    def __init__(self, input_dim, gaussian_dim, voigt_dim):
        super(HybridEncoder, self).__init__()
        self.fc1 = nn.Linear(input_dim, DIM)
        # Gaussian parameters
        self.fc_mu_g = nn.Linear(DIM, gaussian_dim)
        self.fc_sigma_g = nn.Linear(DIM, gaussian_dim)
        # Voigt parameters
        self.fc_mu_v = nn.Linear(DIM, voigt_dim)
        self.fc_sigma_v = nn.Linear(DIM, voigt_dim)
        self.fc_gamma_v = nn.Linear(DIM, voigt_dim)
        self.relu = nn.ReLU()
    
    def forward(self, x):
        h = self.relu(self.fc1(x))
        # Gaussian latent space
        mu_g = self.fc_mu_g(h)
        sigma_g = torch.exp(0.5 * self.fc_sigma_g(h))  # Ensure positivity
        # Voigt latent space
        mu_v = self.fc_mu_v(h)
        sigma_v = torch.exp(0.5 * self.fc_sigma_v(h))  # Ensure positivity
        gamma_v = torch.exp(0.5 * self.fc_gamma_v(h))  # Ensure positivity
        return mu_g, sigma_g, mu_v, sigma_v, gamma_v

# Define the Decoder
class Decoder(nn.Module):
    def __init__(self, latent_dim, output_dim):
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(latent_dim, DIM)
        self.fc2 = nn.Linear(DIM, output_dim)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()  # Adjust activation based on your data range
    
    def forward(self, z):
        h = self.relu(self.fc1(z))
        recon_x = self.sigmoid(self.fc2(h))
        return recon_x

# Define the VAE model
class HybridVAE(nn.Module):
    def __init__(self, input_dim, gaussian_dim, voigt_dim):
        super(HybridVAE, self).__init__()
        self.encoder = HybridEncoder(input_dim, gaussian_dim, voigt_dim)
        self.decoder = Decoder(gaussian_dim + voigt_dim, input_dim)
    
    def forward(self, x):
        mu_g, sigma_g, mu_v, sigma_v, gamma_v = self.encoder(x)
        z_g = self.reparameterize_gaussian(mu_g, sigma_g)
        z_v = self.reparameterize_voigt(mu_v, sigma_v, gamma_v)
        z = torch.cat([z_g, z_v], dim=1)  # Combine Gaussian and Voigt latent spaces
        recon_x = self.decoder(z)
        return recon_x, mu_g, sigma_g, mu_v, sigma_v, gamma_v

    def reparameterize_gaussian(self, mu, sigma):
        """
        Reparameterization for Gaussian distribution.
        """
        eps = torch.randn_like(sigma)
        return mu + eps * sigma

    def reparameterize_voigt(self, mu, sigma, gamma):
        """
        Reparameterization for Voigt distribution using Gaussian and Lorentzian widths.
        """
        gaussian_sample = sigma * torch.randn_like(mu)
        lorentzian_sample = gamma * torch.tan(torch.pi * (torch.rand_like(mu) - 0.5))
        return mu + gaussian_sample + lorentzian_sample

# Define the loss function
def hybrid_vae_loss(recon_x, x, mu_g, sigma_g, mu_v, sigma_v, gamma_v):
    recon_loss = nn.MSELoss()(recon_x, x)  # Reconstruction loss
    # Gaussian KL divergence
    kld_gaussian = -0.5 * torch.sum(1 + sigma_g.log() - mu_g.pow(2) - sigma_g.exp())
    # Voigt KL divergence
    kld_voigt = (
        -0.5 * torch.sum(1 + sigma_v.log() - mu_v.pow(2) - sigma_v.exp())
        + torch.sum(gamma_v - gamma_v.log())
    )
    return recon_loss + (kld_gaussian + kld_voigt) / x.size(0)

def hybrid_vae_loss1(recon_x, x, mu_g, sigma_g, mu_v, sigma_v, gamma_v):
    reconstruction_loss = nn.MSELoss()(recon_x, x)
    # KL divergence for the Gaussian and Voigt components
    kl_loss_g = -0.5 * torch.sum(1 + torch.log(sigma_g.pow(2)) - mu_g.pow(2) - sigma_g.pow(2))
    kl_loss_v = -0.5 * torch.sum(1 + torch.log(sigma_v.pow(2)) - mu_v.pow(2) - sigma_v.pow(2) - gamma_v.pow(2))
    
    # Combine the reconstruction loss with KL divergence
    loss = reconstruction_loss + kl_loss_g + kl_loss_v
    return loss


In [None]:
import numpy as np
import matplotlib.pyplot as plt

def gaussian(x, center, width, intensity):
    """Gaussian peak function."""
    return intensity * np.exp(-((x - center) ** 2) / (2 * width ** 2))

def lorentzian(x, center, width, intensity):
    """Lorentzian peak function."""
    return intensity * (width ** 2 / ((x - center) ** 2 + width ** 2))

def generate_raman_spectrum(x, peaks):
    """
    Generate a synthetic Raman spectrum.
    :param x: Array of Raman shift values.
    :param peaks: List of (center, width, intensity, type) for each peak.
                  type: 'gaussian' or 'lorentzian'.
    :return: Array of intensity values.
    """
    spectrum = np.zeros_like(x)
    for center, width, intensity, peak_type in peaks:
        if peak_type == "gaussian":
            spectrum += gaussian(x, center, width, intensity)
        elif peak_type == "lorentzian":
            spectrum += lorentzian(x, center, width, intensity)
    # Add a random baseline signal
    spectrum += 0.05 * np.sin(0.01 * x) + 0.02 * np.random.normal(size=len(x))
    return spectrum

# Generate synthetic spectra
num_spectra = 100
x = np.linspace(100, 4000, 1024)  # Raman shift range (100-4000 cm^-1)
spectra = []

for _ in range(num_spectra):
    num_peaks = np.random.randint(3, 7)  # Number of peaks in the spectrum
    peaks = []
    for _ in range(num_peaks):
        center = np.random.uniform(200, 3800)  # Peak position
        width = np.random.uniform(10, 50)     # Peak width
        intensity = np.random.uniform(0.5, 1.5)  # Peak intensity
        peak_type = np.random.choice(["gaussian", "lorentzian"])
        peaks.append((center, width, intensity, peak_type))
    spectrum = generate_raman_spectrum(x, peaks)
    spectra.append(spectrum)

spectra = np.array(spectra)

# Plot a few example spectra
plt.figure(figsize=(10, 6))
for i in range(5):
    plt.plot(x, spectra[i], label=f"Spectrum {i+1}")
plt.xlabel("Raman Shift (cm⁻¹)")
plt.ylabel("Intensity (a.u.)")
plt.title("Synthetic Raman Spectra")
plt.legend()
plt.show()


In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Convert spectra to a PyTorch tensor
spectra_tensor = torch.tensor(spectra, dtype=torch.float32)

# Prepare DataLoader
dataset = TensorDataset(spectra_tensor)
data_loader = DataLoader(dataset, batch_size=16, shuffle=True)

# Initialize the Hybrid VAE
input_dim = spectra.shape[1]
gaussian_dim = 512   # Gaussian latent space dimensionality
voigt_dim = 512      # Voigt latent space dimensionality
vae = HybridVAE(input_dim, gaussian_dim, voigt_dim)

# Optimizer
optimizer = optim.Adam(vae.parameters(), lr=1e-3)

# Training loop
epochs = 500
for epoch in range(epochs):
    vae.train()
    train_loss = 0
    for batch in data_loader:
        xx = batch[0]
        optimizer.zero_grad()
        recon_x, mu_g, sigma_g, mu_v, sigma_v, gamma_v = vae(xx)
        loss = hybrid_vae_loss(recon_x, xx, mu_g, sigma_g, mu_v, sigma_v, gamma_v)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {train_loss / len(data_loader):.4f}")

    # Plot reconstruction every few epochs
    if epoch % 10 == 0:
        vae.eval()
        with torch.no_grad():
            sample = spectra_tensor[0:1]  # Take a single spectrum
            recon, _, _, _, _, _ = vae(sample)
            plt.plot(x, sample.numpy()[0], label="Original Spectrum")
            plt.plot(x, recon.numpy()[0], label="Reconstructed Spectrum", linestyle="--")
            plt.legend()
            plt.show()    

# Save the model
torch.save(vae.state_dict(), "hybrid_vae_raman.pth")


Reconstruction

In [None]:
x = np.linspace(100, 4000, 1024)
vae.eval()
sample = spectra_tensor[1:2]  # Take a single spectrum for reconstruction
print(sample)
original_spectrum = sample.numpy().squeeze() 
print(len(x),len(original_spectrum))
reconstructed, _, _, _, _, _ = vae(sample)

reconstructed_spectrum = reconstructed.detach().numpy().squeeze() 
print(reconstructed)

# Plot original and reconstructed spectra
plt.figure(figsize=(10, 6))
plt.plot(x, original_spectrum, label="Original Spectrum")
plt.plot(x, reconstructed_spectrum, label="Reconstructed Spectrum", linestyle="--")
plt.xlabel("Raman Shift (cm⁻¹)")
plt.ylabel("Intensity (a.u.)")
plt.title("Original vs Reconstructed Spectrum")
plt.legend()
plt.show()


Plot Latent Space Components

In [None]:
# After the training loop, let's extract and visualize latent components
import matplotlib.pyplot as plt

# Extract latent components (mean, std, gamma) for a few samples from the dataset
vae.eval()  # Set model to evaluation mode

# Take a batch from the dataset
sample = spectra_tensor[0:10]  # Taking the first 10 spectra as an example
with torch.no_grad():  # No need to compute gradients
    mu_g, sigma_g, mu_v, sigma_v, gamma_v = vae.encoder(sample)

# Convert the latent components to numpy arrays for plotting
mu_g = mu_g.numpy()  # Gaussian means
sigma_g = sigma_g.numpy()  # Gaussian standard deviations
mu_v = mu_v.numpy()  # Voigt means
sigma_v = sigma_v.numpy()  # Voigt Gaussian widths
gamma_v = gamma_v.numpy()  # Voigt Lorentzian widths

# Plot Gaussian means and standard deviations
plt.figure(figsize=(10, 6))
plt.subplot(2, 1, 1)
plt.plot(mu_g.T, label="Gaussian Mean (mu_g)")
plt.plot(sigma_g.T, label="Gaussian Std Dev (sigma_g)")
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.title("Gaussian Latent Space Components")
plt.legend()

# Plot Voigt means, Gaussian widths, and Lorentzian widths
plt.subplot(2, 1, 2)
plt.plot(mu_v.T, label="Voigt Mean (mu_v)")
plt.plot(sigma_v.T, label="Voigt Gaussian Width (sigma_v)")
plt.plot(gamma_v.T, label="Voigt Lorentzian Width (gamma_v)")
plt.xlabel("Sample Index")
plt.ylabel("Value")
plt.title("Voigt Latent Space Components")
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
from sklearn.decomposition import PCA

# Take a batch from the dataset
sample = spectra_tensor[0:100]  # Taking the first 100 spectra as an example

with torch.no_grad():
    mu_g, sigma_g, mu_v, sigma_v, gamma_v = vae.encoder(sample)

# Combine the Gaussian and Voigt components into a single latent space (concatenating)
latent_space = torch.cat([mu_g, mu_v], dim=1).numpy()  # Concatenate Gaussian and Voigt means

# Apply PCA to reduce the dimensions to 2D for visualization
pca = PCA(n_components=2)
latent_space_2d = pca.fit_transform(latent_space)

# Plot the 2D latent space
plt.figure(figsize=(10, 6))
plt.scatter(latent_space_2d[:, 0], latent_space_2d[:, 1], c='blue', label='Latent Space')
plt.xlabel("Latent Dimension 1")
plt.ylabel("Latent Dimension 2")
plt.title("2D Visualization of the Latent Space")
plt.legend()
plt.show()


In [None]:

# Load data (example using random tensors)
input_dim = 1024  # Number of intensity values per spectrum
gaussian_dim = 5   # Gaussian latent space dimensionality
voigt_dim = 5      # Voigt latent space dimensionality
num_samples = 100
spectra = torch.rand(num_samples, input_dim)  # Replace with your data

# Prepare DataLoader
dataset = TensorDataset(spectra)
data_loader = DataLoader(dataset, batch_size=16, shuffle=True)

# Initialize model, optimizer, and train
vae = HybridVAE(input_dim, gaussian_dim, voigt_dim)
optimizer = optim.Adam(vae.parameters(), lr=1e-3)

# Training loop
epochs = 50
for epoch in range(epochs):
    vae.train()
    train_loss = 0
    for batch in data_loader:
        x = batch[0]
        optimizer.zero_grad()
        recon_x, mu_g, sigma_g, mu_v, sigma_v, gamma_v = vae(x)
        loss = hybrid_vae_loss(recon_x, x, mu_g, sigma_g, mu_v, sigma_v, gamma_v)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {train_loss / len(data_loader):.4f}")

# Save the model
torch.save(vae.state_dict(), "hybrid_vae_raman.pth")