In [1]:
import itertools
import pytz

import numpy as np
from tables import *
from datetime import datetime, timedelta

import pandas as pd

from skyfield import api
from skyfield.sgp4lib import EarthSatellite
from skyfield.framelib import itrs

Open up our reduced/cleaned TLE dataz

In [2]:
h5path = "/data/Indexed_TLE/reduced.h5"

In [3]:
def tle_dataframe(records):
    return pd.DataFrame.from_records(records, columns=["epoch", "norad_id", "tle1", "tle2"])

def get_all_tles(norad_id: int):
    """
    Return a pandas DataFrame, where rows correspond to a TLE
    
    (
        epoch_time: float,
        norad_id: int,
        tle_line_1: bytes,
        tle_line_2: bytes
    )
    """
    h5ro = open_file(h5path, mode="r")
    records = h5ro.root.tle_sorted.read_where("norad_id=={}".format(norad_id))
    h5ro.close()
    # Sort records based on time
    return tle_dataframe(sorted(records, key=lambda row: row[0]))

def get_all_ids():
    """
    Get all Norad ID that are present in this dataset in ascending order.
    """
    h5ro = open_file(h5path, mode="r")
    results = set(h5ro.root.tle_sorted.cols.norad_id)
    h5ro.close()
    return list(sorted(results))

max_extrap = timedelta(days=7).total_seconds()


class TLEManager:
    """
    This class is responsible for determining when and which TLE's are
    valid/useful for a particular period in time. It additionally has
    methods for computing Lat/Long/Dist data based on the windows
    computes
    """
    def __init__(self, norad_id: int):
        self.records = get_all_tles(norad_id)
        self._ts = api.load.timescale()
        
    def get_known_timespan(self):
        """
        Return datetimes corresponding to the first and last TLE epoch values
        in our record set
        """
        times = self.get_tle_times()
        return times[0], times[-1]
    
    def get_tle_times(self):
        return self.records.epoch.to_numpy()
    
    def get_compute_windows(self):
        """
        Calculate the windows over which all of the TLE's for this identifier are
        valid. When used on a dataset pruned down for the AIS valid time periods,
        this will automatically
        
        [
          (start_epoch, end_epoch, tle1, tle2)
        ]
        """
        times = self.get_tle_times()
        ntle = times.shape[0]
        
        windows = []
        for i in range(ntle):
            # For each TLE there is a window of time around that we will use to
            # pre-compute the position of the satellite.
            
            # Logic for window starting
            if i == 0:
                # Handle the first record, no TLE before, so start centered on it
                start = times[0]
            else:
                # If the time since the last TLE has been more than two weeks
                # then start this window two weeks back
                # Otherwise this window starts halfway between this and the last
                # TLE to minimize the projection error
                halfway_to_last = (times[i-1] + times[i]) /2
                two_weeks_back = times[i] - max_extrap
                start = max(halfway_to_last, two_weeks_back)
            
            # Logic for window ending
            if i == ntle-1:
                # Handle the last record. Do not project past it
                end = times[-1]
            else:
                # If the time between this TLE and the next is more than two weeks
                # project forward at most two weeks time, then close the window
                halfway_to_next = (times[i] + times[i+1]) / 2
                two_weeks_forward = times[i] + max_extrap
                
                end = min(halfway_to_next, two_weeks_forward)
                
            windows.append((int(round(start)), int(round(end)), self.records.tle1[i], self.records.tle2[i]))
        
        return windows
    
    def _epoch_to_julian(self, time: float) -> float:
        return self._ts.from_datetime(datetime.utcfromtimestamp(time).replace(tzinfo=pytz.utc)).tt
    
    def compute_lat_long_dist(self, start_epoch: int, end_epoch: int, tle1, tle2):
        """
        Given a pair of integer epoch times (in seconds)
        Compute an array of times, lats, longs and distances.
        
        
        """
        # Convert the start and end into julian
        start_time_j = self._epoch_to_julian(start_epoch)
        end_time_j = self._epoch_to_julian(end_epoch)
        
        n_time_steps = int(round((end_epoch - start_epoch) / 60))
        
        julian_times = self._ts.tt_jd(np.linspace(start_time_j, end_time_j, n_time_steps))
        epoch_times = np.linspace(start_epoch, end_epoch, n_time_steps)
        
        sat = EarthSatellite(tle1.decode(), tle2.decode())
        lats, longs, dists = sat.at(julian_times).frame_latlon(itrs)
        
        return epoch_times, lats.degrees, longs.degrees, dists.m

    def compute_tlla_sequence(self):
        windows = self.get_compute_windows()
        to_concat = []

        for start, end, tle1, tle2 in windows:
            tlla = self.compute_lat_long_dist(start, end, tle1, tle2)
            tlla = np.vstack(tlla)            
            to_concat.append(tlla)

        return np.hstack(to_concat).astype(np.float32)


