# SN Ia секторный анализ (δΛ / δσ²)

Источник данных выбирается автоматически:
1) ваш CSV (`SN_CSV`),
2) **Pantheon+** из локального файла `lcparams.txt` (`SN_PPLUS_PATH`),
3) **JLA** через `sndata.jla` (если установлен пакет `sndata`),
4) синтетика (fallback).

HEALPix разбиение выполняется через `astropy-healpix` (без `healpy`).
Перевод `dpar → Δμ(z)` сделан как плейсхолдер — замените на ваш расчёт из half-dilation уравнения.

## 0. Импорты и проверка окружения

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Optional
import importlib, traceback, os

try:
    from scipy.optimize import minimize
    HAVE_SCIPY = True
except Exception:
    HAVE_SCIPY = False

try:
    from astropy.cosmology import FlatLambdaCDM
    from astropy.coordinates import SkyCoord
    import astropy.units as u
    HAVE_ASTROPY = True
except Exception:
    HAVE_ASTROPY = False

try:
    from astropy_healpix import HEALPix
    HAVE_AHPIX = True
except Exception:
    HAVE_AHPIX = False

try:
    import sndata
    HAVE_SNDATA = True
except Exception:
    HAVE_SNDATA = False

print('scipy:', HAVE_SCIPY, '| astropy:', HAVE_ASTROPY, '| astropy-healpix:', HAVE_AHPIX, '| sndata:', HAVE_SNDATA)

scipy: True | astropy: True | astropy-healpix: True | sndata: True


## 1. Параметры анализа

In [4]:
NSIDE = 2                  # HEALPix разрешение (12*NSIDE^2)
GRID_DIV = (4, 2)          # fallback-сетка (lon x lat), если нет astropy-healpix
Z_MIN = 0.02               # отсечка низких z
USE_SIGMA2 = False         # True → dpar трактуется как δσ², False → как δΛ
KAPPA_HD = 1.0             # (Λ c^2)/3 = κ * σ²
C = 299792.458             # km/s
H0_GUESS = 70.0
OMEGA_M_GUESS = 0.3
M_ABS_GUESS = -19.3

# Пути к данным (заполните любой из них):
SN_CSV = ''                # CSV с колонками: z, ra_deg, dec_deg, mu, mu_err
SN_PPLUS_PATH = ''         # Путь к Pantheon+ lcparams.txt

RNG_SEED = 123
np.random.seed(RNG_SEED)

## 2. Загрузка данных (CSV → Pantheon+ файл → JLA via sndata → синтетика)

In [5]:
def load_sn_from_csv(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)
    required = {'z','ra_deg','dec_deg','mu','mu_err'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f'Missing columns: {missing}')
    return df

def load_pantheonplus_from_file(path: str) -> Optional[pd.DataFrame]:
    """Чтение Pantheon+ lcparams.txt без sndata (гибкий парсер)."""
    if not path or not os.path.exists(path):
        print(f'Pantheon+: файл не найден: {path}')
        return None
    tries = [
        dict(delim_whitespace=True, comment='#', engine='python'),
        dict(sep=r'\s+|\t+|,', engine='python', comment='#', na_values=['NA','NaN'])
    ]
    df = None
    for kw in tries:
        try:
            t = pd.read_csv(path, **kw)
            if len(t)>0:
                df = t; break
        except Exception:
            pass
    if df is None or len(df)==0:
        print('Pantheon+: не удалось прочитать таблицу.')
        return None
    low = {c.lower(): c for c in df.columns}
    def pick(*names):
        for n in names:
            if n in df.columns: return n
            if n.lower() in low: return low[n.lower()]
        return None
    zcol  = pick('z','zhd','zcmb','zhel','z_cmb')
    racol = pick('ra','ra_deg','radeg')
    decol = pick('dec','dec_deg','dcdeg')
    mucol = pick('mu','mu_d','MU')
    muerr = pick('mu_err','sigma_mu','dmu','MUERR')
    if mucol is None:
        print('Pantheon+: колонка μ не найдена — нужен файл с готовым μ.')
        return None
    out = pd.DataFrame()
    out['z'] = df[zcol].astype(float)
    out['ra_deg']  = df[racol].astype(float) if racol else np.nan
    out['dec_deg'] = df[decol].astype(float) if decol else np.nan
    out['mu'] = df[mucol].astype(float)
    out['mu_err'] = df[muerr].astype(float) if (muerr and muerr in df.columns) else 0.15
    return out

