In [10]:
import numpy as np
from typing import Optional, Union, Tuple

def calculate_G_matrix(M: np.ndarray, 
                      method: str = "vanraden1", 
                      missing_value: Optional[Union[int, float]] = None,
                      min_maf: float = 0.01) -> np.ndarray:
    """
    Calculate the genomic relationship matrix G using VanRaden (2008) methods.
    
    Parameters:
    -----------
    M : numpy.ndarray
        Genotype matrix with dimensions (n, m), 
        where n is the number of individuals, m is the number of markers.
        M_ij is the genotype of individual i at marker j (0, 1, 2).
    method : str, optional
        Method to calculate G matrix. Options:
        - "vanraden1": G = ZZ'/total_variance (default)
        - "vanraden2": G = (M-P)(M-P)'/sum(2pq)
    missing_value : int or float, optional
        Value used to represent missing genotypes. If provided, will be replaced with mean.
    min_maf : float, optional
        Minimum minor allele frequency threshold. Markers with MAF < min_maf will be excluded.
        Default is 0.01 (1%).
    
    Returns:
    --------
    G : numpy.ndarray
        Genomic relationship matrix G with dimensions (n, n).
    
    References:
    -----------
    VanRaden, P. M. (2008). Efficient methods to compute genomic predictions.
    Journal of Dairy Science, 91(11), 4414-4423.
    """
    # Get the dimensions of matrix M
    n, m = M.shape  # n: number of individuals, m: number of markers
    
    # Handle missing values if specified
    M_clean = M.copy()
    if missing_value is not None:
        mask = M_clean == missing_value
        if np.any(mask):
            for j in range(m):
                col_mask = mask[:, j]
                if np.any(col_mask):
                    # Replace missing with mean of non-missing values
                    non_missing = M_clean[~col_mask, j]
                    if len(non_missing) > 0:
                        M_clean[col_mask, j] = np.mean(non_missing)
    
    # Step 1: Calculate allele frequency p_j for each marker
    p = np.sum(M_clean, axis=0) / (2 * n)  # Minor allele frequency
    
    # Filter markers by MAF
    valid_markers = np.where((p >= min_maf) & (p <= 1 - min_maf))[0]
    if len(valid_markers) < m:
        print(f"Filtered out {m - len(valid_markers)} markers with MAF < {min_maf}")
        if len(valid_markers) == 0:
            raise ValueError(f"No markers left after MAF filtering. Consider lowering min_maf.")
        M_clean = M_clean[:, valid_markers]
        p = p[valid_markers]
        m = len(valid_markers)
    
    # Step 2: Create matrix P with P_ij = 2 * p_j
    P = np.tile(2 * p, (n, 1))  # Create n x m matrix from vector 2*p
    
    # Step 3: Calculate matrix Z = M - P
    Z = M_clean - P
    
    # Step 4: Calculate denominator (total variance)
    total_variance = np.sum(2 * p * (1 - p))
    
    # Method implementation
    if method.lower() == "vanraden1":
        
        # Step 5: Normalize Z
        std_dev = np.sqrt(2 * p * (1 - p))
        # Avoid division by zero (for very low variance markers)
        std_dev[std_dev < 1e-10] = 1e-10
        W = Z / std_dev
        
        # Calculate G matrix
        G = np.dot(W, W.T) / total_variance
        
    elif method.lower() == "vanraden2":
        # Alternative method directly using Z
        G = np.dot(Z, Z.T) / total_variance
        
    else:
        raise ValueError(f"Unknown method: {method}. Use 'vanraden1' or 'vanraden2'.")
    
    return G

# Example usage
if __name__ == "__main__":
    # Sample genotype matrix (3 individuals, 4 markers)
    M_sample = np.array([
        [0, 1, 2, 1],
        [1, 2, 1, 0],
        [2, 1, 0, 1]
    ])
    G = calculate_G_matrix(M_sample)
    print("G matrix:")
    print(G)
    
    # With missing values (coded as -9)
    M_missing = np.array([
        [0, 1, 2, -9],
        [1, -9, 1, 0],
        [2, 1, -9, 1]
    ])
    G_imputed = calculate_G_matrix(M_missing, missing_value=-9, method="vanraden1")
    print("\nG matrix with imputed missing values:")
    print(G_imputed)

G matrix:
[[ 2.38235294 -0.52941176 -1.85294118]
 [-0.52941176  1.05882353 -0.52941176]
 [-1.85294118 -0.52941176  2.38235294]]

G matrix with imputed missing values:
[[ 1.97419355 -0.05806452 -1.91612903]
 [-0.05806452  0.37741935 -0.31935484]
 [-1.91612903 -0.31935484  2.23548387]]


In [68]:
import numpy as np
from scipy.linalg import inv, pinv