In [4]:
# This takes a couple seconds....
all_ids = get_all_ids()
len(all_ids)

19404

In [None]:
windows = tlem.compute_tlla_sequence()

In [None]:
windows.shape

Lets really quick check that the windows we compute and the trajectories meet end-to-end for a semi-random data point

In [None]:
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2)
fig.set_size_inches(20,  10)

tlem = TLEManager(list(all_ids)[-3001])
windows = tlem.get_compute_windows()

for i, (start, end, tle1, tle2) in enumerate(windows):
    times, lats, longs, alts = tlem.compute_lat_long_dist(start, end, tle1, tle2)
  
    ax1.set_title("Latitude")
    ax1.plot(times, lats, alpha=0.4)

    ax2.set_title("Longitude")
    ax2.plot(times, longs, alpha=0.4)


# Add vertical lines where there are TLE epochs
xmin, xmax = ax1.get_xlim()
for t in tlem.get_tle_times():
    if(t > xmin and t < xmax):
        ax1.axvline(t, ls="--")
        ax2.axvline(t, ls="--")

This looks ok at a cursory glance


In [5]:
def do_the_work(sat_id: int):
    return TLEManager(sat_id).compute_tlla_sequence()

In [6]:
# Trying Dask:
#from dask.distributed import Client, as_completed
#client = Client(memory_limit='8GB')  # start local workers as processes
#client

# Trying concurrent.futures:
from concurrent.futures import ProcessPoolExecutor, as_completed
client = ProcessPoolExecutor()

In [8]:
precompute = open_file("/data/Indexed_TLE/precomp2.h5", "a")

In [None]:
if not hasattr(precompute.root, "sat"):
    print("Created new group")
    sat_group = precompute.create_group("/", "sat", "Precomputed satellite position cache")
else:
    print("Found existing group")
    sat_group = precompute.get_node("/sat")

    
already_completed = set(int(k.replace("s", "")) for k in dir(sat_group) if not k.startswith("_"))
print("Already completed: %i" % len(already_completed))
todo_ids = set(all_ids).difference(already_completed)

fut_to_id = {client.submit(do_the_work, sat_id): sat_id for sat_id in todo_ids}

from tqdm import tqdm

with tqdm(total=len(fut_to_id)) as pbar:
    pbar.update(len(already_completed))
    for fut in as_completed(fut_to_id.keys()):
        sat_id = fut_to_id[fut]
        data = fut.result()

        pbar.set_description("%i: %s" % (sat_id, str(data.shape)))
        # Create the hdf5 carray and populate it with the data
        name = "s" + str(sat_id)
        sat_array = Array(sat_group, name , data, title="Data for satellite with norad id: %s" % sat_id)
        pbar.update(1)

Created new group
Already completed: 0


  0%|          | 0/19404 [00:00<?, ?it/s]

In [None]:
precompute.close()
client.shutdown(wait=False)

In [None]:
from datetime import datetime
from tables import open_file
import numpy as np

computed = open_file("/data/Indexed_TLE/precomp-sample.h5", mode="r")
sat_group = computed.get_node("/sat")


class SatelliteDataReader:
    def __init__(self, h5_node):
            self.node = h5_node

    def get_satellites_contained():
        return set(int(k.replace("s", "")) for k in dir(self.node) if not k.startswith("_"))

    def get_precomputed_tracks(satellite: int, start: datetime, end: datetime):
        """
        Return a listing of active satellites and positions that spans `start` to `end`
        datetimes. The arrays returned will have `times` elements, which correspond to the
        number of integer minutes between the starting points of the two arrays.

        This returns an array:
        @returns np.array (times, 3)

        """
        name = "s" + str(satellite)    
        assert hasattr(self.node, name), "No Data for satellite with norad ID: %i" % norad_id
        dataz = getattr(self.node, name)[:]

        start_index = np.searchsorted(dataz[0, :], start.timestamp())
        end_index   = np.searchsorted(dataz[0, :], end.timestamp())
        return dataz[:, start_index: end_index]
    
    

In [None]:
get_satellites_contained()

In [None]:
t, lat, long, dist = get_precomputed_tracks(312, datetime(2010, 1, 1), datetime(2020, 1, 1))