def load_jla_via_sndata() -> Optional[pd.DataFrame]:
    try:
        spec = importlib.util.find_spec('sndata')
        if spec is None:
            print('JLA: пакет sndata не установлен.'); return None
        from sndata.jla import JLA
        JLA.download_module_data()
        tbl = JLA.load_table('jla_lcparams.txt')
        cols = {c.lower(): c for c in tbl.columns}
        def pick(*names):
            for n in names:
                if n in tbl.columns: return n
                if n.lower() in cols: return cols[n.lower()]
            return None
        zcol  = pick('zcmb','z','zhel')
        racol = pick('ra','RA')
        decol = pick('dec','DEC')
        mucol = pick('mu','MU')
        muerr = pick('mu_err','dmu','sigma_mu')
        if mucol is None:
            print('JLA: поле μ не найдено — пропуск.'); return None
        df = tbl[[zcol, racol, decol, mucol]].copy()
        df.columns = ['z','ra_deg','dec_deg','mu']
        df['mu_err'] = tbl[muerr].values if (muerr and muerr in tbl.columns) else 0.15
        return df
    except Exception:
        print('JLA: ошибка импорта/загрузки — подробности ниже:')
        traceback.print_exc()
        return None

def get_sn_dataframe() -> pd.DataFrame:
    # 1) CSV
    if SN_CSV:
        print('Загружаю CSV:', SN_CSV)
        return load_sn_from_csv(SN_CSV)
    # 2) Pantheon+ из файла
    if SN_PPLUS_PATH:
        print('Загружаю Pantheon+ из файла:', SN_PPLUS_PATH)
        df = load_pantheonplus_from_file(SN_PPLUS_PATH)
        if df is not None:
            return df
    # 3) JLA через sndata
    df = load_jla_via_sndata()
    if df is not None:
        print('Загружено JLA через sndata.')
        return df
    # 4) Синтетика
    print('Нет реальных данных — создаю синтетический пример…')
    N = 1200
    ra = np.random.uniform(0,360,N)
    dec = np.degrees(np.arcsin(np.random.uniform(-1,1,N)))
    z = np.random.uniform(0.02, 0.8, N)
    mu_true = 5*np.log10(3000*z) + 25
    mu_err = np.full(N, 0.15)
    mu_obs = mu_true + np.random.normal(0, mu_err)
    return pd.DataFrame({'z':z,'ra_deg':ra,'dec_deg':dec,'mu':mu_obs,'mu_err':mu_err})

sn = get_sn_dataframe()
sn = sn[sn['z']>=Z_MIN].reset_index(drop=True)
sn.head()

JLA: ошибка импорта/загрузки — подробности ниже:
Нет реальных данных — создаю синтетический пример…


Traceback (most recent call last):
  File "C:\Users\ATH516\AppData\Local\Temp\ipykernel_29996\1475728735.py", line 56, in load_jla_via_sndata
    from sndata.jla import JLA
ImportError: cannot import name 'JLA' from 'sndata.jla' (C:\Users\ATH516\AppData\Roaming\Python\Python313\site-packages\sndata\jla\__init__.py)


Unnamed: 0,z,ra_deg,dec_deg,mu,mu_err
0,0.529754,250.728907,39.710681,40.855938,0.15
1,0.78315,103.010161,0.630604,41.614473,0.15
2,0.272625,81.666523,68.569606,39.448378,0.15
3,0.122469,198.473317,18.431943,37.956563,0.15
4,0.154575,259.008829,10.747187,38.345845,0.15


