In [None]:
import itertools

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time
from matplotlib import pyplot as plt

In [None]:
# Due to the dependencies on other packages, gammapy v0.19 is installed
# in the current magic-cta-pipe environment, but it is already outdated.
# In order to use the latest features, such as dynamic theta cuts and
# the wobble region finder, please run this notebook with a different
# environment where the newer versions of gammapy (v0.20.*) is installed.

import gammapy

print(f"gammapy: v{gammapy.__version__}")

from gammapy.data import DataStore
from gammapy.datasets import Datasets, SpectrumDataset
from gammapy.estimators import FluxPointsEstimator, LightCurveEstimator
from gammapy.makers import (
    ReflectedRegionsBackgroundMaker,
    SafeMaskMaker,
    SpectrumDatasetMaker,
    WobbleRegionsFinder,
)
from gammapy.maps import Map, MapAxis, RegionGeom
from gammapy.modeling import Fit
from gammapy.modeling.models import (
    ExpCutoffPowerLawSpectralModel,
    LogParabolaSpectralModel,
    PowerLawSpectralModel,
    SkyModel,
    create_crab_spectral_model,
)
from gammapy.visualization import plot_spectrum_datasets_off_regions
from regions import CircleSkyRegion, PointSkyRegion

> # pox doppio plot?(es con e senza pesi)

In [None]:
# Customize the pyplot figure
plt.rcParams.update(
    {"figure.figsize": (12, 9), "font.size": 15, "grid.linestyle": "--"}
)

# Get the default color cycle
colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]

# Load DL3 data

In [None]:
# ================
# === Settings ===
# ================

input_dir = ('')
    
input_dir_1 = ('')

# === Main ===
# ============

print(f"Input directory: {input_dir}")
print(f"Input directory: {input_dir_1}")

data_store = DataStore.from_dir(input_dir)
data_store_1 = DataStore.from_dir(input_dir_1)
# Show the observation table
data_store.obs_table


In [None]:
# ================
# === Settings ===
# ================



obs_ids = None   # `None` means "all" observations

# ============
# === Main ===
# ============

observations = data_store.get_observations(obs_ids, required_irf="point-like")
observations_1 = data_store_1.get_observations(obs_ids, required_irf="point-like")
print(observations)




# Define a target region

In [None]:
# Get metadata from the first observation
observation = observations[0]

event_meta = observation.events.table.meta
aeff_meta = observation.aeff.meta

# Define a target position
target_position = SkyCoord(
    u.Quantity(event_meta["RA_OBJ"], u.deg),
    u.Quantity(event_meta["DEC_OBJ"], u.deg),
    frame="icrs",
)

if "RAD_MAX" in aeff_meta:
    # Get the global theta cut used for creating the IRFs
    on_region_radius = aeff_meta["RAD_MAX"] * u.deg
    
    # Use the circle sky region to apply the global theta cut
    on_region = CircleSkyRegion(center=target_position, radius=on_region_radius)
    
else:
    # Use the point sky region to apply dynamic theta cuts
    on_region = PointSkyRegion(target_position)
    
print(on_region)

# Run the data reduction chain

In [None]:
# ================
# === Settings ===
# ================

energy_min = "0.030 TeV"
energy_max = "30. TeV"
n_bins_pdec = 5

true_energy_min = "0.01 TeV"
true_energy_max = "100 TeV"
n_bins_pdec_true = 10

n_off_regions = 3

# ============
# === Main ===
# ============

energy_axis = MapAxis.from_energy_bounds(
    energy_min,
    energy_max,
    nbin=n_bins_pdec,
    per_decade=True,
    name="energy",
)

energy_axis_true = MapAxis.from_energy_bounds(
    true_energy_min,
    true_energy_max,
    nbin=n_bins_pdec_true,
    per_decade=True,
    name="energy_true",
)

print("Energy axis:")
print(energy_axis.edges)

on_geom = RegionGeom.create(region=on_region, axes=[energy_axis])

dataset_empty = SpectrumDataset.create(geom=on_geom, energy_axis_true=energy_axis_true)

