In [2]:
from concurrent.futures.process import ProcessPoolExecutor

import satpy
import numpy as np
import matplotlib.pyplot as plt
import xarray
import pandas
import earthaccess
import os
from pathlib import Path
from datetime import datetime
import glob
from pykdtree.test_tree import test3d_8n_ub_leaf20
from skyfield.api import EarthSatellite
from skyfield.api import load, wgs84
from skyfield.iokit import parse_tle_file
from collections import defaultdict
from datetime import timedelta


In [10]:
class HistoricalOrbitAnalyzer:
    def __init__(self, satellites, sat_definitions):
        """
        satellites: list of EarthSatellite
        sat_definitions: dict {name: norad_id}
        """
        self.ts = load.timescale()
        self.sat_definitions = sat_definitions

        self.sats_by_norad = defaultdict(list)
        for sat in satellites:
            self.sats_by_norad[sat.model.satnum].append(sat)

        # sort TLEs by epoch
        for norad in self.sats_by_norad:
            self.sats_by_norad[norad].sort(key=lambda s: s.epoch.tt)

    def get_satellite_history(self, name):
        norad = self.sat_definitions[name]
        return self.sats_by_norad[norad]

    # calculates a satellite propogation
    def ground_track(self, name, points_per_tle=300, max_days_last=7.0):
        sats = self.get_satellite_history(name)

        lats, lons, heights, times = [], [], [], []

        for i, sat in enumerate(sats):
            t0 = sat.epoch

            if i < len(sats) - 1:
                t1 = sats[i + 1].epoch
                t_end = self.ts.tt_jd(0.5 * (t0.tt + t1.tt))
            else:
                t_end = self.ts.tt_jd(t0.tt + max_days_last)

            t = self.ts.tt_jd(
                np.linspace(t0.tt, t_end.tt, points_per_tle)
            )

            sp = wgs84.geographic_position_of(sat.at(t))
            lats.extend(sp.latitude.degrees)
            lons.extend(sp.longitude.degrees)
            heights.extend(sp.elevation.m)
            times.extend(t.utc_datetime())

        return {'lats':np.array(lats), 'lons':np.array(lons),'heights':np.array(heights), 'times':np.array(times)}


#################################################################################
def swath_bounds(lat, lon, heading_deg, swath_km):
    from pyproj import Geod
    geod = Geod(ellps="WGS84")

    half = 0.5 * swath_km * 1000.0

    lon_l, lat_l, _ = geod.fwd(lon, lat, heading_deg - 90, half)
    lon_r, lat_r, _ = geod.fwd(lon, lat, heading_deg + 90, half)

    return (lat_l, lon_l), (lat_r, lon_r)


#################################################################################
def swaths_intersect(track1, track2, time_tol_seconds=3600):
    from shapely.geometry import LineString
    from datetime import timedelta
    intersections = []

    for i in range(len(track1["times"])):
        t1 = track1["times"][i]

        # find nearby times in track2
        dt = np.abs(track2["times"] - t1)
        idx = np.where(dt < timedelta(seconds=time_tol_seconds))[0]

        if len(idx) == 0:
            continue

        p1 = track1["swath"][i]
        for j in idx:
            p2 = track2["swath"][j]

            if p1.intersects(p2):
                intersections.append((t1, i, j))

    return intersections

def groundtrack_intersections(track1, track2, max_km=100, max_dt_sec=10800):

    intersections = []

    for i, t1 in enumerate(track1["times"]):
        dt = abs(track2["times"] - t1)
        idx = np.where(dt < timedelta(seconds=max_dt_sec))[0]

        for j in idx:
            d = ground_distance_km(
                track1["lats"][i], track1["lons"][i],
                track2["lats"][j], track2["lons"][j]
            )
            if d <= max_km:
                intersections.append((t1, i, j, d))

    return intersections

from pyproj import Geod
geod = Geod(ellps="WGS84")

def ground_distance_km(lat1, lon1, lat2, lon2):
    _, _, dist_m = geod.inv(lon1, lat1, lon2, lat2)
    return dist_m / 1000.0

#################################################################################
def triple_intersections(intersections_ab, intersections_ac):
        times_ab = set(t for t, _, _ in intersections_ab)
        times_ac = set(t for t, _, _ in intersections_ac)

        return sorted(times_ab & times_ac)



In [4]:
# concatenate tle files together
tle_dir = Path("~/Downloads/").expanduser()

ts = load.timescale()
satellites = []

for tle_path in tle_dir.glob("sat00*"):
    with load.open(str(tle_path)) as f:
        satellites.extend(parse_tle_file(f, ts))

Plot satellite orbits historically based on TLE and using swath width find intersections of all three satellites within a certain time distinction... convert code to a class

In [5]:
from collections import defaultdict

sats_by_norad = defaultdict(list)

for sat in satellites:
    norad = sat.model.satnum
    sats_by_norad[norad].append(sat)

for norad, sat_list in sats_by_norad.items():
    sat_list.sort(key=lambda s: s.epoch.tt)

sat_name_by_norad = {}
# for norad, sat_list in sats_by_norad.items():
#     sat_name_by_norad[norad] = sat_list.name

In [6]:
sats_by_norad.keys()

dict_keys([25994, 27424, 39084, 40697, 43013, 49260, 54234])

