<a href="https://colab.research.google.com/github/AliHeydari1385/Unified-Quantum-Gravity-Particle-Framework/blob/main/Untitled12.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Step 0: Install packages with force-reinstall to avoid version conflicts.
# numpy and pandas are pinned to exact versions; scipy, matplotlib, emcee, corner, camb,
# getdist, zipfile36 and requests are unpinned, so pip picks whatever is current
# (the captured log below shows scipy resolving to 1.16.2).
# NOTE(review): the analysis cell below imports only the standard-library `zipfile`;
# zipfile36 is presumably unnecessary — confirm before removing.
!pip install --force-reinstall numpy==2.0.1 scipy pandas==2.2.2 matplotlib emcee corner camb getdist zipfile36 requests



Collecting numpy==2.0.1
  Using cached numpy-2.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
Collecting scipy
  Using cached scipy-1.16.2-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (62 kB)
Collecting pandas==2.2.2
  Using cached pandas-2.2.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Collecting matplotlib
  Using cached matplotlib-3.10.6-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (11 kB)
Collecting emcee
  Using cached emcee-3.1.6-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting corner
  Using cached corner-2.2.3-py3-none-any.whl.metadata (2.2 kB)
Collecting camb
  Using cached camb-1.6.4-py3-none-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (3.0 kB)
Collecting getdist
  Using cached getdist-1.7.2-py3-none-any.whl.metadata (13 kB)
Collecting zipfile36
  Using cached zipfile36-0.1.3-py3-none-any.whl.metadata (736 bytes)
Collecting requests
  Using cached requ

Imports successful! Proceeding...


In [None]:
# Step 1: Manual upload and extract ZIP files
# NOTE(review): `files`, `os` and `zipfile` are not imported in this cell; presumably an
# earlier cell provides them (e.g. `from google.colab import files`) — confirm.
print("Please upload actlite_3yr_v2p2.zip and Reference.zip")
uploaded = files.upload()  # Upload the ZIP files here (button should appear)

# Extract uploaded ZIPs
# Each archive is unpacked into its own directory, /content/<name>_extract.
for filename in uploaded.keys():
    if filename.endswith('.zip'):
        # <name> is the text before the FIRST dot of the uploaded file name.
        extract_to = f'/content/{filename.split(".")[0]}_extract'
        os.makedirs(extract_to, exist_ok=True)
        with zipfile.ZipFile(filename, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"Extracted {filename} to {extract_to}")


Imports successful! Proceeding...
Please upload actlite_3yr_v2p2.zip and Reference.zip


Saving actlite_3yr_v2p2.zip to actlite_3yr_v2p2.zip
Saving Reference.zip to Reference.zip
Extracted actlite_3yr_v2p2.zip to /content/actlite_3yr_v2p2_extract
Extracted Reference.zip to /content/Reference_extract


In [None]:
# UQGPF v3.1: Enhanced Unified Quantum Gravity Phenomenology Framework
# Author: Grok-4 (incorporating user feedback and file handling)
# Key Features:
# - Reads two ZIP files: actlite_3yr_v2p2.zip and Reference.zip
# - Handles nested directory structures within ZIPs using recursive glob.
# - Checks that the required libraries (emcee, corner, getdist, pandas, scipy, matplotlib) can be imported and pip-installs any missing one via subprocess.
# - Processes ACT+SPT CMB data, Pantheon+SH0ES SNIa data, and BAO data (placeholder if not found).
# - Implements improved physical model (UQGPF v3.0) with curvature, axion, string, neutrino terms.
# - Includes sigma clipping for SNIa data.
# - Performs MCMC analysis using emcee.
# - Generates corner plots, residuals, fit plots, and getdist analysis.
# - Creates a final ZIP archive 'UQGPF_Project_v3.1.zip' with all outputs.
# - Designed for execution in a new Google Colab notebook.

import glob
import os
import subprocess
import sys
import time
import warnings
import zipfile

import corner
import emcee
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from getdist import MCSamples, plots
from scipy.integrate import quad
from scipy.linalg import cholesky, solve_triangular
from scipy.optimize import minimize

# `simps` was removed from scipy.integrate in SciPy 1.14 in favour of `simpson`.
# Import whichever exists and keep the legacy name bound for code that still uses it.
try:
    from scipy.integrate import simpson
except ImportError:  # very old SciPy without `simpson`
    from scipy.integrate import simps as simpson
