In [56]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from scipy.stats import norm

"""
Gaussian Mixture Model (GMM) using PyTorch
This code implements a simple Gaussian Mixture Model (GMM) using PyTorch.
It includes methods for fitting the model to data and calculating the likelihood of new data points.
"""

class GaussianMixture(nn.Module):
    def __init__(self, n_components):
        super(GaussianMixture, self).__init__()
        self.n_components = n_components
        
        # Initialize parameters
        self.means = nn.Parameter(torch.linspace(0.1, 0.9, n_components))
        self.stds = nn.Parameter(torch.ones(n_components) * 0.1)  # Smaller initial std
        self.weights = nn.Parameter(torch.ones(n_components) / n_components)
        self.best_likelihood = -float('inf')
        self.best_params = None
        
    def _compute_responsibilities(self, x):
        # Expand dimensions for broadcasting
        x = x.unsqueeze(-1)  # Shape: (n_samples, 1)
        
        # Calculate Gaussian probability for each component
        gaussian_probs = torch.exp(-0.5 * ((x - self.means) / self.stds)**2) / (self.stds * torch.sqrt(torch.tensor(2 * np.pi)))
        
        # Weight the probabilities
        weighted_probs = self.weights * gaussian_probs
        
        # Sum probabilities for all components and normalize to get responsibilities
        total_probs = torch.sum(weighted_probs, dim=1, keepdim=True)
        responsibilities = weighted_probs / (total_probs + 1e-10)
        return responsibilities, torch.sum(torch.log(total_probs + 1e-10))

    def _m_step(self, x, responsibilities):
        # Update parameters based on responsibilities
        total_resp = responsibilities.sum(0)
        
        # Update means
        self.means.data = (responsibilities * x.unsqueeze(-1)).sum(0) / (total_resp + 1e-10)
        
        # Update standard deviations
        variance = (responsibilities * (x.unsqueeze(-1) - self.means)**2).sum(0) / (total_resp + 1e-10)
        self.stds.data = torch.sqrt(variance + 1e-10)
        
        # Update weights
        self.weights.data = total_resp / x.shape[0]

    
    def fit(self, data, n_iterations=100, n_restarts=10, tol=1e-6):
        """
        Fit GMM using EM algorithm with multiple random restarts
        """
        x = torch.from_numpy(data).float()
        best_likelihood = -float('inf')
        
        for restart in range(n_restarts):
            # Random initialization
            with torch.no_grad():
                self.means.data = torch.FloatTensor(self.n_components).uniform_(0.1, 0.9)
                self.stds.data = torch.ones(self.n_components) * 0.1
                self.weights.data = torch.ones(self.n_components) / self.n_components
            
            prev_likelihood = -float('inf')
            
            for iteration in range(n_iterations):
                # E-step
                responsibilities, log_likelihood = self._compute_responsibilities(x)
                
                # M-step
                self._m_step(x, responsibilities)
                
                ## Ensure parameters stay in valid ranges
                #with torch.no_grad():
                #    self.means.data = torch.clamp(self.means.data, 0.0, 1.0)
                #    self.stds.data = torch.clamp(self.stds.data, 0.001, 0.5)
                #    self.weights.data = torch.softmax(self.weights.data, dim=0)
                
                # Check convergence
                if abs(log_likelihood - prev_likelihood) < tol:
                    break
                    
                prev_likelihood = log_likelihood
                
                # Track best parameters
                if log_likelihood > best_likelihood:
                    best_likelihood = log_likelihood
                    self.best_likelihood = log_likelihood.item()
                    self.best_params = {
                        'means': self.means.data.clone(),
                        'stds': self.stds.data.clone(),
                        'weights': self.weights.data.clone()
                    }
                    print(f'Restart {restart}, Iteration {iteration}: Log Likelihood {log_likelihood.item():.4f}')
        
        # Restore best parameters
        with torch.no_grad():
            self.means.data = self.best_params['means']
            self.stds.data = self.best_params['stds']
            self.weights.data = self.best_params['weights']
            
        #return self.best_likelihood
           




def fit_gmm_to_ab(ind_dat, n_components):
    """
    Fit Gaussian Mixture Model (GMM) to allele balance data.
    
    Parameters:
        ab_dat (np.array): Allele balance data.
        n_components (int): Number of components in the GMM.
    """
    dat = ind_dat
    #print(ind_dat.shape)
    #print(len(ind_dat.shape))
    # Reshape the array to 2D if necessary
    # Example: allele_balance_array = np.random.rand(100, 10)  # Replace with actual data
    if len(ind_dat.shape) == 1:
        dat = ind_dat.reshape(-1, 1)
        #print(dat.shape)
    # Fit GMM to allele balance data   
    gmm = GaussianMixture(n_components = n_components)
    gmm.fit(dat)
    # Print best likelihood
    print(f'Best likelihood: {gmm.best_likelihood}')
    # Print fitted parameters
    print("Fitted GMM parameters:")
    print(f"Means: {gmm.best_params['means'].numpy()}")
    print(f"Std devs: {gmm.best_params['stds'].numpy()}")
    print(f"Weights: {gmm.best_params['weights'].numpy()}")
    return(gmm)

