In [19]:
# ===== DC2 TS-MAP WORKFLOW (robust downloader + analysis) =====

# --- imports
from threeML import Powerlaw
from cosipy import FastTSMap, SpacecraftFile
from cosipy.response import FullDetectorResponse
import astropy.units as u
from histpy import Histogram
from astropy.time import Time
import numpy as np
from astropy.coordinates import SkyCoord
from pathlib import Path
from matplotlib import pyplot as plt
import gc, shutil, os
from cosipy.util import fetch_wasabi_file

# --- all data live here
data_dir = Path("/data01/grb")
data_dir.mkdir(parents=True, exist_ok=True)

# optional: keep plots under /data01/grb/plots
plots_dir = data_dir / "plots"
plots_dir.mkdir(parents=True, exist_ok=True)

# --- helper: download if missing
def ensure_file(remote_key: str, local_path: Path, unzip_to: Path | None = None):
    """
    Download remote_key -> local_path if not present. 
    If local_path is a ZIP and unzip_to is provided, unzip there and remove the ZIP.
    """
    if local_path.exists():
        return
    fetch_wasabi_file(remote_key, local_path)
    if local_path.suffix == ".zip" and unzip_to is not None:
        shutil.unpack_archive(local_path, extract_dir=unzip_to)
        try:
            local_path.unlink()  # delete zip
        except FileNotFoundError:
            pass

# --- robust DC2 response fetch: try multiple keys, unzip, auto-pick .h5
def ensure_dc2_response():
    key_candidates = [
        "COSI-SMEX/DC2/Responses/SMEXv12.Continuum.HEALPixO3_10bins_log_flat.binnedimaging.imagingresponse_nonsparse_nside8.area.good_chunks_unzip.h5.zip",
        "COSI-SMEX/DC2/Responses/SMEXv12.Continuum.HEALPixO3_10bins_log_flat.binnedimaging.imagingresponse_nonsparse_nside8.area.good_chunks.h5.zip",
    ]
    # We don't care about the zip filename; we'll glob the unzipped .h5
    zip_path = data_dir / "SMEXv12.Continuum.response.good_chunks.h5.zip"

    # If a response .h5 already exists, just use it
    matches = list(data_dir.glob("SMEXv12.Continuum*.h5"))
    if matches:
        return matches[0]

    last_err = None
    for key in key_candidates:
        try:
            ensure_file(key, zip_path, unzip_to=data_dir)
            matches = list(data_dir.glob("SMEXv12.Continuum*.h5"))
            if matches:
                return matches[0]
        except Exception as e:
            last_err = e
    raise RuntimeError(
        "Could not download/unzip the DC2 response.\n"
        "Tried keys:\n  - " + "\n  - ".join(key_candidates) +
        f"\nLast error: {last_err}"
    )

# --- input files (all under data_dir)
GRB_signal_path   = data_dir / "grb_binned_data.hdf5"
background_path   = data_dir / "bkg_binned_data_local.hdf5"
orientation_path  = data_dir / "20280301_3_month_with_orbital_info.ori"

# --- fetch (download only if missing)
ensure_file("COSI-SMEX/cosipy_tutorials/grb_spectral_fit_local_frame/grb_binned_data.hdf5", GRB_signal_path)
ensure_file("COSI-SMEX/cosipy_tutorials/ts_maps/bkg_binned_data_local.hdf5", background_path)
ensure_file("COSI-SMEX/DC2/Data/Orientation/20280301_3_month_with_orbital_info.ori", orientation_path)
response_path = ensure_dc2_response()

# --- sanity check
for p in [GRB_signal_path, background_path, orientation_path, response_path]:
    assert Path(p).exists(), f"Missing file: {p}"

In [20]:
# --- spectrum
index = -2.2
K     = 10 / u.cm / u.cm / u.s / u.keV
piv   = 100 * u.keV

spectrum = Powerlaw()
spectrum.index.value = index
spectrum.K.value     = K.value
spectrum.piv.value   = piv.value 
spectrum.K.unit      = K.unit
spectrum.piv.unit    = piv.unit


