In [None]:
from time import time
import argparse
import astropy.units as u
import logging
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import shutil

import traceback
import warnings

from astropy.coordinates import Angle, SkyCoord
from astropy.utils.exceptions import AstropyWarning
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from matplotlib import use
from pathlib import Path
from regions import CircleSkyRegion, PointSkyRegion

from gammapy.data import Observation, observatory_locations, FixedPointingInfo, PointingMode
from gammapy.datasets import MapDataset, MapDatasetEventSampler
from gammapy.extern import xmltodict
from gammapy.irf import load_irf_dict_from_file
from gammapy.makers import MapDatasetMaker
from gammapy.maps import MapAxis, RegionNDMap, WcsGeom
from gammapy.modeling.models import (
    ConstantSpectralModel,
    FoVBackgroundModel,
    LightCurveTemplateTemporalModel,
    PointSpatialModel,
    PowerLawSpectralModel,
    LightCurveTemplateTemporalModel,
    ExpDecayTemporalModel,
    SkyModel,
    Models
)
from gammapy.utils.time import time_ref_to_dict

# Flaring Source simulator with Gammapy 1.2

This notebook can be used to simulate a transient source with gammapy.

The corresponding script is source_simulator.py

In [None]:
# INPUT PARAMETERS

# CHECK ACTIVELY THESE
nsim=2 # Number of simulations
outdir="./test/gamma/workspace/simulations/output/test" # Output Directory
livetime_seconds=1200.0 # Observation duration in seconds
steps = [5, 10, 50, 100] # Time Binning of the lightcurves to be produced in s
transient_offset = 100 # in seconds (if >0 transient starts after observation, if <0 transient starts before)

# THESE MAY STAY DEFAULT
irf_file = "./caldb/data/cta/prod5/Prod5-North-20deg-AverageAz-4LSTs.18000s-v0.1.fits" # Path to an IRF file
emin= "0.01 TeV" # Simulation Min Energy, as astropy quantity string
emax= "50.0 TeV" # Simulation Max Energy, as astropy quantity string
time_ref_isot="2024-01-01T00:00:00" # Observation time start, UTC ISOT format
ra = 83.63 # Pointing RA
dec= 22.01 # Pointing DEC
width=5.0 # FoV size in deg
binsz=0.01 # FoV spatial resolution, in deg

skymodel=None # Path to a YAML file with source model to simulate. If not provided, it will use a default model

lightcurveflag=True # If True, print light curves. If false, simulate only photon list.


In [None]:
# TESTING PARAMETERS

# nsim=2 # Number of simulations
# outdir="/home/gamma/workspace/simulations/output/test" # Output Directory
# livetime_seconds=1200.0 # Observation duration in seconds
# steps = [5, 10, 50, 100] # Time Binning of the lightcurves to be produced in s

# irf_file = "/home/gamma/workspace/simulations/caldb/data/cta/prod5/Prod5-North-20deg-AverageAz-4LSTs.18000s-v0.1.fits" # Path to an IRF file
# emin= "0.01 TeV" # Simulation Min Energy, as astropy quantity string
# emax= "50.0 TeV" # Simulation Max Energy, as astropy quantity string
# time_ref_isot="2024-01-01T00:00:00" # Observation time start, UTC ISOT format
# ra = 83.63 # Pointing RA
# dec= 22.01 # Pointing DEC
# width=5.0 # FoV size in deg
# binsz=0.01 # FoV spatial resolution, in deg

# skymodel=None # Path to a YAML file with source model to simulate. If not provided, it will use a default model

# lightcurveflag=True # If True, print light curves. If false, simulate only photon list.

### Simulation Setup

In [None]:
# 1 - SETUP
print(f"=====Setting initial configuration...")

# Read IRF
print(f"Read IRF from: {irf_file}")
irf = load_irf_dict_from_file(irf_file)