simps = simpson

# --- Installation of necessary libraries ---
def install_packages():
    """Make sure the third-party packages used by this script are importable.

    Every package in the list is imported by name. When the import raises
    ImportError, the package is installed by running ``pip install`` through
    the current interpreter (``sys.executable -m pip``).

    Raises:
        subprocess.CalledProcessError: if a ``pip install`` command exits
            with a non-zero status.
    """
    required = ['emcee', 'corner', 'getdist', 'pandas', 'matplotlib', 'scipy']
    for name in required:
        try:
            __import__(name)
        except ImportError:
            print(f"Installing {name}...")
            pip_command = [sys.executable, "-m", "pip", "install", name]
            subprocess.check_call(pip_command)
            print(f"Successfully installed {name}.")
        else:
            print(f"{name} is already installed.")

# Run installation
# NOTE(review): the third-party imports at the top of this cell execute before this call,
# so a missing package raises ImportError there first — confirm the intended ordering.
install_packages()

# --- Constants ---
c = 299792.458  # Speed of light, km/s
H0_ref = 70.0  # km/s/Mpc; not referenced by the code visible in this cell
l_p = 1.616e-35  # Planck length in meters (for scaling)
r_d = 147.78  # Sound horizon (Mpc); divides the BAO model distance in log_likelihood

# --- Data Loading and Preprocessing ---
def extract_and_find_files(zip_filename_pattern, extract_subdir, file_patterns):
    """Locate a ZIP archive, extract it once, and find files inside the extraction.

    Args:
        zip_filename_pattern: Path prefix of the archive; ``*.zip`` is appended
            to it to build the glob pattern.
        extract_subdir: Directory to extract into. Extraction is skipped when
            this directory already exists.
        file_patterns: Glob patterns (typically file names) searched
            recursively below ``extract_subdir``.

    Returns:
        A list with one path per entry of ``file_patterns``, in the same order.

    Raises:
        FileNotFoundError: if no archive matches, or if any pattern matches
            no file in the extraction directory.
    """
    # glob returns matches in arbitrary (filesystem) order; sort so that the
    # chosen archive is the same on every run when several files match.
    zip_files = sorted(glob.glob(f'{zip_filename_pattern}*.zip'))
    if not zip_files:
        raise FileNotFoundError(f"Could not find zip file matching pattern: {zip_filename_pattern}")
    zip_path = zip_files[0]
    print(f"Found zip file: {zip_path}")

    if not os.path.exists(extract_subdir):
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_subdir)
        print(f"Successfully extracted {zip_path} to {extract_subdir}")

    found_files = []
    for pattern in file_patterns:
        # '**' with recursive=True also matches files directly in extract_subdir.
        matches = sorted(glob.glob(os.path.join(extract_subdir, '**', pattern), recursive=True))
        if not matches:
            raise FileNotFoundError(f"Could not find file matching pattern: {pattern} in {extract_subdir}")
        found_files.append(matches[0])  # Deterministic: lexicographically first match
        print(f"Found file: {matches[0]}")
    return found_files

# Extract and load CMB data from actlite_3yr_v2p2.zip
# The two patterns are looked up in order, so the returned list unpacks as
# (band-power file, covariance file).
cmb_zip_pattern = 'actlite_3yr_v2p2'
cmb_extract_dir = 'actlite_3yr_v2p2_extract'
cmb_patterns = ['ACT+SPT_cl.dat', 'ACT+SPT_cov.dat']
cmb_cl_path, cmb_cov_path = extract_and_find_files(cmb_zip_pattern, cmb_extract_dir, cmb_patterns)

# Extract and load SNIa data from Reference.zip
# NOTE(review): the upload cell extracts to '/content/Reference_extract' (capital R), while
# this directory name is lower-case, so on a case-sensitive filesystem the archive is
# presumably extracted a second time here — confirm that is acceptable.
ref_zip_pattern = 'Reference'
ref_extract_dir = 'reference_extract'
snia_patterns = ['ES_AND_COVARPantheon%2BSH0ES.dat.txt']
snia_path = extract_and_find_files(ref_zip_pattern, ref_extract_dir, snia_patterns)[0]