## 3. HEALPix разбиение (astropy-healpix) и fallback-сетка

In [None]:
def assign_sector_ahpix(ra_deg, dec_deg, nside: int):
    hpix = HEALPix(nside=nside, order='ring', frame='icrs')
    sc = SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg, frame='icrs')
    return hpix.skycoord_to_healpix(sc)

def assign_sector_grid(ra_deg, dec_deg, nlon: int, nlat: int):
    ra_bin = np.floor(ra_deg / (360/nlon)).astype(int)
    lat_edges = np.linspace(-90, 90, nlat+1)
    lat_bin = np.digitize(dec_deg, lat_edges) - 1
    lat_bin = np.clip(lat_bin, 0, nlat-1)
    return ra_bin + nlon * lat_bin

if HAVE_AHPIX and HAVE_ASTROPY:
    sn['sector'] = assign_sector_ahpix(sn['ra_deg'].values, sn['dec_deg'].values, NSIDE)
    N_SECT = 12 * NSIDE * NSIDE
else:
    sn['sector'] = assign_sector_grid(sn['ra_deg'].values, sn['dec_deg'].values, GRID_DIV[0], GRID_DIV[1])
    N_SECT = GRID_DIV[0]*GRID_DIV[1]

sn['sector'].nunique(), N_SECT

## 4. Космология (μ-модель) и плейсхолдер δΛ→Δμ

In [None]:
def mu_model_flat_LCDM(z, H0, Om):
    if HAVE_ASTROPY:
        cosmo = FlatLambdaCDM(H0=H0, Om0=Om)
        dl = cosmo.luminosity_distance(z).to('pc').value
        mu = 5*np.log10(dl) - 5
        return mu
    # Простая аппроксимация, достаточная для синтетики
    q0 = 0.5*Om - (1-Om)
    dl_mpc = (C/H0) * z * (1 + (1 - q0)*z/2)
    mu = 5*np.log10(dl_mpc*1e6) - 5
    return mu

def sector_offset_param(delta_val, use_sigma2=False, kappa=1.0):
    """Плейсхолдер: Δμ(z) = α * δΛ  (или через δσ²: δΛ = 3 κ δσ² / c²).
    Замените на численный расчёт через HD-Фридман.
    """
    if use_sigma2:
        delta_lambda = (3.0/(C**2)) * kappa * delta_val
    else:
        delta_lambda = delta_val
    alpha = 0.2  # чувствительность (условная константа)
    return lambda z: alpha * delta_lambda

## 5. Фит по секторам

In [None]:
def chi2_sector(params, z, mu_obs, mu_err, use_sigma2=False, kappa=1.0):
    H0, Om, Mabs, dpar = params
    mu_th = mu_model_flat_LCDM(z, H0, Om) + Mabs
    mu_th = mu_th + sector_offset_param(dpar, use_sigma2, kappa)(z)
    return np.sum(((mu_obs - mu_th)/mu_err)**2)

def fit_sector(df_sec, init=None, use_sigma2=False, kappa=1.0):
    z = df_sec['z'].values
    mu_obs = df_sec['mu'].values
    mu_err = df_sec['mu_err'].values
    if init is None:
        init = np.array([H0_GUESS, OMEGA_M_GUESS, M_ABS_GUESS, 0.0])
    if not HAVE_SCIPY:
        H0_grid = np.linspace(H0_GUESS-5, H0_GUESS+5, 7)
        Om_grid = np.linspace(OMEGA_M_GUESS-0.1, OMEGA_M_GUESS+0.1, 7)
        M_grid  = np.linspace(M_ABS_GUESS-0.5, M_ABS_GUESS+0.5, 11)
        d_grid  = np.linspace(-0.02, 0.02, 21)
        best, best_val = None, np.inf
        for H0 in H0_grid:
            for Om in Om_grid:
                for M in M_grid:
                    for d in d_grid:
                        val = chi2_sector((H0,Om,M,d), z, mu_obs, mu_err, use_sigma2, kappa)
                        if val < best_val:
                            best_val, best = val, (H0,Om,M,d)
        return best, best_val
    else:
        res = minimize(lambda p: chi2_sector(p, z, mu_obs, mu_err, use_sigma2, kappa), init,
                       method='Nelder-Mead')
        return res.x, res.fun