# Set Axes for Simulation Geometry
energy_axis      = MapAxis.from_energy_bounds(emin, emax, nbin=5, per_decade=True)
energy_axis_true = MapAxis.from_energy_bounds("0.001 TeV", "250 TeV", nbin=10, per_decade=True, name="energy_true")
migra_axis = MapAxis.from_bounds(0.5, 2, nbin=150, node_type="edges", name="migra")

# Set Observation Parameters
pointing = SkyCoord(ra * u.deg, dec * u.deg, frame="icrs", unit="deg")
livetime = livetime_seconds * u.s
time_ref = Time(time_ref_isot, format="isot", scale="utc")
obsid = "0001"

# Clear Output Directory       
output_directory = Path(outdir).absolute()
if output_directory.is_dir():
    print(f"Remove {output_directory}")
    shutil.rmtree(f"{output_directory}")
# Make Output Directory
output_directory.mkdir(parents=True)

print(f"Setting initial configuration... done!\n")

### Gammapy stuff: create Observation and Dataset (Geometry) objects

In [None]:
def make_obs(obsid, pointing, irf, time_ref, location="cta_south", livetime=1200*u.s):
    location = observatory_locations[location]
    pointing = FixedPointingInfo(mode=PointingMode.POINTING, fixed_icrs=pointing)
    observation = Observation.create(
        obs_id=obsid,
        pointing=pointing,
        livetime=livetime,
        irfs=irf,
        location=location,
        reference_time=time_ref,
        )
    
    return observation

def make_dataset(pointing, observation,
                 energy_axis, energy_axis_true, migra_axis,
                 width=12*u.deg, binsz=0.01*u.deg):
    """
    Define the dataset object.

    Input:
        pointing: ~astropy.SkyCoord object
        observation: ~gammapy.Observation

    Output:
        dataset: ~gammapy.Dataset
        geom: ~gammapy.maps, geometry of the dataset
    """
    geom = WcsGeom.create(skydir=pointing,
                          width=(width, width),
                          binsz=binsz,
                          frame="icrs",
                          axes=[energy_axis],
                          )
    empty = MapDataset.create(geom,
                              energy_axis_true=energy_axis_true,
                              migra_axis=migra_axis,
                              name="my-dataset"
                              )
    maker = MapDatasetMaker(selection=["exposure", "background", "psf", "edisp"])
    dataset = maker.run(empty, observation)

    return dataset

In [None]:
# 2 - CREATE OBSERVATION and DATASET objects
print(f"=====Create observation...")
observation = make_obs(obsid=obsid, pointing=pointing, irf=irf, time_ref=time_ref, livetime=livetime)
print(f"Create observation... done!\n")

print(f"=====Create dataset...")
dataset = make_dataset(pointing, observation, energy_axis, energy_axis_true, migra_axis, width=width, binsz=binsz)
print(f"Create dataset... done!\n")

# Run Simulations

1. Set the Model: it can be read from a YAML file, or it can be a default model
2. Simulate the Events (Photon lists)
3. Bin the Events in a ON region to get a counts light curve

**Default Model:**
- Spatial: Point-lke source at 0.4 degree offset from FoV center
- Spectral: Power Law with fixed index and amplitude (compatible with a burst detectable with the default IRF)
- Temporal: Exponential Decay Model, where the time scale and the normalization factor are **random** values, and the burst starts 30s before the observation start.

**Is this okay?**