def plot_gmm_fit(data, gmm, n_points=1000, title="GMM Fit to Data"):
    """
    Plot histogram of observed data with fitted GMM components.
    
    Parameters:
        data (np.array): Original data used to fit the GMM
        gmm (GaussianMixture): Fitted GMM model
        n_points (int): Number of points for plotting the GMM curves
        title (str): Plot title
    """
    # Create figure
    plt.figure(figsize=(10, 6))
    
    # Plot histogram of observed data
    plt.hist(data, bins=50, density=True, alpha=0.5, label='Observed Data')
    
    # Generate points for plotting the GMM components
    x = np.linspace(min(data), max(data), n_points)
    x_tensor = torch.from_numpy(x).float()
    
    # Plot individual components
    for i in range(gmm.n_components):
        # Calculate Gaussian distribution for this component
        mu = gmm.means.data[i].item()
        sigma = gmm.stds.data[i].item()
        weight = gmm.weights.data[i].item()
        
        component = weight * norm.pdf(x, mu, sigma)
        plt.plot(x, component, '--', label=f'Component {i+1}')
    
    # Plot total mixture
    total = torch.zeros_like(x_tensor)
    for i in range(gmm.n_components):
        mu = gmm.means.data[i]
        sigma = gmm.stds.data[i]
        weight = gmm.weights.data[i]
        component = weight * torch.exp(-0.5 * ((x_tensor - mu) / sigma)**2) / (sigma * torch.sqrt(torch.tensor(2 * np.pi)))
        total += component
    
    plt.plot(x, total.numpy(), 'r-', label='Total Mixture', linewidth=2)
    
    plt.xlabel('Value')
    plt.ylabel('Density')
    plt.title(title)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()



#allele_balance_array = np.random.rand(1000, 3)
ab_left = np.random.normal(0.25, 0.05, (500, 4))
ab_middle = np.random.normal(0.5, 0.05, (1000, 4))
ab_right = np.random.normal(0.75, 0.05, (500,4))
allele_balance_array = np.concatenate([ab_left, ab_middle, ab_right], axis=0)
#print(allele_balance_array)
allele_mask_array = np.random.randint(0, 2, size=(2000, 4))
ab_dat = np.array([allele_balance_array,allele_mask_array])
#print(ab_dat)
for i in range(len(ab_dat[0,0,:])):
    print(i)
    ind_dat = ab_dat[0,:,i]
    #print(ind_dat)
    ind_mask = (ab_dat[1,:,i] == 1)
    #print(ind_mask)
    ind_dat_filtered = ind_dat[ind_mask]
    print(ind_dat_filtered)
    gmm = fit_gmm_to_ab(ind_dat_filtered, 3)
    
    #plot_gmm_fit(ind_dat_filtered, gmm, title="Allele Balance Distribution")

#print(allele_balance_array)
#print(allele_balance_array[:,2])
#allele_balance_df = pd.DataFrame(allele_balance_array, columns=[f"Sample_{i}" for i in range(1, 11)])
#allele_balance_df['Sample_ID'] = [f"Sample_{i}" for i in range(1, 101)]
#allele_balance_df.set_index('Sample_ID', inplace=True)
#allele_balance_df = allele_balance_df.transpose()
#print(allele_balance_df)
#allele_balance_df = allele_balance_df.reset_index()
#allele_balance_df.rename(columns={'index': 'Sample_ID'}, inplace=True)

0
[0.25821231 0.30172295 0.23200198 0.29394135 0.21472745 0.32247892
 0.2849275  0.20660196 0.22767541 0.27185435 0.18857985 0.18129039
 0.19257389 0.33281773 0.34008468 0.25019763 0.22274705 0.25178482
 0.24413654 0.26029173 0.33902724 0.29094962 0.28267677 0.22850129
 0.23284266 0.22946836 0.23489306 0.28680289 0.32755954 0.23569595
 0.31756162 0.21015094 0.29145329 0.2789894  0.29074686 0.20574142
 0.18003506 0.19918101 0.35782841 0.26085658 0.19307014 0.28677103
 0.20727517 0.21902598 0.20245988 0.25986811 0.21292731 0.2905099
 0.33710651 0.18783341 0.25711817 0.1487615  0.23424912 0.32314208
 0.18832677 0.24289654 0.25341691 0.23948789 0.28848627 0.28789422
 0.27865951 0.23201972 0.27019054 0.13135476 0.15460793 0.25859746
 0.35336115 0.24119133 0.28740981 0.25826318 0.25174257 0.36044485
 0.29174526 0.25755114 0.1900859  0.23193826 0.28825481 0.24293227
 0.28120394 0.30597476 0.30340595 0.21712643 0.25794815 0.31237413
 0.26151397 0.26288547 0.28484754 0.29991491 0.22416667 0.225