Skip to content

Commit

Permalink
Add docstrings and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maxnoe committed Dec 12, 2023
1 parent 862648f commit f45babe
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 39 deletions.
35 changes: 34 additions & 1 deletion ctapipe/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""
common pytest fixtures for tests in ctapipe
"""

import shutil
from copy import deepcopy

import astropy.units as u
import numpy as np
import pytest
from astropy.coordinates import EarthLocation
from astropy.table import Table
from pytest_astropy_header.display import PYTEST_HEADER_MODULES

from ctapipe.core import run_tool
Expand Down Expand Up @@ -645,3 +647,34 @@ def disp_reconstructor_path(model_tmp_path, gamma_train_clf):
def reference_location():
"""a dummy EarthLocation to use for SubarrayDescriptions"""
return EarthLocation(lon=-17 * u.deg, lat=28 * u.deg, height=2200 * u.m)


@pytest.fixture(scope="session")
def dl1_mon_pointing_file(dl1_file, dl1_tmp_path):
from ctapipe.instrument import SubarrayDescription
from ctapipe.io import read_table, write_table

path = dl1_tmp_path / "dl1_mon_ponting.dl1.h5"
shutil.copy(dl1_file, path)

events = read_table(path, "/dl1/event/subarray/trigger")
subarray = SubarrayDescription.from_hdf(path)

# create some dummy monitoring data
time = events["time"]
start, stop = time[[0, -1]]
duration = (stop - start).to_value(u.s)

# start a bit before, go a bit longer
dt = np.arange(-1, duration + 2, 1) * u.s
time_mon = start + dt

alt = (69 + 2 * dt / dt[-1]) * u.deg
az = (180 + 5 * dt / dt[-1]) * u.deg

table = Table({"time": time_mon, "azimuth": az, "altitude": alt})

for tel_id in subarray.tel:
write_table(table, path, f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}")

return path
1 change: 1 addition & 0 deletions ctapipe/io/hdf5eventsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ def _generator(self):
h5file=self.file_,
parent=self,
)
tel_pointing_finder = None
else:
pointing_interpolator = None

Expand Down
55 changes: 46 additions & 9 deletions ctapipe/io/pointing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import astropy.units as u
import numpy as np
import tables
from astropy.time import Time
from scipy.interpolate import interp1d

from ctapipe.core import Component, traits
Expand All @@ -29,10 +30,10 @@ class PointingInterpolator(Component):
default_value=False,
).tag(config=True)

def __init__(self, h5file, **kwargs):
def __init__(self, h5file=None, **kwargs):
super().__init__(**kwargs)

if not isinstance(h5file, tables.File):
if h5file is not None and not isinstance(h5file, tables.File):
raise TypeError("h5file must be a tables.File")
self.h5file = h5file

Expand All @@ -49,24 +50,57 @@ def __init__(self, h5file, **kwargs):
self._alt_interpolators = {}
self._az_interpolators = {}

def _read_pointing_table(self, tel_id):
pointing_table = read_table(
self.h5file,
f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}",
)
def add_table(self, tel_id, pointing_table):
"""
Add a table to this interpolator
Parameters
----------
tel_id : int
Telescope id
pointing_table : astropy.table.Table
Table of pointing values, expected columns
are ``time`` as ``Time`` column, ``azimuth`` and ``altitude``
as quantity columns.
"""
missing = {"time", "azimuth", "altitude"} - set(pointing_table.colnames)
if len(missing) > 0:
raise ValueError(f"Table is missing required column(s): {missing}")

if not isinstance(pointing_table["time"], Time):
raise TypeError("'time' column of pointing table must be astropy.time.Time")

for col in ("azimuth", "altitude"):
unit = pointing_table[col].unit
if unit is None or not u.rad.is_equivalent(unit):
raise ValueError(f"{col} must have units compatible with 'rad'")

# sort first, so it's not done twice for each interpolator
pointing_table.sort("time")
# interpolate in mjd TAI. Float64 mjd is precise enough for pointing
# and TAI is contiguous, so no issues with leap seconds.
mjd = pointing_table["time"].tai.mjd

az = pointing_table["azimuth"].quantity.to_value(u.rad)
# prepare azimuth for interpolation "unwrapping", i.e. turning 359, 1 into 359, 361
# prepare azimuth for interpolation by "unwrapping": i.e. turning
# [359, 1] into [359, 361]. This assumes that if we get values like
# [359, 1] the telescope moved 2 degrees through 0, not 358 degrees
# the other way around. This should be true for all telescopes given
# the sampling speed of pointing values and their maximum movement speed.
# No telescope can turn more than 180° in 2 seconds.
az = np.unwrap(az)
alt = pointing_table["altitude"].quantity.to_value(u.rad)

self._az_interpolators[tel_id] = interp1d(mjd, az, **self.interp_options)
self._alt_interpolators[tel_id] = interp1d(mjd, alt, **self.interp_options)

def _read_pointing_table(self, tel_id):
pointing_table = read_table(
self.h5file,
f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}",
)
self.add_table(tel_id, pointing_table)

def __call__(self, tel_id, time):
"""
Interpolate alt/az for given time and tel_id.
Expand All @@ -86,7 +120,10 @@ def __call__(self, tel_id, time):
interpolated azimuth angle
"""
if tel_id not in self._az_interpolators:
self._read_pointing_table(tel_id)
if self.h5file is not None:
self._read_pointing_table(tel_id)
else:
raise KeyError(f"No pointing table available for tel_id {tel_id}")

mjd = time.tai.mjd
az = u.Quantity(self._az_interpolators[tel_id](mjd), u.rad, copy=False)
Expand Down
11 changes: 11 additions & 0 deletions ctapipe/io/tests/test_hdf5eventsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,14 @@ def test_dl1_camera_frame(dl1_camera_frame_file):
assert sim.true_parameters.hillas.intensity is not None

assert tel_id is not None, "did not test any events"


def test_interpolate_pointing(dl1_mon_pointing_file):
from ctapipe.io import HDF5EventSource

with HDF5EventSource(dl1_mon_pointing_file) as source:
for e in source:
assert set(e.pointing.tel.keys()) == set(e.trigger.tels_with_trigger)
for pointing in e.pointing.tel.values():
assert not np.isnan(pointing.azimuth)
assert not np.isnan(pointing.altitude)
113 changes: 113 additions & 0 deletions ctapipe/io/tests/test_pointing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import astropy.units as u
import numpy as np
import pytest
import tables
from astropy.table import Table
from astropy.time import Time


def test_simple():
"""Test pointing interpolation"""
from ctapipe.io.pointing import PointingInterpolator

t0 = Time("2022-01-01T00:00:00")

table = Table(
{
"time": t0 + np.arange(0.0, 10.1, 2.0) * u.s,
"azimuth": np.linspace(0.0, 10.0, 6) * u.deg,
"altitude": np.linspace(70.0, 60.0, 6) * u.deg,
},
)

interpolator = PointingInterpolator()
interpolator.add_table(1, table)

alt, az = interpolator(tel_id=1, time=t0 + 1 * u.s)
assert u.isclose(alt, 69 * u.deg)
assert u.isclose(az, 1 * u.deg)

with pytest.raises(KeyError):
interpolator(tel_id=2, time=t0 + 1 * u.s)


def test_azimuth_switchover():
"""Test pointing interpolation"""
from ctapipe.io.pointing import PointingInterpolator

t0 = Time("2022-01-01T00:00:00")

table = Table(
{
"time": t0 + [0, 1, 2] * u.s,
"azimuth": [359, 1, 3] * u.deg,
"altitude": [60, 61, 62] * u.deg,
},
)

interpolator = PointingInterpolator()
interpolator.add_table(1, table)

alt, az = interpolator(tel_id=1, time=t0 + 0.5 * u.s)
assert u.isclose(az, 360 * u.deg)
assert u.isclose(alt, 60.5 * u.deg)


def test_invalid_input():
"""Test invalid pointing tables raise nice errors"""
from ctapipe.io.pointing import PointingInterpolator

wrong_time = Table(
{
"time": [1, 2, 3] * u.s,
"azimuth": [1, 2, 3] * u.deg,
"altitude": [1, 2, 3] * u.deg,
}
)

interpolator = PointingInterpolator()
with pytest.raises(TypeError, match="astropy.time.Time"):
interpolator.add_table(1, wrong_time)

wrong_unit = Table(
{
"time": Time(1.7e9 + np.arange(3), format="unix"),
"azimuth": [1, 2, 3] * u.m,
"altitude": [1, 2, 3] * u.deg,
}
)
with pytest.raises(ValueError, match="compatible with 'rad'"):
interpolator.add_table(1, wrong_unit)

wrong_unit = Table(
{
"time": Time(1.7e9 + np.arange(3), format="unix"),
"azimuth": [1, 2, 3] * u.deg,
"altitude": [1, 2, 3],
}
)
with pytest.raises(ValueError, match="compatible with 'rad'"):
interpolator.add_table(1, wrong_unit)


def test_hdf5(tmp_path):
from ctapipe.io import write_table
from ctapipe.io.pointing import PointingInterpolator

t0 = Time("2022-01-01T00:00:00")

table = Table(
{
"time": t0 + np.arange(0.0, 10.1, 2.0) * u.s,
"azimuth": np.linspace(0.0, 10.0, 6) * u.deg,
"altitude": np.linspace(70.0, 60.0, 6) * u.deg,
},
)

path = tmp_path / "pointing.h5"
write_table(table, path, "/dl0/monitoring/telescope/pointing/tel_001")
with tables.open_file(path) as h5file:
interpolator = PointingInterpolator(h5file)
alt, az = interpolator(tel_id=1, time=t0 + 1 * u.s)
assert u.isclose(alt, 69 * u.deg)
assert u.isclose(az, 1 * u.deg)
32 changes: 3 additions & 29 deletions ctapipe/io/tests/test_table_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import shutil

import astropy.units as u
import numpy as np
import pytest
Expand Down Expand Up @@ -440,34 +438,10 @@ def test_order_merged():
check_equal_array_event_order(table, tel_trigger[mask])


def test_interpolate_pointing(dl1_file, tmp_path):
from ctapipe.io import TableLoader, write_table

path = tmp_path / dl1_file.name
shutil.copy(dl1_file, path)

with TableLoader(dl1_file) as loader:
events = loader.read_telescope_events()
subarray = loader.subarray

# create some dummy monitoring data
time = events["time"]
start, stop = time[[0, -1]]
duration = (stop - start).to_value(u.s)

# start a bit before, go a bit longer
dt = np.arange(-1, duration + 2, 1) * u.s
time_mon = start + dt

alt = (69 + 2 * dt / dt[-1]) * u.deg
az = (180 + 5 * dt / dt[-1]) * u.deg

table = Table({"time": time_mon, "azimuth": az, "altitude": alt})

for tel_id in subarray.tel:
write_table(table, path, f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}")
def test_interpolate_pointing(dl1_mon_pointing_file, tmp_path):
from ctapipe.io import TableLoader

with TableLoader(path, interpolate_pointing=True) as loader:
with TableLoader(dl1_mon_pointing_file, interpolate_pointing=True) as loader:
events = loader.read_telescope_events([4])
assert len(events) > 0
assert "telescope_pointing_azimuth" in events.colnames
Expand Down

0 comments on commit f45babe

Please sign in to comment.