In [None]:
# THIS FUNCTION WILL SAVE THE RANDOM LIGHTCURVE MODEL + A PLOT
def make_default_model(time_ref, livetime, source_coordinates, i, output_directory,
                       amplitude="3e-10 cm-2 s-1 TeV-1", #"1e-12 cm-2 s-1 TeV-1",
                       index="2.25"
                       ):
    """
    Make a default SkyModel.
    
    time_ref : `astropy.time.Time`
        Observation start.
    livetime : `astropy.quantity.Quantity`
        Observation duration.
    source_coordinates : `astropy.coordinates.SkyCoord`
        Coordinates for Point-like source.
    i : int
        Number of the Simulation.
    output_directory : `pathlib.Path`
        Directory where to save models.
    amplitude, index : str
        Power Law parameters.
    """
    # The Default Temporal Model is an ExpDecayTemporalModel,
    # ExpDecayTemporalModel = exp((t-tstart)/t0).
    
    # Burst Offset T_start chosen randomly inside the observation (serendipitous transient)
    # tstart = time_ref + np.random.uniform(0, livetime.value) * u.s
    # Burst Offset 30 s before observation start (follow-up transient).
    tstart = time_ref + transient_offset*u.s
    
    # Decay Timescale t0 randomly chosen between 0 and half the livetime duration.
    t0 = np.random.uniform(0, livetime.value * 0.5) * u.s
    print(f"Decay Time Scale: {t0.value:.5f} s")
    
    # The Normalization is a random factor between 0.1 and 3
    mult_fact = np.random.uniform(0.1, 3)
    print(f"Normalization Factor: {mult_fact:.3f}")

    # Create the time array and define the model
    time = time_ref + np.linspace(0, livetime.to("s").value, 10000) * u.s
    idx = np.where(time >= tstart)
    expdecay_model = ExpDecayTemporalModel(t_ref=(tstart-time_ref).to("d"), t0=t0)
    # Create the norm array and evaluate the model.
    norm = np.zeros_like(time, float)
    norm[idx] = expdecay_model.evaluate(time[idx], t0, tstart).value * mult_fact
    # Write the Model
    tab = Table()
    tab["TIME"] = (time - time_ref).to("s")
    tab["NORM"] = norm
    tab.meta = time_ref_to_dict(time_ref, scale="utc")
    tab.meta["TIMEUNIT"] = "s"
    filename = output_directory.joinpath(f"lightcurve_model_{i}.fits")
    tab.write(filename, overwrite=True)
    
    # Save model plot
    fig, ax = plt.subplots(1, figsize=(12,8), constrained_layout=True)
    ax.plot(tab['TIME'], tab['NORM'])
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Norm')
    ax.grid()
    fig.savefig(output_directory.joinpath(f"lightcurve_model_{i}.png"))

    # Read the Default Temporal Model produced
    temporal_model = LightCurveTemplateTemporalModel.read(filename)
    # Add a Power Law Spectral Model
    spectral_model = PowerLawSpectralModel(amplitude=amplitude, index = index, reference="1 TeV")
    # Add a Spatial Model at the source_coordinates
    spatial_model = PointSpatialModel.from_position(source_coordinates)
    # Bundle the Source Models
    model = SkyModel(spectral_model=spectral_model,
                    spatial_model=spatial_model,
                    temporal_model=temporal_model,
                    name="fake_src")
    # Add the Background
    bkg = FoVBackgroundModel(dataset_name="my-dataset")
    
    # Bundle all the Models
    models = Models([model, bkg])
    models.write(output_directory.joinpath(f"models_{i}.yaml"))
    
    return models

In [None]:
# SAVE SIMULATED PHOTON LIST + SUMMARY PLOT
def save_events(events, dataset, output_directory, index):
    """Save simulated event list.

    Input:
        events: ~gammapy.EventList; event list
        dataset: ~gammapy.Dataset; dataset of the OB
        output_directory: ~path; path of the output file
        index : int; simulation index.
    """
    # Save plot
    gs = gridspec.GridSpec(nrows=2, ncols=3)
    fig = plt.figure(figsize=(12, 8))

    # energy plot
    ax_energy = fig.add_subplot(gs[1, 0])
    events.plot_energy(ax=ax_energy)

    # offset plots
    ax_offset = fig.add_subplot(gs[0, 1])
    events.plot_offset2_distribution(ax=ax_offset)
    ax_energy_offset = fig.add_subplot(gs[0, 2])
    events.plot_energy_offset(ax=ax_energy_offset)

    # time plot
    ax_time = fig.add_subplot(gs[1, 1])
    events.plot_time(ax=ax_time)

    # image plot
    m = events._counts_image(allsky=False)
    ax_image = fig.add_subplot(gs[0, 0], projection=m.geom.wcs)
    m.plot(ax=ax_image, stretch="sqrt", vmin=0)
    plt.subplots_adjust(wspace=0.3)
    
    fig.savefig(output_directory.joinpath(f"events_{index}.png"))
    
    
    # Save file
    primary_hdu = fits.PrimaryHDU()
    hdu_evt = fits.BinTableHDU(events.table)
    hdu_gti = dataset.gti.to_table_hdu()
    hdu_all = fits.HDUList([primary_hdu, hdu_evt, hdu_gti])
    hdu_all.writeto(output_directory.joinpath(f"events_{index}.fits"), overwrite=True)
    return None