sectors = []
for s in range(sn['sector'].nunique()):
    df_sec = sn[sn['sector']==s]
    if len(df_sec) < 20:
        continue
    pars, chi2 = fit_sector(df_sec, use_sigma2=USE_SIGMA2, kappa=KAPPA_HD)
    H0_s, Om_s, Mabs_s, dpar_s = pars
    sectors.append({'sector': s, 'N': len(df_sec), 'H0':H0_s, 'Om':Om_s, 'Mabs':Mabs_s, 'dpar':dpar_s, 'chi2':chi2})

sec_df = pd.DataFrame(sectors).sort_values('sector').reset_index(drop=True)
sec_df

## 6. Карта значений `dpar` и простые графики

In [None]:
npix = sn['sector'].nunique()
vals = np.full(npix, np.nan)
for _, row in sec_df.iterrows():
    s = int(row['sector'])
    if s < npix:
        vals[s] = row['dpar']

plt.figure(); plt.title('Секторные значения dpar');
plt.plot(np.arange(npix), vals, 'o-'); plt.xlabel('sector id'); plt.ylabel('dpar'); plt.tight_layout(); plt.show()

if HAVE_AHPIX and HAVE_ASTROPY:
    hpix = HEALPix(nside=NSIDE, order='ring', frame='icrs')
    ipix = np.arange(12*NSIDE*NSIDE)
    sc = hpix.healpix_to_skycoord(ipix)
    ra_cent = sc.ra.deg; dec_cent = sc.dec.deg
    pix_vals = np.full_like(ra_cent, np.nan, dtype=float)
    for _, row in sec_df.iterrows():
        pix_vals[int(row['sector'])] = row['dpar']
    ra_rad = np.deg2rad(ra_cent); ra_rad = np.remainder(ra_rad+2*np.pi, 2*np.pi); ra_rad[ra_rad>np.pi] -= 2*np.pi
    dec_rad = np.deg2rad(dec_cent)
    plt.figure(); ax = plt.subplot(111, projection='mollweide')
    sca = ax.scatter(ra_rad, dec_rad, c=pix_vals, s=50)
    ax.set_title('Карта dpar (Mollweide)'); plt.tight_layout(); plt.show()
else:
    print('Нет astropy/astropy-healpix — проекцию Mollweide пропускаем.')

## 7. Бутстрап dpar по секторам (черновой каркас)

In [None]:
def bootstrap_dpar(sn_df, n_boot=100, use_sigma2=False, kappa=1.0):
    dpars = []
    for _ in range(n_boot):
        idx = np.random.choice(len(sn_df), len(sn_df), replace=True)
        boot = sn_df.iloc[idx].copy()
        pars, _ = fit_sector(boot, use_sigma2=use_sigma2, kappa=kappa)
        dpars.append(pars[-1])
    return np.array(dpars)

boot_stats = []
for s in sorted(sn['sector'].unique()):
    df_sec = sn[sn['sector']==s]
    if len(df_sec) < 20:
        continue
    bs = bootstrap_dpar(df_sec, n_boot=50, use_sigma2=USE_SIGMA2, kappa=KAPPA_HD)
    boot_stats.append({'sector':s, 'dpar_mean':float(np.mean(bs)), 'dpar_std':float(np.std(bs))})
pd.DataFrame(boot_stats)

## 8. Дальнейшие шаги
- Заменить плейсхолдер `sector_offset_param` на точный расчёт $Δμ(z)$ из half-dilation уравнения.
- Добавить BAO/CC, разбиение по красным смещениям, учёт bulk-flow.
- Статистика: Δχ², AIC/BIC, bootstrap/джекнайф, контроль look-elsewhere.