def estimate_variance_components(y: np.ndarray, X: np.ndarray, Z: np.ndarray, G: np.ndarray, 
                                 epsilon=1e-2, max_iter=100, regularization=1e-5
                                 ) -> Tuple[float, float]:
    """
    Estimate the genomic variance (sigma_g^2) and error variance (sigma_e^2) using the REML method with EM algorithm.

    Parameters:
    - y: numpy array, phenotype vector (n x 1)
    - X: numpy array, design matrix for fixed effects (n x p)
    - Z: numpy array, design matrix for random effects (n x q)
    - G: numpy array, genomic relationship matrix (q x q)
    - epsilon: float, convergence threshold
    - max_iter: int, maximum number of iterations

    Returns:
    - sigma_g2: float, estimated genomic variance
    - sigma_e2: float, estimated error variance
    """
    # Get dimensions
    n, p = X.shape  # n: number of observations, p: number of fixed effects
    q = Z.shape[1]  # q: number of random effects
    
    # Initialize variance components with starting values
    var_y = np.var(y)
    sigma_g2 = 0.5 * var_y  # Initial guess for genomic variance
    sigma_e2 = 0.5 * var_y  # Initial guess for error variance
    
    # Precompute inverse of G for efficiency
    G_inv = inv(G)
    
    # Iterative process
    for iteration in range(max_iter):
        print("Iteration", iteration)
        print("sigma_g2:", sigma_g2, "sigma_e2:", sigma_e2)
        # Step 1: Compute the variance matrix V
        V = Z @ (G * sigma_g2) @ Z.T + np.eye(n) * sigma_e2
        
        # Step 2: Compute the inverse of V
        try:
            V_inv = inv(V)
        except np.linalg.LinAlgError:
            print("Warning: V matrix is near-singular. Adding regularization to proceed.")
            # Add regularization to diagonal elements
            V_reg = V + np.eye(n) * regularization
            V_inv = inv(V_reg)
        
        # Step 3: Compute the projection matrix P
        XVX = X.T @ V_inv @ X
        try:
            XVX_inv = inv(XVX)
        except np.linalg.LinAlgError:
            print("Warning: Singular matrix XVX. Using pseudo-inverse instead.")
            # Add regularization to diagonal elements
            XVX_reg = XVX + np.eye(p) * regularization
            XVX_inv = inv(XVX_reg)

        P = V_inv - V_inv @ X @ XVX_inv @ X.T @ V_inv
        
        # Step 4: Update sigma_e^2 using y' P y
        yPy = y.T @ P @ y
        sigma_e2_new = yPy / (n - p)
        
        # Step 5: Compute temporary estimate of random effects (g_hat)
        g_hat = sigma_g2 * Z.T @ P @ y
        
        # Step 6: Compute the covariance matrix C_gg
        C_gg = sigma_g2 * np.eye(q) - sigma_g2**2 * Z.T @ P @ Z
        
        # Step 7: Update sigma_g^2 using g_hat and trace term
        term1 = g_hat.T @ G_inv @ g_hat
        term2 = np.trace(G_inv @ C_gg)
        sigma_g2_new = (term1 + term2) / q
        
        # Step 8: Check convergence
        if abs(sigma_g2_new - sigma_g2) < epsilon and abs(sigma_e2_new - sigma_e2) < epsilon:
            print("EM algorithm converged after", iteration, "iterations!")
            break
        
        # Update variance components for the next iteration
        sigma_g2_new = max(sigma_g2_new, 1e-6)
        sigma_e2_new = max(sigma_e2_new, 1e-6)
        sigma_g2 = sigma_g2_new
        sigma_e2 = sigma_e2_new
    
    return sigma_g2, sigma_e2

In [69]:
def calculate_pev_and_accuracy(sigma_g: float, 
                               sigma_e: float, 
                               C_inv: np.ndarray,
                               n: int   # number of individuals
                               ) -> Tuple[np.ndarray, np.ndarray]:
    """
    Calculate the Prediction Error Variance (PEV) and accuracy for each individual.

    Parameters:
    - sigma_g: float, genomic variance (sigma_g)
    - sigma_e: float, error variance (sigma_e)
    - C_inv: numpy array, inverse of the coefficient matrix C from the mixed model equations

    Returns:
    - PEV: numpy array, prediction error variance for each individual
    - accuracy: numpy array, accuracy for each individual
    """

    # C_inv is the inverse of the coefficient matrix C from the mixed model equations
    # Extract the lower right block of C_inv corresponding to G^-1
    C_gg = C_inv[-n:, -n:]

    # Extract the diagonal elements of C^gg
    diag_C_gg = np.diag(C_gg)

    # Calculate PEV for each individual
    PEV = sigma_g - diag_C_gg * sigma_e

    # Calculate accuracy for each individual
    accuracy = np.sqrt(1 - PEV / sigma_g)

    return PEV, accuracy