In [7]:
# name, noradid
satellite_names = {
    "terra": 25994,
    "aqua": 27424,
    "landsat8": 39084,
    "sentinel2a": 40697,
    "noaa20": 43013,
    "landsat9": 49260,
    "noaa21": 54234,
    "sentinel2c":60989,
}

instrument_swath_widths = {
    'MODIS': 2330 * 1000,
    'VIIRS': 3060 * 1000,
    'Landsat': 185 * 1000,
    'Sentinel2': 290 * 1000
}

In [8]:
analyzer = HistoricalOrbitAnalyzer(satellites, satellite_names)

In [9]:
# analyzer.get_satellite_history('terra')
# terra_track = analyzer.ground_track('terra')
aqua_track = analyzer.ground_track('aqua')
# landsat8_track = analyzer.ground_track('landsat8')
# sentinel2a_track = analyzer.ground_track('sentinel2a')
noaa20_track = analyzer.ground_track('noaa20')
# landsat9_track= analyzer.ground_track('landsat9')
# noaa21_track = analyzer.ground_track('noaa21')
sentinel2c_track = analyzer.ground_track('sentinel2c')

In [None]:
terra_noaa20 = groundtrack_intersections(terra_track, noaa20_track)

100       10800
(datetime.datetime(2023, 1, 1, 6, 14, 4, 300524, tzinfo=datetime.timezone.utc), 88, 130, 88.41442853852912)
(datetime.datetime(2023, 1, 1, 7, 48, 28, 481670, tzinfo=datetime.timezone.utc), 196, 139, 99.35752584924283)
(datetime.datetime(2023, 1, 1, 8, 35, 40, 572243, tzinfo=datetime.timezone.utc), 250, 43, 54.54977417172736)
(datetime.datetime(2023, 1, 1, 14, 21, 45, 612781, tzinfo=datetime.timezone.utc), 352, 361, 98.74527114670794)
(datetime.datetime(2023, 1, 1, 15, 10, 54, 481960, tzinfo=datetime.timezone.utc), 413, 404, 86.69441415496365)
(datetime.datetime(2023, 1, 1, 15, 59, 15, 9056, tzinfo=datetime.timezone.utc), 473, 362, 98.46465919999125)
(datetime.datetime(2023, 1, 1, 16, 2, 28, 377510, tzinfo=datetime.timezone.utc), 477, 530, 24.365575208072837)
(datetime.datetime(2023, 1, 1, 16, 48, 23, 878235, tzinfo=datetime.timezone.utc), 534, 405, 73.46883931535405)
(datetime.datetime(2023, 1, 1, 17, 39, 9, 431621, tzinfo=datetime.timezone.utc), 597, 532, 61.1682434486

In [191]:
len(terra_track['lats'])

1412400

In [1]:
terra_landsat8 = groundtrack_intersections(terra_track, landsat8_track)

NameError: name 'terra_noaa20' is not defined

In [None]:
terra_sentinel2a = groundtrack_intersections(terra_track, sentinel2a_track)

In [None]:
terra_noaa21 = groundtrack_intersections(terra_track, noaa21_track)

In [None]:
terra_landsat9 = groundtrack_intersections(terra_track, landsat9_track)

In [None]:
aqua_noaa20 = groundtrack_intersections(aqua_track, noaa20_track)


In [None]:
aqua_landsat8= groundtrack_intersections(aqua_track, landsat8_track)


In [None]:
aqua_sentinel2a = groundtrack_intersections(aqua_track, sentinel2a_track)


In [None]:
aqua_noaa21 = groundtrack_intersections(aqua_track, noaa21_track)


In [None]:
aqua_landsat9 = groundtrack_intersections(aqua_track, landsat9_track)


In [212]:
import numpy as np
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyproj import Geod

# Create a Geod instance for distance calculation
geod = Geod(ellps="WGS84")

def ground_distance_km(lat1, lon1, lat2, lon2):
    _, _, dist_m = geod.inv(lon1, lat1, lon2, lat2)
    return dist_m / 1000.0  # Convert to kilometers

def check_distance(t1, i, track2, max_km):
    for j, t2 in enumerate(track2["times"]):
        d = ground_distance_km(
            track1["lats"][i], track1["lons"][i],
            track2["lats"][j], track2["lons"][j]
        )
        if d <= max_km:
            return (t1, i, j, d)
    return None

def groundtrack_intersections(track1, track2, max_km=100, max_dt_sec=10800):
    intersections = []
    print(max_km, '     ', max_dt_sec)

    with ThreadPoolExecutor() as executor:
        futures = []
        print("starting")
        for i, t1 in enumerate(track1["times"]):
            dt = np.abs(track2["times"] - t1)
            idx = np.where(dt < timedelta(seconds=max_dt_sec))[0]

            for j in idx:
                futures.append(executor.submit(check_distance, t1, i, track2, max_km))

        for future in as_completed(futures):
            result = future.result()
            if result:
                intersections.append(result)
                print(result)

    return intersections

#################################################################################

def triple_intersections(intersections_ab, intersections_ac):
    times_ab = set(t for t, _, _ in intersections_ab)
    times_ac = set(t for t, _, _ in intersections_ac)

    return sorted(times_ab & times_ac)