# Look for BAO data in Reference.zip (adjust pattern if needed; use placeholder if not found)
# bao_data_path stays None when nothing matches; load_bao_data then returns its placeholder values.
bao_data_path = None
bao_files = glob.glob(os.path.join(ref_extract_dir, '**', '*bao*.csv'), recursive=True)  # Example pattern; adjust if real file exists
if bao_files:
    bao_data_path = bao_files[0]
    print(f"Found BAO data file: {bao_data_path}")

# Loading functions with error handling
def load_act_spt_data(cl_path, cov_path):
    """Load CMB band powers and build the inverse covariance matrix.

    Args:
        cl_path: Whitespace-delimited text file with columns
            ``ell``, ``C_ell`` and, optionally, a per-bin error.
        cov_path: Text file holding the covariance, either as a full square
            matrix or as a 1-D list that is used as the diagonal.

    Returns:
        Tuple ``(ell, cl_tt, inv_cov, cl_err)``. ``inv_cov`` is the inverse of
        the symmetrised covariance, or ``diag(1 / cl_err**2)`` when the
        covariance cannot be loaded, does not match the number of band powers,
        or is not positive definite (a warning is issued in that case).

    Raises:
        IOError: if the band-power file (or, when it has no error column, the
            covariance used to derive the errors) cannot be read.
    """
    try:
        # ndmin=2 keeps the column indexing valid for a single-row file.
        cl_data = np.loadtxt(cl_path, ndmin=2)
        ell = cl_data[:, 0]
        cl_tt = cl_data[:, 1]
        if cl_data.shape[1] > 2:
            cl_err = cl_data[:, 2]
        else:
            # No error column: fall back to sqrt of the covariance diagonal.
            # A 1-D covariance file already IS the diagonal; np.diag on it
            # would build a matrix instead of extracting one.
            fallback_cov = np.loadtxt(cov_path)
            fallback_var = fallback_cov if fallback_cov.ndim == 1 else np.diag(fallback_cov)
            cl_err = np.sqrt(fallback_var)
    except Exception as e:
        raise IOError(f"Error loading CMB cl data from {cl_path}: {e}") from e

    n_bins = len(ell)
    try:
        cov = np.loadtxt(cov_path)
        if cov.ndim == 1:
            cov = np.diag(cov)
        if cov.shape != (n_bins, n_bins):
            # A mismatched matrix would only fail later, inside the likelihood.
            raise ValueError(f"covariance shape {cov.shape} does not match {n_bins} band powers")
        cov = (cov + cov.T) / 2  # Symmetrize
        chol_cov = cholesky(cov, lower=True)
        # cov = L L^T  =>  cov^-1 = (L^-1)^T (L^-1); solve for L^-1 once.
        chol_inv = solve_triangular(chol_cov, np.eye(n_bins), lower=True)
        inv_cov = chol_inv.T @ chol_inv
    except Exception as e:
        warnings.warn(f"Error loading/processing cov from {cov_path}: {e}. Using diagonal approximation.")
        inv_cov = np.diag(1 / cl_err**2)

    return ell, cl_tt, inv_cov, cl_err

def load_snia_data(snia_path):
    """Load redshift, distance modulus and its error from a whitespace-delimited table.

    Columns are taken by name (``zHD``, ``MU_SH0ES``, ``MU_SH0ES_ERR_DIAG``)
    when present, otherwise by position (columns 2, 10 and 11). Malformed
    lines are skipped.

    Args:
        snia_path: Path of the supernova data file.

    Returns:
        Tuple ``(z, mu, mu_err)`` of NumPy arrays.

    Raises:
        IOError: if the file cannot be read or the expected columns are absent.
    """
    try:
        # sep=r'\s+' replaces delim_whitespace=True, which pandas 2.2 deprecates.
        snia_df = pd.read_csv(snia_path, sep=r'\s+', on_bad_lines='skip')
        z = snia_df['zHD'].values if 'zHD' in snia_df.columns else snia_df.iloc[:, 2].values  # Flexible column access
        mu = snia_df['MU_SH0ES'].values if 'MU_SH0ES' in snia_df.columns else snia_df.iloc[:, 10].values
        mu_err = snia_df['MU_SH0ES_ERR_DIAG'].values if 'MU_SH0ES_ERR_DIAG' in snia_df.columns else snia_df.iloc[:, 11].values
        return z, mu, mu_err
    except Exception as e:
        raise IOError(f"Error loading SNIa data from {snia_path}: {e}") from e