In [None]:
# 3 - RUN SIMULATION
print(f"=====Run {nsim} simulations... ")
for i in np.arange(nsim):
        
    # Define Models
    if skymodel is None:
        print(f"Create the model and assign to the dataset...")
        source_coordinates = SkyCoord((ra+0.4) * u.deg, dec * u.deg, frame="icrs", unit="deg")
        models = make_default_model(time_ref, livetime, source_coordinates, i, output_directory)
    else:
        print(f"Read models from: {skymodel}")
        models = Models.read(skymodel)
        # Add background
        print(f"Add background: {skymodel}")
        bkg = FoVBackgroundModel(dataset_name="my-dataset")
        models.append(bkg)
        
    print(models)
    dataset.models = models
    print(f"Create the model and assign to the dataset... done!")

    print(f"Run simulations for pointing {i}... ")
    sampler = MapDatasetEventSampler(random_state=i)
    events = sampler.run(dataset, observation)

    print(f"     - Saving events for pointing {obsid}")
    save_events(events, dataset, output_directory, i)
    print(f"Run simulations for pointing {i}... done!")
        
    if lightcurveflag:
        # Get the counts light curve
        print(f"Make light curves... ")
        ON_center=models[0].spatial_model.position
        ON_region = CircleSkyRegion(center=ON_center, radius=Angle(0.2*u.deg))
        ON_events = events.select_region(ON_region)
        
        # Compute the OFF position with background only
        # OFF has the same offset as ON position, but is symmetrical wrt pointing.
        pos_angle = pointing.position_angle(ON_center)
        sep_angle = pointing.separation(ON_center)
        OFF_center = pointing.directional_offset_by(pos_angle + Angle(np.pi, "rad"), sep_angle)
        OFF_region = CircleSkyRegion(center=OFF_center, radius=Angle(0.2*u.deg))
        OFF_events = events.select_region(OFF_region)
            
        # Bin Counts
        event_times_on = ON_events.time.unix
        event_times_off= OFF_events.time.unix
        t_min, t_max = np.min(event_times_on), np.max(event_times_on)
        for step in steps:
            n_bins = int( livetime.to('s').value/step)
            timebins = np.linspace(t_min, t_max, num=n_bins+1)
            
            # Bin ON and OFF light curve
            lightcurve_on, edges_on = np.histogram(event_times_on, bins=timebins)
            lightcurve_off,edges_off= np.histogram(event_times_off, bins=timebins) # edges_on==edges_off
            # Plot
            fig, ax = plt.subplots(1, figsize=(12,8), constrained_layout=True)
            ax.stairs(lightcurve_on, edges_on, label="ON")
            ax.stairs(lightcurve_off, edges_off, label="OFF")
            ax.set_xlabel('UNIX Time (s)')
            ax.set_ylabel('Counts')
            ax.set_title(f"Light Curve Simulation {i}, {step}s.")
            ax.grid()
            ax.legend()
            fig.savefig(output_directory.joinpath(f"lightcurve_simulation_{i}_{step}s.png"))
            # Save Table
            edges_min = edges_on[:-1]
            edges_max = edges_on[1:]
            t = Table([edges_min, edges_max, lightcurve_on, lightcurve_off], names=('t_min', 't_max', 'on_counts','off_counts'))
            t.write(output_directory.joinpath(f"lightcurve_simulation_{i}_{step}s.csv"))
    
print("Simulations completed!")