# Create a spectrum dataset maker
dataset_maker = SpectrumDatasetMaker(
    containment_correction=False,
    selection=["counts", "exposure", "edisp"],
    use_region_center=True,
)

# Create a background maker
print(f"\nNumber of OFF regions: {n_off_regions}")

region_finder = WobbleRegionsFinder(n_off_regions=n_off_regions)
bkg_maker = ReflectedRegionsBackgroundMaker(region_finder=region_finder)

# Create a safe mask maker
safe_mask_maker = SafeMaskMaker(methods=["aeff-max"], aeff_percent=3)

First sample

In [None]:
datasets = Datasets()

counts = Map.create(skydir=target_position, width=3)

# Loop over every observation
print("Running the makers...")

n_observations = len(observations)

for i_obs, observation in enumerate(observations):
    
    if (i_obs % 10) == 0:
        print(f"{i_obs}/{n_observations}")
    
    obs_id = observation.obs_id
    
    # Fill the number of events in the map
    counts.fill_events(observation.events)

    # Run the makers to the observation data
    dataset = dataset_maker.run(dataset_empty.copy(name=str(obs_id)), observation)
    dataset_on_off = bkg_maker.run(dataset, observation)
    dataset_on_off = safe_mask_maker.run(dataset_on_off, observation)

    datasets.append(dataset_on_off)

print(f"{n_observations}/{n_observations}")

# Get the information table
info_table = datasets.info_table(cumulative=True)
print(datasets.energy_ranges)
# Show the table
info_table

Seconda sample

In [None]:
datasets_1 = Datasets()

counts_1 = Map.create(skydir=target_position, width=3)

# Loop over every observation
print("Running the makers...")

n_observations_1 = len(observations_1)

for i_obs, observation in enumerate(observations_1):
    
    if (i_obs % 10) == 0:
        print(f"{i_obs}/{n_observations_1}")
    
    obs_id = observation.obs_id
    
    # Fill the number of events in the map
    counts_1.fill_events(observation.events)

    # Run the makers to the observation data
    dataset = dataset_maker.run(dataset_empty.copy(name=str(obs_id)), observation)
    dataset_on_off = bkg_maker.run(dataset, observation)
    dataset_on_off = safe_mask_maker.run(dataset_on_off, observation)

    datasets_1.append(dataset_on_off)

print(f"{n_observations_1}/{n_observations_1}")

# Get the information table
info_table_1 = datasets_1.info_table(cumulative=True)
print(datasets_1.energy_ranges)
# Show the table
info_table_1

Some plots (first sample)

In [None]:
plt.figure()

# Plot the count map
ax = counts.plot(add_cbar=True)

# Plot the ON position
on_geom.plot_region(ax=ax, edgecolor="red")

# Plot the OFF positions (only the first part of observations)
if n_observations < 10:
    plot_spectrum_datasets_off_regions(datasets, ax)
else:
    plot_spectrum_datasets_off_regions(datasets[0:10], ax)

ax.grid()

In [None]:
plt.figure(figsize=(20, 7))
grid = (1, 2)

# Plot the number of events along the livetime
plt.subplot2grid(grid, (0, 0))
plt.xlabel("Livetime [hour]")
plt.ylabel("Number of events")

plt.plot(
    info_table["livetime"].to("h"),
    info_table["excess"],
    marker="o",
    linestyle="--",
    label="Excess",
)

plt.plot(
    info_table["livetime"].to("h"),
    info_table["background"],
    marker="o",
    linestyle="--",
    label="Background",
)

plt.grid()
plt.legend()

# Plot the significance along the livetime
plt.subplot2grid(grid, (0, 1))
plt.xlabel("Livetime [hour]")
plt.ylabel("Sqrt(TS)")

plt.plot(
    info_table["livetime"].to("h"), info_table["sqrt_ts"], marker="o", linestyle="--"
)

plt.grid()

# Fit the spectrum

In [None]:
source_name="Crab Nebula"

spectral_model = LogParabolaSpectralModel(
    amplitude=u.Quantity(5e-12, unit="cm-2 s-1 TeV-1"),
    alpha=2,
    beta=0.1,
    reference=u.Quantity(1, unit="TeV"),
)