def load_bao_data(bao_path):
    """Return BAO redshifts, observed values and errors, or built-in placeholders.

    When ``bao_path`` is truthy and names an existing file, the CSV is read
    and its ``z``, ``DM_RD`` and ``DM_RD_ERR`` columns are returned. If the
    path is missing, or reading/column access fails (a warning is issued in
    the latter case), a fixed four-point placeholder data set is returned.

    Args:
        bao_path: Path of the BAO CSV file, or ``None``.

    Returns:
        Tuple ``(z_bao, dm_rd_obs, dm_rd_err)`` of NumPy arrays.
    """
    file_available = bool(bao_path) and os.path.exists(bao_path)
    if file_available:
        try:
            table = pd.read_csv(bao_path)
            redshift = table['z'].values
            observed = table['DM_RD'].values
            uncertainty = table['DM_RD_ERR'].values
        except Exception as e:
            warnings.warn(f"Error loading BAO data: {e}. Using placeholder.")
        else:
            return redshift, observed, uncertainty

    # Placeholder BAO data
    print("Using placeholder BAO data.")
    placeholder_z = np.array([0.38, 0.51, 0.61, 2.33])
    placeholder_obs = np.array([10.2, 13.4, 16.1, 37.5])
    placeholder_err = np.array([0.4, 0.5, 0.6, 1.2])
    return placeholder_z, placeholder_obs, placeholder_err

# Load all data (module-level arrays used by the likelihood and by run_mcmc)
ell, cl_obs, inv_cov, cl_err = load_act_spt_data(cmb_cl_path, cmb_cov_path)
z_snia_full, mu_obs_full, mu_err_full = load_snia_data(snia_path)
z_bao, dm_rd_obs, dm_rd_err = load_bao_data(bao_data_path)

# Subsample option for SNIa
use_subsample = True  # Set to False for full 1701 points (longer runtime)
if use_subsample:
    # Cap the request at the number of rows actually loaded:
    # np.random.choice(..., replace=False) raises ValueError when asked for
    # more draws than there are elements (e.g. after bad lines were skipped).
    n_snia_subsample = min(500, len(z_snia_full))
    # NOTE: the draw uses the unseeded global RNG, so the subsample differs between runs.
    indices = np.random.choice(len(z_snia_full), n_snia_subsample, replace=False)
    z_snia = z_snia_full[indices]
    mu_obs = mu_obs_full[indices]
    mu_err = mu_err_full[indices]
    print(f"Using {n_snia_subsample} subsampled SNIa points.")
else:
    z_snia, mu_obs, mu_err = z_snia_full, mu_obs_full, mu_err_full
    print(f"Using full {len(z_snia_full)} SNIa points.")

# Data-set sizes (before any sigma clipping performed later in run_mcmc)
N_cmb = len(ell)
N_snia = len(z_snia)
N_bao = len(z_bao)

# --- UQGPF Model Functions ---
def uqgpf_cmb_cl(ell, theta, scale_factor=1.0):
    """Evaluate the UQGPF model spectrum at the multipoles ``ell``.

    The result is ``scale_factor`` times the sum of four contributions
    (matter, axion damping, string + neutrino, curvature).

    Args:
        ell: Multipole value(s); scalar or NumPy array.
        theta: Sequence of exactly nine parameters in the order
            ``(Omega_m, h, gamma, f_a, m_a, lambda_s, k, ell_damp, nu_cross)``.
        scale_factor: Overall multiplicative factor applied to the sum.

    Returns:
        Model value(s) with the same shape as ``ell``.
    """
    # h belongs to the parameter vector but does not enter this expression.
    Omega_m, _h, gamma, f_a, m_a, lambda_s, k, ell_damp, nu_cross = theta

    matter_part = (ell * (ell + 1) / (2 * np.pi)) * (Omega_m ** gamma)
    # Axion damping
    axion_part = (m_a / f_a) * np.sin(ell * np.pi / 180)**2 * np.exp(-ell / ell_damp)
    # String + neutrino
    string_nu_part = lambda_s * (l_p**2 / ell**2) * np.cos(ell / 10) * (1 + nu_cross * 1e-38)
    # Curvature term
    curvature_part = k * (ell / 1000)**2 * Omega_m

    summed = matter_part + axion_part + string_nu_part + curvature_part
    return scale_factor * summed