In [21]:
# --- Read GRB signal
signal = Histogram.open(GRB_signal_path)

# GRB time bounds from signal
grb_tmin = signal.axes["Time"].edges.min()
grb_tmax = signal.axes["Time"].edges.max()

# Project to Em, PsiChi, Phi
signal = signal.project(['Em', 'PsiChi', 'Phi'])

# --- Load full background (3 months)
bkg_full = Histogram.open(background_path)

# Robust slice of background to the GRB 40 s interval (Time axis):
edges = bkg_full.axes['Time'].edges.value
try:
    bkg_tmin_idx = np.where(edges == grb_tmin.value)[0][0]
    bkg_tmax_idx = np.where(edges == grb_tmax.value)[0][0]
except IndexError:
    # Fallback: nearest indices if exact float match fails
    bkg_tmin_idx = int(np.searchsorted(edges, grb_tmin.value, side="left"))
    bkg_tmax_idx = int(np.searchsorted(edges, grb_tmax.value, side="left"))
# Clamp and ensure at least one time bin
bkg_tmin_idx = max(0, min(bkg_tmin_idx, len(edges)-2))
bkg_tmax_idx = max(bkg_tmin_idx+1, min(bkg_tmax_idx, len(edges)-1))

bkg = bkg_full.slice[bkg_tmin_idx:bkg_tmax_idx, :].project(['Em', 'PsiChi', 'Phi'])

# Data = bkg + signal
data = bkg + signal

# --- Average the full background down to 40 s to form the background model
bkg_full_duration = (bkg_full.axes['Time'].edges.max() - bkg_full.axes['Time'].edges.min())
bkg_model = (bkg_full / (bkg_full_duration / 40)).project(['Em', 'PsiChi', 'Phi'])


In [22]:

# --- Quick counts plot
ax, _ = bkg.project("Em").draw(label="background component")
data.project("Em").draw(ax,   label="data (GRB+bkg)")
signal.project("Em").draw(ax, label="GRB signal")
bkg_model.project("Em").draw(ax, label="background model")

ax.legend()
ax.set_xscale("log"); ax.set_yscale("log"); ax.set_ylabel("Counts")
plt.savefig(plots_dir / "counts_projection.png", dpi=150, bbox_inches="tight")
plt.close()

In [23]:
# --- Orientation sliced to GRB window
ori_full = SpacecraftFile.parse_from_file(orientation_path)
grb_ori  = ori_full.source_interval(Time(grb_tmin, format="unix"), Time(grb_tmax, format="unix"))

# Free RAM
del bkg_full, ori_full
_ = gc.collect()

In [None]:
# --- TS Map object
ts = FastTSMap(
    data          = data,
    bkg_model     = bkg_model,
    orientation   = grb_ori,
    response_path = response_path,
    cds_frame     = "local",
    scheme        = "RING",
)

# Hypothesis coordinates (nside controls TS-map resolution)
hypothesis_coords = FastTSMap.get_hypothesis_coords(nside=16)

# Choose cores safely
requested_cores = 56
cpu_cores = min(requested_cores, os.cpu_count() or requested_cores)

# Run TS fit
ts_results = ts.parallel_ts_fit(
    hypothesis_coords = hypothesis_coords,
    energy_channel    = [2, 3],
    spectrum          = spectrum,
    ts_scheme         = "RING",
    cpu_cores         = cpu_cores,
)

In [None]:
# --- Plot TS maps (saved to /data01/grb/plots)
coord = SkyCoord(l=93, b=-53, unit=(u.deg, u.deg), frame="galactic")
_cwd = os.getcwd()
try:
    os.chdir(plots_dir)
    ts.plot_ts(skycoord=coord, save_plot=True)
    ts.plot_ts(skycoord=coord, containment=0.9, save_plot=True)
finally:
    os.chdir(_cwd)

print("Done. Using response:", response_path)
print("Outputs in:", plots_dir)