# ============
# === Main ===
# ============

sky_model = SkyModel(spectral_model=spectral_model.copy(), name=source_name)

# Add the model to the stacked dataset
stacked_dataset = datasets.stack_reduce()
stacked_dataset.models = [sky_model]

stacked_dataset_1 = datasets_1.stack_reduce()
stacked_dataset_1.models = [sky_model]

# Fit the spectral model
fit = Fit()

First sample

In [None]:
results = fit.run(datasets=stacked_dataset)
print(results)

# Keep the best fit model
best_fit_model = stacked_dataset.models[0].spectral_model.copy()

# Show the fitted parameters
stacked_dataset.models.to_parameters_table()

Second sample

In [None]:
results_1 = fit.run(datasets=stacked_dataset_1)
print(results_1)

# Keep the best fit model
best_fit_model_1 = stacked_dataset_1.models[0].spectral_model.copy()

# Show the fitted parameters
stacked_dataset_1.models.to_parameters_table()


Plots

In [None]:
plt.figure()

# Plot the number of excess and predicted events
ax_spectrum, ax_residuals = stacked_dataset.plot_fit()

ax_spectrum.set_ylabel("Number of events")
ax_spectrum.grid()

plt.setp(ax_spectrum.get_xticklabels(), visible=False)
plt.subplots_adjust(hspace=0)

ax_residuals.grid()

In [None]:
plt.figure()

# Plot the number of excess and predicted events
ax_spectrum, ax_residuals = stacked_dataset_1.plot_fit()

ax_spectrum.set_ylabel("Number of events")
ax_spectrum.grid()

plt.setp(ax_spectrum.get_xticklabels(), visible=False)
plt.subplots_adjust(hspace=0)

ax_residuals.grid()

# Estimate the flux points

In [None]:
# Create a flux point estimator
flux_points_estimator = FluxPointsEstimator(
    energy_edges=energy_axis.edges, source=source_name, selection_optional="all"
)

First sample

In [None]:
# Run the flux point estimator to the datasets
print("Running the flux points estimator...")
flux_points = flux_points_estimator.run(datasets=stacked_dataset)

# Show the flux points table
flux_points.to_table(sed_type="e2dnde", formatted=True)

Second sample

In [None]:
# Run the flux point estimator to the datasets
print("Running the flux points estimator...")
flux_points_1 = flux_points_estimator.run(datasets=stacked_dataset_1)

# Show the flux points table
flux_points_1.to_table(sed_type="e2dnde", formatted=True)

SED plot

In [None]:
# ================
# === Settings ===
# ================

sed_type = "e2dnde"
yunits = u.Unit("erg cm-2 s-1")
#yunits = "TeV cm-2 s-1"
crab_model = create_crab_spectral_model("magic_lp")
print(crab_model)
reference_models = {
    "Crab reference (MAGIC, JHEAp 2015)": crab_model,
    
}
#unit = u.Unit('TeV cm-2 s-1')
# ============
# === Main ===
# ============
from astropy import visualization

with visualization.quantity_support():
    plt.figure()

    energy_bounds = energy_axis.edges[[0, -1]]

    lines = itertools.cycle(["--", "-.", ":"])

    # Plot the flux points
    ax = flux_points.plot(sed_type=sed_type, label="LST-1 + MAGIC (this work), -bias,pointing weights")
    ax = flux_points_1.plot(sed_type=sed_type, label="LST-1 + MAGIC (this work),  pointing weights")
    

    # Plot the best fit model and its error, first sample
    best_fit_model.plot(
        ax=ax,
        energy_bounds=energy_bounds,
        sed_type=sed_type,
        yunits=yunits,
        color=colors[0],
        label="Best fit model, s"
    )

    best_fit_model.plot_error(
        ax=ax,
        energy_bounds=energy_bounds,
        sed_type=sed_type,
        yunits=yunits,
        facecolor=colors[0],
    )
    
    
    # Plot the best fit model and its error, seconda sample
    best_fit_model_1.plot(
        ax=ax,
        energy_bounds=energy_bounds,
        sed_type=sed_type,
        yunits=yunits,
        color=colors[1],
        label="Best fit model, -bias, "
    )

    best_fit_model_1.plot_error(
        ax=ax,
        energy_bounds=energy_bounds,
        sed_type=sed_type,
        yunits=yunits,
        facecolor=colors[1],
    )
    
   
    # Plot the reference spectra
    for label, model in reference_models.items():

        model.plot(
            ax=ax,
            energy_bounds=energy_bounds,
            sed_type=sed_type,
            yunits=yunits,
            label=label,
            linestyle=next(lines)
        )

    ax.set_title(f"Spectral Energy Distribution of {source_name}")
    
    ax.set_ylim(1e-12)
    ax.grid(which="both")
    ax.legend(loc="lower left")