def hubble_z(z, theta):
    """Return the model expansion rate H(z) = H0 * sqrt(sum of density-like terms).

    Args:
        z: Redshift(s); scalar or NumPy array.
        theta: Sequence of exactly nine parameters in the order
            ``(Omega_m, h, gamma, f_a, m_a, lambda_s, k, ell_damp, nu_cross)``;
            only ``Omega_m``, ``h``, ``gamma``, ``k`` and ``nu_cross`` are used.

    Returns:
        H(z) with ``H0 = 100 * h``; NaN wherever the summed terms are negative.
    """
    Omega_m, h, gamma, _f_a, _m_a, _lambda_s, k, _ell_damp, nu_cross = theta
    H0 = h * 100
    one_plus_z = 1 + z

    e_squared = (
        Omega_m * one_plus_z**3                   # matter
        + (1 - Omega_m)                           # constant (Lambda-like) term
        + gamma * l_p**2 * one_plus_z**(-2)       # quantum-gravity term
        + k * (c / H0)**2 * one_plus_z**2         # curvature term
        + nu_cross * one_plus_z**0.5              # neutrino term
    )
    return H0 * np.sqrt(e_squared)

def uqgpf_mu(z, theta):
    """Return the model distance modulus at redshift(s) ``z``.

    Computes ``d_L = c (1 + z) / H0 * integral_0^z dz' / (H(z') / H0)`` with
    Simpson's rule on 100 equally spaced points per redshift, then
    ``mu = 5 log10(d_L) + 25``.

    Args:
        z: Redshift(s); array-like (a scalar is also accepted).
        theta: Parameter vector; ``theta[1]`` is ``h`` and the whole vector is
            forwarded to ``hubble_z``.

    Returns:
        NumPy array of distance moduli, one per redshift.
    """
    H0 = theta[1] * 100
    z = np.asarray(z, dtype=float)

    # One integration grid per redshift, built in a single call: shape (..., 100).
    # This replaces a Python-level loop that ran for every likelihood evaluation.
    grid = np.linspace(0.0, z, 100, axis=-1)
    inv_e = 1 / (hubble_z(grid, theta) / H0)

    # `simps` no longer exists in SciPy >= 1.14; `simpson` is its replacement.
    # The sample points are passed by keyword, which every SciPy version accepts.
    integral = simpson(inv_e, x=grid, axis=-1)

    dl = c * (1 + z) / H0 * integral
    return 5 * np.log10(dl) + 25

# Sigma clipping for SNIa
def sigma_clip_snia(z, mu_obs, mu_err, theta, sigma_threshold=3.0):
    """Drop supernovae whose residual against the model exceeds a sigma threshold.

    Args:
        z: Redshifts (NumPy array).
        mu_obs: Observed distance moduli (NumPy array).
        mu_err: Uncertainties on ``mu_obs`` (NumPy array).
        theta: Parameter vector at which ``uqgpf_mu`` is evaluated.
        sigma_threshold: Points with ``|mu_obs - mu_model| / mu_err`` at or
            above this value are removed.

    Returns:
        Tuple ``(z, mu_obs, mu_err)`` restricted to the retained points.
    """
    predicted = uqgpf_mu(z, theta)
    pull = np.abs(mu_obs - predicted) / mu_err
    keep = pull < sigma_threshold
    return tuple(column[keep] for column in (z, mu_obs, mu_err))

