diff --git a/pyorbital/sno_utils.py b/pyorbital/sno_utils.py index 1abbae9..2de4f84 100644 --- a/pyorbital/sno_utils.py +++ b/pyorbital/sno_utils.py @@ -20,6 +20,7 @@ import json import datetime as dt +# from datetime import timedelta, timezone from datetime import timedelta from geopy import distance # from geojson import dump @@ -30,8 +31,8 @@ import pyresample as pr from pyorbital.config import get_config from pyorbital.orbital import Orbital -from pyorbital.tlefile import Tle from pyorbital.tlefile import SATELLITES +from pyorbital.tle_archive import get_tle_archive from trollsift.parser import Parser from pathlib import Path @@ -77,7 +78,6 @@ } ZERO_SECONDS = timedelta(seconds=0) -max_tle_days_diff = 3 LOG = logging.getLogger(__name__) @@ -350,48 +350,6 @@ def get_sno_point(calipso, the_other_one, arc_calipso, arc_the_other_one, tobj, return match -def populate_tle_buffer(filename, TLE_ID, MY_TLE_BUFFER): - """Populate the TLE buffer.""" - with open(filename, 'r') as fh_: - tle_data_as_list = fh_.readlines() - for ind in range(0, len(tle_data_as_list), 2): - if TLE_ID in tle_data_as_list[ind]: - tle = Tle(TLE_ID, line1=tle_data_as_list[ind], line2=tle_data_as_list[ind+1]) - # dto = datetime.strptime(tle.epoch, '%Y-%m-%dT%H:%M:%S:%f') - ts = (tle.epoch - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's') - tobj = dt.datetime.utcfromtimestamp(ts) - MY_TLE_BUFFER[tobj] = tle - - -def get_tle_archive(timestamp, filename, TLE_ID, MY_TLE_BUFFER): - """Get Two-Line elements from the archive. - - The TLE buffer MY_TLE_BUFFER is being updated. - """ - # Read tle data if not already in buffer - if len(MY_TLE_BUFFER) == 0: - populate_tle_buffer(filename, TLE_ID, MY_TLE_BUFFER) - - for tobj in MY_TLE_BUFFER: - if tobj > timestamp: - deltat = tobj - timestamp - else: - deltat = timestamp - tobj - if np.abs((deltat).days) < 1: - return MY_TLE_BUFFER[tobj] - for delta_days in range(1, max_tle_days_diff + 1, 1): - for tobj in MY_TLE_BUFFER: - if tobj > timestamp: - deltat = tobj - timestamp - else: - deltat = timestamp - tobj - if np.abs((deltat).days) <= delta_days: - print("Did not find TLE for {:s}, Using TLE from {:s}".format(tobj.strftime("%Y%m%d"), - timestamp.strftime("%Y%m%d"))) - return MY_TLE_BUFFER[tobj] - print("Did not find TLE for {:s} +/- 3 days") - - def get_closest_sno_to_reference(all_features, rfeature, tol_seconds=ZERO_SECONDS): """Check all features found and find the one matching the reference. diff --git a/pyorbital/tests/test_tle_archive.py b/pyorbital/tests/test_tle_archive.py new file mode 100644 index 0000000..bdd8226 --- /dev/null +++ b/pyorbital/tests/test_tle_archive.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2024 Pyorbital developers + + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Testing the TLE archive functions.""" + + +import datetime as dt +import pyorbital +from pyorbital.tle_archive import populate_tle_buffer + + +def test_populate_tle_buffer(fake_tle_file1_calipso): + """Test populate the TLE buffer.""" + tle_filename = str(fake_tle_file1_calipso) + tlebuff = {} + tleid_calipso = '29108' + populate_tle_buffer(tle_filename, tleid_calipso, tlebuff) + + with open(tle_filename, 'r') as fpt: + tlelines = fpt.readlines() + + expected_dtimes = [dt.datetime(2013, 12, 31, 13, 38, 47, 751936), + dt.datetime(2014, 1, 1, 17, 39, 47, 34720), + dt.datetime(2014, 1, 2, 20, 1, 53, 254560), + dt.datetime(2014, 1, 4, 0, 2, 52, 226304)] + for idx, key in enumerate(tlebuff.keys()): + assert key == expected_dtimes[idx] + + for idx, key in enumerate(tlebuff.keys()): + tleobj = tlebuff[key] + assert isinstance(tleobj, pyorbital.tlefile.Tle) + assert tlelines[idx*2].strip() == tleobj.line1 + assert tlelines[idx*2+1].strip() == tleobj.line2 diff --git a/pyorbital/tle_archive.py b/pyorbital/tle_archive.py new file mode 100644 index 0000000..4b76cca --- /dev/null +++ b/pyorbital/tle_archive.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2024 Pyorbital developers + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Functions to support handling many archived TLEs.""" + + +import numpy as np +import datetime as dt +# from datetime import timedelta, timezone +from pyorbital.tlefile import Tle + +max_tle_days_diff = 3 + + +def populate_tle_buffer(filename, tle_id, tle_buffer): + """Populate the TLE buffer.""" + with open(filename, 'r') as fh_: + tle_data_as_list = fh_.readlines() + for ind in range(0, len(tle_data_as_list), 2): + if tle_id in tle_data_as_list[ind]: + tle = Tle(tle_id, line1=tle_data_as_list[ind], line2=tle_data_as_list[ind+1]) + # dto = datetime.strptime(tle.epoch, '%Y-%m-%dT%H:%M:%S:%f') + ts = (tle.epoch - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's') + tobj = dt.datetime.utcfromtimestamp(ts) + tle_buffer[tobj] = tle + + +def get_tle_archive(timestamp, filename, tle_id, tle_buffer): + """Get Two-Line elements from the archive. + + The TLE buffer tle_buffer is being updated. + """ + # Read tle data if not already in buffer + if len(tle_buffer) == 0: + populate_tle_buffer(filename, tle_id, tle_buffer) + + for tobj in tle_buffer: + if tobj > timestamp: + deltat = tobj - timestamp + else: + deltat = timestamp - tobj + if np.abs((deltat).days) < 1: + return tle_buffer[tobj] + + for delta_days in range(1, max_tle_days_diff + 1, 1): + for tobj in tle_buffer: + if tobj > timestamp: + deltat = tobj - timestamp + else: + deltat = timestamp - tobj + if np.abs((deltat).days) <= delta_days: + print("Did not find TLE for {:s}, Using TLE from {:s}".format(tobj.strftime("%Y%m%d"), + timestamp.strftime("%Y%m%d"))) + return tle_buffer[tobj] + print("Did not find TLE for {:s} +/- 3 days")