# Estimate the light curve

Setting + First sample

In [None]:
# ================
# === Settings ===
# ================

frozen_params = ["alpha", "beta"]

# ============
# === Main ===
# ============

sky_model = SkyModel(
    spectral_model=best_fit_model.copy(), name=source_name
)

# Freeze the spectral parameters
for param in frozen_params:
    sky_model.parameters[param].frozen = True
    
# Add the model to the datasets
datasets.models = [sky_model]

print(sky_model)


Second sample

In [None]:
sky_model_1 = SkyModel(
    spectral_model=best_fit_model_1.copy(), name=source_name
)

# Freeze the spectral parameters
for param in frozen_params:
    sky_model_1.parameters[param].frozen = True
    
# Add the model to the datasets
datasets_1.models = [sky_model_1]

print(sky_model_1)


### Light curve 

In [None]:
# ================
# === Settings ===
# ================

energy_edges = energy_axis.edges[[1,-1]]
print(energy_edges)

#time_intervals = [
#    Time([59171.98, 59172.01], format="mjd", scale="utc"),
#    Time([59190.98, 59190.99], format="mjd", scale="utc"),
#    Time([59198.89, 59198.94], format="mjd", scale="utc"),
#    Time([59258.91, 59258.99], format="mjd", scale="utc"),
#    Time([59288.91, 59288.94], format="mjd", scale="utc"),
#    Time([59290.94, 59290.96], format="mjd", scale="utc"),
#] 
time_intervals=None# `None` means "run-wise"

# ============
# === Main ===
# ============

# Create a light curve estimator
light_curve_estimator = LightCurveEstimator(
    energy_edges=energy_edges,
    time_intervals=time_intervals,
    source=source_name,
    selection_optional="all",
)

First sample

In [None]:
# Run the light curve estimator to the datasets
print("Running the light curve estimator...")
light_curve = light_curve_estimator.run(datasets=datasets)

# Show the light curve table
light_curve.to_table(sed_type="flux", format="lightcurve")


Seconda sample

In [None]:
# Run the light curve estimator to the datasets
print("Running the light curve estimator...")
light_curve_1 = light_curve_estimator.run(datasets=datasets_1)

# Show the light curve table
light_curve_1.to_table(sed_type="flux", format="lightcurve")



LC plot

In [None]:
plt.figure()

lines = itertools.cycle(["--", "-.", ":"])

# Plot the light curve
ax = light_curve.plot(sed_type="flux", label="LST-1 + MAGIC (this work), pointing weights")
ax = light_curve_1.plot(sed_type="flux", label="LST-1 + MAGIC (this work), intensity + pointing weights")


xlim = plt.xlim()

# Plot the reference flux
for label, model in reference_models.items():
    
    integ_flux = model.integral(energy_edges[0], energy_edges[1])
    ax.plot(xlim, np.repeat(integ_flux, 2), label=label, linestyle=next(lines))

energy_range = f"{energy_edges[0]:.3f} < $E$ < {energy_edges[1]:.1f}"

ax.set_title(f"Light curve of {source_name} ({energy_range})")
ax.set_ylabel("Flux [cm$^{-2}$ s$^{-1}$]")
ax.set_yscale("linear")
ax.legend()
ax.grid()