# Log likelihood with CMB + SNIa + BAO
def log_likelihood(theta, ell, cl_obs, inv_cov, z_snia, mu_obs_snia, mu_err_snia, z_bao, dm_rd_obs, dm_rd_err, scale_factor=1.0):
    """Gaussian log-likelihood of the CMB, SNIa and BAO data for parameters ``theta``.

    Args:
        theta: Nine-element parameter vector (see ``log_prior`` for the order).
        ell, cl_obs, inv_cov: CMB multipoles, observed spectrum and inverse covariance.
        z_snia, mu_obs_snia, mu_err_snia: Supernova redshifts, distance moduli and errors.
        z_bao, dm_rd_obs, dm_rd_err: BAO redshifts, observed D_M / r_d and errors.
        scale_factor: Forwarded to ``uqgpf_cmb_cl``.

    Returns:
        Tuple ``(log_like, norm_chi2)`` where ``log_like = -0.5 * chi2_total``
        and ``norm_chi2`` is chi2 per degree of freedom (NaN when the number
        of degrees of freedom is not positive).
    """
    cl_model = uqgpf_cmb_cl(ell, theta, scale_factor)
    delta_cl = cl_obs - cl_model
    chi2_cmb = np.dot(delta_cl, np.dot(inv_cov, delta_cl))

    mu_model = uqgpf_mu(z_snia, theta)
    chi2_snia = np.sum(((mu_obs_snia - mu_model) / mu_err_snia)**2)

    # BAO chi2
    # Comoving distance D_M = c * integral_0^z dz' / H(z')  [Mpc]. The factor
    # 1/H0 must be included (as it is in uqgpf_mu); integrating 1/H directly
    # does that. Dividing by r_d gives the dimensionless D_M / r_d.
    def inverse_hubble(zz):
        return 1 / hubble_z(zz, theta)

    dm_model = np.array([c * quad(inverse_hubble, 0, zb)[0] for zb in z_bao]) / r_d
    chi2_bao = np.sum(((dm_rd_obs - dm_model) / dm_rd_err)**2)

    total_chi2 = chi2_cmb + chi2_snia + chi2_bao

    # Count the points actually passed in: the supernova sample may have been
    # sigma-clipped by the caller, so module-level sizes can be stale.
    n_points = len(ell) + len(z_snia) + len(z_bao)
    dof = n_points - len(theta)
    norm_chi2 = total_chi2 / dof if dof > 0 else np.nan  # Normalized
    return -0.5 * total_chi2, norm_chi2

# Log prior (wider ranges)
def log_prior(theta):
    """Flat (top-hat) log-prior over the nine model parameters.

    Args:
        theta: Sequence of exactly nine values in the order
            ``(Omega_m, h, gamma, f_a, m_a, lambda_s, k, ell_damp, nu_cross)``.

    Returns:
        ``0.0`` when every parameter lies strictly inside its allowed
        interval, ``-np.inf`` otherwise.
    """
    Omega_m, h, gamma, f_a, m_a, lambda_s, k, ell_damp, nu_cross = theta

    # (lower bound, value, upper bound); both bounds are exclusive.
    limits = (
        (0.1, Omega_m, 0.5),
        (0.5, h, 0.9),
        (0.1, gamma, 0.4),
        (1e10, f_a, 1e12),
        (1e-10, m_a, 1e-5),
        (0.1, lambda_s, 10),
        (-1.5, k, 1.5),
        (1000, ell_damp, 5000),
        (1e-40, nu_cross, 1e-36),
    )
    inside = all(low < value < high for low, value, high in limits)
    return 0.0 if inside else -np.inf

# Log probability
def log_probability(theta, *args):
    """Log-posterior (prior + likelihood) in the form expected by emcee.

    Args:
        theta: Parameter vector.
        *args: Forwarded unchanged to ``log_likelihood``.

    Returns:
        ``log_prior + log_likelihood``, or ``-np.inf`` when the prior excludes
        ``theta`` or the likelihood is not finite.
    """
    lp = log_prior(theta)
    if not np.isfinite(lp):
        return -np.inf
    ll, _ = log_likelihood(theta, *args)
    # The model can produce NaN (e.g. a negative argument of the square root
    # in hubble_z). emcee aborts on a NaN log-probability, so reject the point.
    if not np.isfinite(ll):
        return -np.inf
    return lp + ll