In [70]:
def solve_gblup(y: np.ndarray,
                X: np.ndarray, 
                Z: np.ndarray, 
                G: np.ndarray,
                sigma_g: float,
                sigma_e: float, 
                reg_factor: float = 1e-5
                ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Solve the GBLUP equation system to find beta and g using numpy.linalg.solve.
    
    Parameters
    ----------
    X : numpy.ndarray
        Design matrix for fixed effects (n x p).
    Z : numpy.ndarray
        Design matrix for random effects (n x q).
    G : numpy.ndarray
        Genomic relationship matrix (q x q).
    lambda_ : float
        Lambda ratio = sigma_e^2 / sigma_g^2.
    y : numpy.ndarray
        Phenotypes vector (n x 1).
    reg_factor : float, optional
        Regularization factor added to the diagonal for numerical stability.
        Default is 1e-6.
    
    Returns
    -------
    beta : numpy.ndarray
        Estimated fixed effects vector (p x 1).
    g : numpy.ndarray
        Predicted genomic breeding values vector (q x 1).
    
    Notes
    -----
    This function implements the GBLUP method to estimate fixed effects and 
    genomic breeding values by solving a mixed linear model equation system.
    The approach combines fixed and random effects to predict genomic breeding 
    values, crucial in genomic selection studies.
    """
    
    # Step 1: Validate input dimensions
    n = X.shape[0]
    p = X.shape[1]
    q = Z.shape[1]
    
    if X.shape[0] != Z.shape[0]:
        raise ValueError("X and Z must have the same number of rows (samples).")
    if G.shape[0] != q:
        raise ValueError("G must be a square matrix of size q x q.")
    if y.shape[0] != n or y.shape[1] != 1:
        raise ValueError("y must be a column vector of length n.")
    
    # Step 2: Compute matrix components
    XtX = np.dot(X.T, X)
    XtZ = np.dot(X.T, Z)
    ZtX = XtZ.T  # Since XtZ is (p x q), ZtX is (q x p)
    ZtZ = np.dot(Z.T, Z)
    
    # Step 3: Compute inverse of G with regularization
    try:
        # Compute condition number to assess singularity
        cond_G = np.linalg.cond(G)
        if cond_G > 1e10:  # High condition number indicates ill-conditioning
            print("Warning: G matrix is ill-conditioned. Adding regularization.")
            G_reg = G + np.eye(G.shape[0]) * reg_factor
            G_inv = np.linalg.inv(G_reg)
        else:
            G_inv = np.linalg.inv(G)
    except np.linalg.LinAlgError:
        print("G matrix is singular. Adding regularization to proceed.")
        G_reg = G + np.eye(G.shape[0]) * reg_factor
        G_inv = np.linalg.inv(G_reg)
    
    # Step 4: Construct matrix A with regularization
    lambda_ = sigma_e / sigma_g
    A_upper = np.hstack((XtX, XtZ))
    A_lower = np.hstack((ZtX, ZtZ + lambda_ * G_inv))
    A = np.vstack((A_upper, A_lower))
    
    # Add regularization to A for numerical stability
    A_reg = A + np.eye(A.shape[0]) * reg_factor
    
    # Step 5: Construct vector b
    Xty = np.dot(X.T, y)
    Zty = np.dot(Z.T, y)
    b = np.vstack((Xty, Zty))
    
    # Step 6: Solve the equation system A_reg * x = b
    try:
        x = np.linalg.solve(A_reg, b)
    except np.linalg.LinAlgError as e:
        print(f"Error solving the system: {e}")
        raise
    
    # Step 7: Split beta and g
    fixed_effects = p
    beta = x[:fixed_effects]
    g = x[fixed_effects:]

    # Calculate PEV: Predicted Error Variance
    A_inv = np.linalg.inv(A_reg)
    PEV, acc = calculate_pev_and_accuracy(sigma_g, sigma_e, A_inv, n)
    
    return beta, g, PEV, acc

In [None]:
# Example usage
if __name__ == "__main__":
    
    # Create example data with specified dimensions
    np.random.seed(2025)
    n_samples = 1000  # Number of samples/observations
    n_markers = 10  # Number of markers
    
    # X matrix: n samples, 5 fixed effects
    X = np.ones((n_samples, 5))
    # add random values to X with normal distribution mean=0, std=0.5
    X[:, 1:] = np.random.normal(0, 0.5, size=(n_samples, 4))
    
    # Z matrix: samples x markers (random 0, 1, 2 genotypes)
    M = np.random.randint(0, 3, size=(n_samples, n_markers))
    Z = np.eye(n_samples)  # Identity matrix for simplicity
    
    # Calculate genomic relationship matrix G
    G = calculate_G_matrix(M, method='vanraden1')
    print("G matrix:")
    print(G)
    if not np.all(np.linalg.eigvals(G) > 0):
        print("Warning: G matrix is not positive definite. Adding regularization.")
        G = G + np.eye(G.shape[0]) * 1e-4
    
    # Generate random phenotypes
    y = np.random.normal(0, 1, size=(n_samples, 1))

    # Estimate variance components
    sigma_g, sigma_e = estimate_variance_components(y, X, Z, G)
    print(f"Genomic variance (sigma_g^2): {sigma_g:.6f}")
    print(f"Error variance (sigma_e^2): {sigma_e}")
    
    # Solve GBLUP equation system
    beta, g, PEV, acc = solve_gblup(y, X, Z, G, sigma_g, sigma_e)
    print("\nEstimated fixed effects beta:")
    print(beta)
    print("\nEstimated genomic breeding values g:")
    print(g)
    print("\nPrediction Error Variance (PEV):")
    print(PEV)
    print("\nAccuracy:")
    print(acc)

In [43]:
np.all(np.linalg.eigvals(G + np.eye(G.shape[0]) * 1e-05) > 0)

True

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, spearmanr

def analyze_genomic_correlations(y: np.ndarray, g: np.ndarray, X=None, beta=None, plot=True):
    """
    Calculate correlations between genomic breeding values and phenotypes,
    with optional analysis of fixed effects contribution.
    
    Parameters:
    -----------
    y : np.ndarray
        Observed phenotypes (n x 1)
    g : np.ndarray
        Estimated genomic breeding values (n x 1)
    X : np.ndarray, optional
        Design matrix for fixed effects (n x p)
    beta : np.ndarray, optional
        Fixed effects estimates (p x 1)
    plot : bool, optional
        Whether to create correlation plots (default: True)
        
    Returns:
    --------
    dict : Dictionary of correlation statistics
    """
    results = {}
    
    # Calculate direct correlation between g and y
    pearson_corr, pearson_p = pearsonr(g.flatten(), y.flatten())
    spearman_corr, spearman_p = spearmanr(g.flatten(), y.flatten())
    
    results['g_y_pearson'] = (pearson_corr, pearson_p)
    results['g_y_spearman'] = (spearman_corr, spearman_p)
    
    # Calculate fixed effects contribution if provided
    if X is not None and beta is not None:
        fixed_effects = np.dot(X, beta)
        results['fixed_var'] = np.var(fixed_effects)
        results['random_var'] = np.var(g)
        results['total_var'] = np.var(y)
        
        # Calculate proportion of variance explained
        results['fixed_proportion'] = results['fixed_var'] / results['total_var']
        results['random_proportion'] = results['random_var'] / results['total_var']
    
    # Create plots if requested
    if plot:
        # Plot 1: g vs y
        plt.figure(figsize=(12, 5))
        
        plt.subplot(1, 2, 1)
        plt.scatter(g, y, alpha=0.5, edgecolor='k')
        plt.xlabel('Genomic Breeding Values (g)')
        plt.ylabel('Phenotypes (y)')
        plt.title(f'Correlation: {pearson_corr:.3f} (p={pearson_p:.3e})')
        plt.grid(alpha=0.3)
        
        # Add regression line
        m, b = np.polyfit(g.flatten(), y.flatten(), 1)
        x_line = np.linspace(min(g), max(g), 100)
        y_line = m * x_line + b
        plt.plot(x_line, y_line, 'r--', label="regression line")

        # Add y = x line
        # plt.plot([min(y), max(y)], [min(y), max(y)], 'k--', alpha=1, label="y = x line")
        plt.legend()
        
        # Plot 2: Histogram of genomic values
        plt.subplot(1, 2, 2)
        plt.hist(g, bins=50, alpha=0.7, edgecolor='k', density=True)
        plt.xlabel('Genomic Breeding Values (g)')
        plt.ylabel('Frequency')
        plt.title(f'Distribution of Genomic Values\nMean={np.mean(g):.3f}, SD={np.std(g):.3f}')
        plt.tight_layout()
        plt.show()
        
    return results

# Run the analysis
corr_results = analyze_genomic_correlations(y, g, X, beta)

# Print summary
print(f"Correlation between genomic breeding values and phenotypes:")
print(f"Pearson:  r = {corr_results['g_y_pearson'][0]:.4f}, p = {corr_results['g_y_pearson'][1]:.4e}")
print(f"Spearman: ρ = {corr_results['g_y_spearman'][0]:.4f}, p = {corr_results['g_y_spearman'][1]:.4e}")

if 'fixed_proportion' in corr_results:
    print(f"\nVariance proportions:")
    print(f"Fixed effects: {corr_results['fixed_proportion']*100:.2f}%")
    print(f"Genomic values: {corr_results['random_proportion']*100:.2f}%")
    print(f"Unexplained: {(1-corr_results['fixed_proportion']-corr_results['random_proportion'])*100:.2f}%")