# Main MCMC run
def run_mcmc(subsample=True, nwalkers=64, nsteps=100000, discard=500, thin=10, scale_factor=1.0):
    """Run the emcee sampler on the module-level data and write all outputs.

    Outputs written to the working directory: a corner plot, CMB and SNIa
    residual tables, a getdist triangle plot, the flattened chain, and a ZIP
    archive ``UQGPF_Project_v3.1.zip`` bundling them.

    Args:
        subsample: Accepted for interface compatibility; not used here (the
            subsampling is done at module level before this function runs).
        nwalkers: Number of ensemble walkers.
        nsteps: Number of MCMC steps requested.
        discard: Burn-in steps dropped from the start of the chain.
        thin: Thinning factor applied to the chain.
        scale_factor: Forwarded to the CMB model through the likelihood.

    Returns:
        Tuple ``(samples, best_theta)``: the flattened chain and the
        per-parameter median of that chain.

    Raises:
        RuntimeError: if no samples remain after burn-in and thinning.
    """
    param_names = ["Omega_m", "h", "gamma", "f_a", "m_a", "lambda_s", "k", "ell_damp", "nu_cross"]
    # Raw strings: in a normal string "\nu" starts with a newline and the
    # other backslash sequences are invalid escapes.
    param_labels = [r"$\Omega_m$", r"$h$", r"$\gamma$", r"$f_a$", r"$m_a$",
                    r"$\lambda_s$", r"$k$", r"$\ell_{damp}$", r"$\nu_{cross}$"]
    ndim = len(param_names)

    # Initial guess. The parameters span ~50 orders of magnitude, so the
    # scatter must be RELATIVE: adding U(0, 0.1) directly would move m_a and
    # nu_cross far outside the prior and every walker would start at -inf.
    # k is centred on zero, so it keeps an absolute U(0, 0.1) scatter.
    center = np.array([0.3, 0.7, 0.25, 1e11, 1e-8, 1.0, 0.0, 3000, 1e-38])
    jitter = np.random.rand(nwalkers, ndim) * 0.1
    p0 = np.where(center != 0, center * (1 + jitter), jitter)

    # Apply sigma clipping on initial theta
    initial_theta = p0[0]
    z_snia_clip, mu_obs_clip, mu_err_clip = sigma_clip_snia(z_snia, mu_obs, mu_err, initial_theta)

    args = (ell, cl_obs, inv_cov, z_snia_clip, mu_obs_clip, mu_err_clip, z_bao, dm_rd_obs, dm_rd_err, scale_factor)

    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_probability, args=args)
    try:
        sampler.run_mcmc(p0, nsteps, progress=True)
    except KeyboardInterrupt:
        pass  # Keep whatever was sampled so far and continue to the outputs

    # After an interrupt fewer than `discard` steps may exist; shrink the
    # burn-in instead of ending up with an empty chain.
    steps_done = sampler.iteration
    burn_in = discard
    if steps_done <= discard:
        burn_in = steps_done // 2
        warnings.warn(f"Only {steps_done} steps available; using burn-in of {burn_in} instead of {discard}.")
    samples = sampler.get_chain(discard=burn_in, thin=thin, flat=True)
    if samples.shape[0] == 0:
        raise RuntimeError("No MCMC samples available after burn-in and thinning.")

    # Posteriors plot
    fig = corner.corner(samples, labels=param_labels)
    fig.savefig('uqgpf_posteriors_v3.1.pdf')
    plt.close(fig)  # Release the figure; it is large for nine parameters

    # Residuals
    best_theta = np.median(samples, axis=0)
    cl_model = uqgpf_cmb_cl(ell, best_theta)
    residuals_cmb = cl_obs - cl_model
    pd.DataFrame({'ell': ell, 'residual': residuals_cmb}).to_csv('residuals_cmb_v3.1.csv', index=False)

    mu_model = uqgpf_mu(z_snia_clip, best_theta)
    residuals_snia = mu_obs_clip - mu_model
    pd.DataFrame({'z': z_snia_clip, 'residual': residuals_snia}).to_csv('residuals_snia_v3.1.csv', index=False)

    # Getdist analysis
    gdsamples = MCSamples(samples=samples, names=param_names)
    g = plots.get_subplot_plotter()
    g.triangle_plot(gdsamples, filled=True)
    g.export('uqgpf_getdist_v3.1.pdf')

    # Save chains and ZIP outputs
    np.save('mcmc_samples_v3.1.npy', samples)
    output_files = [
        'uqgpf_posteriors_v3.1.pdf',
        'residuals_cmb_v3.1.csv',
        'residuals_snia_v3.1.csv',
        'uqgpf_getdist_v3.1.pdf',
        'mcmc_samples_v3.1.npy',
    ]
    with zipfile.ZipFile('UQGPF_Project_v3.1.zip', 'w') as zipf:
        for output_name in output_files:
            zipf.write(output_name)

    return samples, best_theta

# Run the code
# Entry point: run the full MCMC with default settings and report the
# per-parameter medians together with the wall-clock time.
if __name__ == "__main__":
    start_time = time.time()
    # `subsample` is passed through, but run_mcmc does not read it; the
    # subsampling already happened at module level.
    samples, best_theta = run_mcmc(subsample=use_subsample, scale_factor=1.0)  # Adjust scale if needed
    print(f"Best parameters: {best_theta}")
    print(f"Execution time: {time.time() - start_time} seconds")
