# TAT-C for Scalable Services Report
PI: Paul T. Grogan paul.grogan@asu.edu

Contributors: Josue Tapia josue.tapia@asu.edu and Suvan Kumar skuma208@asu.edu

# Introduction

This report provides an overview of the Tradespace Analysis Tool for Constellations (TAT-C) architecture for scalable services to support trade space exploration and assesment of hypothetical mission designs autonomously and to interface with the NOAA's ASPEN trade space evaluation tool. The new architecture leverages the Celery python library to distribute and execute tasks in parallel, reducing the simulation runtime per mission assesment. This work has been performed under the project "OSSE / Trade Space Capability for NOAA's Future Mission Design."

# Background
## TAT-C Capabilities
The Tradespace Analysis Tool for Constellations (TAT-C) is an open-source Python software package for early-stage mission modeling, simulation, and analysis for Earth-observing satellite constellations. It was developed with support from the NASA Advanced Information Systems Technology (AIST) program grants NNX17AE06G, 80NSSC17K0586, 80NSSC20K1118, and 80NSSC21K1515. This project uses TAT-C version 3 which is the third major revision to the tool.

TAT-C simulates the orbital motion and observability conditions of satellite-based instruments and produces data such as orbit track, ground track (projected sensor area), and observation records. Analysis methods compute key mission performance metrics such as revisit time (time between subsequent observations of a fixed point) and data latency (time from observation of a fixed point until the first available downlink opportunity) to support trade studies.

The overall objective of this project is to use TAT-C as a pre-processor for the Advanced Systems Performance Evaluation tool for NOAA (ASPEN) Sensor Constellation/Performance (SCP) table. SCP columns that can be informed by TAT-C analysis include (descriptions from ASPEN-91 definition tables):

*  Temporal Refresh: "Time between obsevations at a location, i.e., time to obseve the geographic coverage region D."
*  Data Latency: "Time from 'image taken' to full relay of data to a ground station." (Note: TAT-C does not consider processing time as a part of data latency; in other words, an additional factor must be added to the TAT-C results.)
  
TAT-C models instrument observability constraints rather geophysical variables. Instruments selected for this report consist of infrared and microwave sounders and visual/infrared imagers.

# TAT-C Configuration
This report uses TAT-C version 3.1.3 which is currently in development on the main branch of GitHub. Technical documentation is available at ReadTheDocs.

TAT-C can be installed from PyPI via the terminal/shell command `pip install tatc`

Alternatively to install TAT-C, clone the repository:

    git clone https://github.com/code-lab-org/tatc
and install the dependencies into a new environment (tatc_env) using conda:

    conda env create -f environment.yml
(note: dependency resolution can take upwards of 10 minutes). Activate the new environment when complete:

    conda activate tatc_env
and register the TAT-C library in "editable" mode (enables source changes, if desired):

    pip install -e .
This report also requires the following additional dependencies for parallel processing and interactive features of a Jupyter notebook:

    conda install ipython joblib pandarallel -c conda-forge
and a world country-level shapefile `ne_110m_admin_0_countries.zip` available from NaturalEarth (Select "Download countries" and save the .zip file in the same directory as this notebook).

#  Simulation Runtime Limitation
The TAT-C is a long runtime simulation analysis tool due to arduos tasks. As a result, TAT-C can be used for narrow trade space analysis, which evaluates the performance of a few mission architectures. This limitation hinders mission designs that could maximize system performance while minimizing costs.

This leads to the need for scalable approaches that reduce the TAT-C's simulation runtime. The following section describes a scalable architecture that relies on the Python Celery library, which computes tasks in parallel and asynchronously.

# Scalable Architecture

The image bellow represents the scalabe architecture developed in this report. The architecture has three main components: the TAT-C application, the backend and message broker, and the machine workers. The TAT-C application splits big tasks into small ones. The celery framework supports a message broker (Rabbit MQ) and a database (Redis). The message broker places tasks in a queuing system and distributes them to machine workers, which execute tasks. As workers complete tasks, they store the results in the Redis database. Once all tasks are complete, TAT-C requests the results to the Redis database and aggregates the results to compute coverage analysis. The following subsections address each of these main components.

<img src='./images/Scalable_Architecture.png' align='center'/>

## TAT-C Application

This subsection shows how the TAT-C application breaks down a big task, such as the refresh analysis for a mission architecture, into small and independent tasks that can be executed in parallel. Additionally, the subsection describes the Celery workflows used in the architecture.  

### Celery Workflows

Celery processes and handles data flows as shown in the figure below.

<img src='./images/celery_workflows.png' width='600' align='center'/>

* The chain workflow executes tasks in sequence, one after another. The results of each tasks are passed to the following task as the first argument. The output is a result value processed by the last task.
* The group workflow executes a set of tasks in parallel. The output is a list of results of each task.
* The chord workflow executes a task after a set of tasks computed in parallel. The list of results from the the group task is passed to the last task as the first argument. The output is a result processed by the last task.

Note that the data transferred from one task to another needs to be serialized. Since tatc outputs geodataframes with coverage statistics, we need to serialize the geodataframes to pass results from one taks to the next.

### Grouping Small Tasks for Scalable Services

There are several ways to structure functions in tatc to provide coverage or data latency statistics using Celery's workflows. We determined that the following workflow offers a lower network latency. The diagram below shows the workflow to run revisit time/harmonic mean analysis. 

<img src='./images/run_coverage_analysis.jpg' width='600' align='center'/>

The `run_coverage_analysis_task` computes revist time statistics of a constellation over one target point in the grid. Each task of the group workflow is executed in parallel. Celery sends the group results as a list to the `merge_feature_collection` task, which merges a list of geodataframes into one.

##  Message Broker and Backend

This project utilzes the Rabbit MQ message broker and Redis backend hosted in Amazon Web Servicies (AWS).  

## IP Data Transfer

The communication between message broker, backend, tatc appication, and machine workers is done by using the https protocol. The protocol encrypts the messages. The advantage of using the https protocol in the network is that it can connect workers that are distributed globally.

## Machine Workers
We built a docker image that builds a worker machine container. This container carries all dependencies needed to run a coverage analys using tatc. The image is publicly available in Docker Hub (codelaborg/tatc_aspen_pworker:latest). Alternatively, the image file is shown below:

In [None]:
# This block defines the TAT-C runtime container using the appropriate
# base Python environment.

FROM python:3.10 AS tatc_runtime

WORKDIR /var/tatc
COPY pyproject.toml ./
COPY src src #copies the tatc library in this image
RUN python -m pip install . --no-cache-dir

# This block defines the TAT-C worker container. Using the TAT-C runtime
# container, it installs and starts the worker application.

FROM tatc_runtime AS tatc_worker

WORKDIR /var/tatc
RUN python -m pip install .[app] --no-cache-dir

COPY tatc_app tatc_app
COPY resources resources

ENV TATC_BROKER
ENV TATC_BACKEND

CMD ["celery", "-A", "tatc_app.aws_worker", "worker", "--uid=nobody", "--gid=nogroup"]

This image file is located in the same directory as the tatc_app file, which contains the celery application initializer (the `aws_worker` file) as well as the coverage-task file. The celery application is shown below:

In [None]:
from celery import Celery
from skyfield.api import load
import ssl

app=Celery('tatc_app',
           broker='TATC_BROKER_KEY',
           broker_use_ssl= {
               "keyfile": None,
               "certfile":None,
               "ca_certs": None,
               "cert_reqs": ssl.CERT_NONE,
               },
           backend='TATC_BACKEND_KEY',
           redis_backend_use_ssl={
               "ssl_keyfile": None,
               "ssl_certfile": None,
               "ssl_ca_certs": None,
               "ssl_cert_reqs": ssl.CERT_NONE,
               },
           include=['tatc_app.latency_tasks', 'tatc_app.coverage_tasks']
    )

load("de421.bsp")
if __name__=='__main__':
    app.start()

The following script shows coverage_tasks file, which contains the underlying tatc functions that support the `run_coverage_analysis_task` and the `merge_feature_collection_task`.

In [None]:
from datetime import datetime, timedelta
import geopandas as gpd
from geojson_pydantic import FeatureCollection
import json
from itertools import chain
import pandas as pd
from tatc.schemas.instrument import Instrument
from tatc.schemas.point import Point
from tatc.schemas.satellite import Satellite
from tatc.analysis.coverage import (
    collect_multi_observations,
    aggregate_observations,
    reduce_observations,
    grid_observations,
)

#from .schemas import CoverageAnalysisResult
from .aws_worker import app
@app.task
def run_coverage_analysis_task(
    point: str, satellites: list, start: str, end: str
) -> str:
    """
    Task to run coverage analysis.

    Args:
        point (str): JSON serialized :class:`tatc.schemas.Point` object.
        satellites (list): List of JSON serialized :class:`tatc.schemas.Satellite` objects.
        start (str): ISO 8601 serialized start time.
        end (str): ISO 8601 serialized end time.

    Returns:
        str: GeoJSON serialized `FeatureCollection` containing coverage analysis.
    """
    # call analysis function, parsing the serialized arguments
    results = reduce_observations(
        aggregate_observations(
            collect_multi_observations(
                Point.parse_raw(point),
                [Satellite.parse_raw(satellite) for satellite in satellites],
                datetime.fromisoformat(start),
                datetime.fromisoformat(end),
            )
        )
    )
    # re-serialize constituent data
    results["access"] = results["access"].apply(lambda t: t/timedelta(hours=1))
    results["revisit"] = results["revisit"].apply(lambda t: t/timedelta(hours=1))
    return results.to_json(show_bbox=False, drop_id=True)

@app.task
def merge_feature_collections_task(collections: list) -> str:
    """
    Task to merge a list of feature collections into a single feature collection.

    Args:
        collections (list): GeoJSON serialized list of feature collections.

    Results:
        str: GeoJSON serialized feature collection.
    """
    return FeatureCollection(
        type="FeatureCollection",
        features=list(
            chain(
                *list(
                    FeatureCollection.model_validate_json(collection).features
                    for collection in collections
                )
            )
        ),
    ).model_dump_json()

#  Tradespace Analysis

This section describes the script to perform a tradespace of 100 mission architectures using the scalable architecture. The 100 architecture are defined as a combination of multiple constellations:

* Set of mid-inclination Orbit Alternatives: 32 alternatives with field of regard (FOR) [60,80], altitude [450, 550]Km, inclination [30, 60], walker ad star configurations, satellite and planes [(6,2), (12,4)].
* Set of sunsynchronous orbit alternatives: three constellations with one satellite in one plane, two satellites in two planes, and three satellites in three planes. We generate a 96 architectures by combining each set the mid-inclination orbits and the sunsynchronous orbits.
* One constellation with 60 satellites with small swath width (50 Km) and nadir-pointing sensors.
* Three constellations with nadir-point satellites located in a sunsynchronous orbit and in three orbital planes.

The `all_cons` variable in the script below contains all mission architectures in a list.

In [None]:
from tatc.schemas import (CircularOrbit, Satellite,
                          SunSynchronousOrbit, TwoLineElements,
                          Instrument, WalkerConstellation
                          )
from tatc.utils import swath_width_to_field_of_regard
######Circular and Mid-inclination Constellations  #####
name:list=[]
sats:list=[]
inc:list=[]
slew:list=[]
pl:list=[]
# list field of regard
fors:list= [60, 80]
# list of altitudes in Km
altitudes: list= [450, 550]
# list of inclinations
inclination: list =[ 30, 60]
cons_configuration:list=['star', 'delta']
sat_pl:list=[(6, 2), (12, 4)]
constellations:list=[]
co=0
for f in fors:
    for alt in altitudes:
        for incl in inclination:
            for config in cons_configuration:
                for rt in sat_pl:
                    orb= CircularOrbit(type='circular',
                                       altitude=alt*1000,
                                       inclination=incl,

                                       )
                    inst=Instrument(name='ins1',
                                    field_of_regard=f,
                                    fixed_access_time=True)
                    constellations.append(WalkerConstellation(name=f'Circular_{f}_alt_incl_con_{config}_satratio{rt[0]}/{rt[1]}',
                                            orbit= orb.to_tle(),
                                            instruments=[inst],
                                            number_satellites=rt[0],
                                            number_planes=rt[1],
                                            configuration=config,).generate_members())

                    sats.append(rt[0])
                    pl.append(rt[1])
                    inc.append(incl)
                    slew.append(f)
                    co+=1

#######
h_ref=800e3 #meters
viirs = Instrument(
    name="VIIRS",
    field_of_regard=swath_width_to_field_of_regard(h_ref, 3000e3),

    fixed_access_time=True
)

sunsyn=[]
num_sat=3
num_pla=3
for i in range(num_pla):

    h_ref=800e3
    viirs = Instrument(
        name="VIIRS",
        field_of_regard=swath_width_to_field_of_regard(h_ref, 3000e3),

        fixed_access_time=True
    )

    d=SunSynchronousOrbit(name='test',
                          type='sso',
                          altitude=h_ref,
                          equator_crossing_time= '20:00',
                          equator_crossing_ascending= True
                          )

    sunsyn.append(WalkerConstellation(name=f'Sunsynchronous_plane{i}',
                            orbit= d.to_tle(),
                            instruments=[viirs],
                            number_satellites=num_sat,
                            number_planes=i+1,
                            configuration='star',
                                    ).generate_members()
                  )

c_one=[sats+ sunsyn[0] for sats in constellations]
c_two=[sats+ sunsyn[1] for sats in constellations]
c_three=[sats+ sunsyn[2] for sats in constellations]

total_constellations= c_one+ c_two + c_three # 96 mission architectures


s2=[WalkerConstellation(name='Starlink',
                       orbit=CircularOrbit(type='circular',
                                          altitude=600e3,
                                          inclination=53,
                                          ).to_tle(),
                       instruments=[Instrument(
                                   name="startlink",
                                   field_of_regard=swath_width_to_field_of_regard(600e3, 50e3), #assumes 50 Km swath
                                   fixed_access_time=True
                               )],
                       number_satellites= 60,
                       number_planes=12,
                       configuration='star'

                       ).generate_members()
    ]
total_constellations= total_constellations+ s2 

small_sunsys=[WalkerConstellation(name=f'sunsy_planes{np}',
                       orbit=SunSynchronousOrbit(name='sunsyn',
                                             type='sso',
                                             altitude=600e3,
                                             equator_crossing_time= '20:00',
                                             equator_crossing_ascending= True
                                             ).to_tle(),
                       instruments=[Instrument(
                                   name="ST_V2",
                                   field_of_regard=swath_width_to_field_of_regard(600e3, 20e3), #assumes 20 Km swath 
                                   fixed_access_time=True
                               )],
                       number_satellites= 60,
                       number_planes=np,
                       configuration='star'

                       ).generate_members()
              for np in [5, 15, 30]
              ]

all_cons= total_constellations+small_sunsys


Once the 100 mission architectures have been defined, we run a revisit time analysis for each architecture in the list. The following script details the functions we defined to run this analysis. The `run_celery_revisit` function builds a task request that performs revisit time for a mission architecture. This function writes the coverage results in a file called "trade_results" with the name of the respective architecture and writes harmonic mean in a file called "hmean_list.txt". Writing results of each mission architecture will prevent re-running tasks in the case of network dispruption. Additionally, the function `tradespace_analysis_revisit` builds a list of harmonic mean values provided by the `run_celery_revisit`.

In [None]:
from tatc.schemas import Satellite, Instrument,Architecture, Point
from tatc.generation import generate_fibonacci_lattice_points
from typing import List
from pydantic import BaseModel, Field
from datetime import datetime, timezone, timedelta
from celery import chain, group
import time
from tatc_app.coverage_tasks import (run_coverage_analysis_task, 
                                    merge_feature_collections_task
                                    )
from geojson_pydantic import FeatureCollection
import geopandas as gpd
from scipy import stats 
#from define_architectures_for_trade import all_cons
from geojson import dump
import numpy as np


def run_celery_revisit(start:datetime, end: datetime, 
                       arch: Architecture, points: gpd.GeoDataFrame) -> float:
    constellation= arch.satellites
    task= chain(
        group(
            run_coverage_analysis_task.s(
                point.model_dump_json(),
                [sat.model_dump_json() for sat in constellation],
                start.isoformat(),
                end.isoformat(),
               )
            for point in points
            ),
        merge_feature_collections_task.s(),
        )()
    finished=False
    while finished==False:
        time.sleep(1.5)
        finished= task.ready()
    #print(f"{arch.name} has been executed successfully")
    res=FeatureCollection.model_validate_json(task.get())
    gdf_r= gpd.GeoDataFrame.from_features(res)
    gdf_r=gdf_r.dropna()
    
    re=np.round(stats.hmean(gdf_r.revisit), 2)
   
    #print('hmean=', re)
    with open(f'trade_results/{arch.name}.geojson', 'w') as f:
       dump(res, f)
    
    with open('hmean_list.txt', 'a') as file:
        file.write(f'{re}\n')
    return re 


def tradespace_analysis_revisit(start: datetime,
                        end: datetime,
                        missions: list[Architecture],
                        points: gpd.GeoDataFrame):
    results=[run_celery_revisit(start, end, arch, points) for arch in missions]
    
    return results


start = datetime(
    year=2023, 
    month=9, 
    day=15, 
    hour=0, 
    minute=0, 
    tzinfo=timezone.utc
)
end = start + timedelta(days=30)


points_df = generate_fibonacci_lattice_points(1000e3)

points = points_df.apply(
    lambda r: Point(
        id=r.point_id, 
        latitude=r.geometry.y, 
        longitude=r.geometry.x
    ),
    axis=1,
)

from tatc.schemas import GroundStation

mcmurdo = GroundStation(
    name="McMurdo", 
    latitude=-77.846323, 
    longitude=166.668235, 
    elevation=150, 
    min_elevation_angle=5
)
svalbard = GroundStation(
    name="Svalbard", 
    latitude=78.229772, 
    longitude=15.407786, 
    elevation=450, 
    min_elevation_angle=5
)
fairbanks = GroundStation(
    name="Fairbanks", 
    latitude=64.97381, 
    longitude=-147.50575, 
    elevation=400, 
    min_elevation_angle=5
)
troll = GroundStation(
    name="Troll", 
    latitude=-72.016667, 
    longitude=2.533333, 
    elevation=1275, 
    min_elevation_angle=5
)

ground_network = [mcmurdo, svalbard, fairbanks, troll]


missions= [Architecture(name= f"mission_{i}",
                       satellites= all_cons[i],
                       stations= ground_network
                       )
           for i in range(len(all_cons))
          ]

results=tradespace_analysis_revisit(start,
                        end,
                        missions,
                        points)

## Results

This section discusses the results of the tradespace analysis. The graph on the right shows the average sample per point (ASpP) on the x-axis and the harmonic mean in hours on the y-axis. The four outliers shown on the upper left side represent the architectures with 60 satellites. While these constellations have many satellites, their sensor features a small and nadir-pointing swath width, which limits the satellite coverage. On the other hand, the best designs perform low harmonic mean with high ASpP; the best designs are located at the lower right side of the graph. We plot a second graph with only the best designs. Note that these designs comprise 15 satellites. 

The second graph displays the number of planes vs harmonic mean for all best desings. We observe that designs with fewer planes offer a relatively lower harmonic mean than designs with higher planes in their constellations. Expectedly, designs with a wide field of regard (FOR) perform better than designs with a narrow FOR.

<img src='./images/results.png' align='center'/>


# Conclusion

This report developed a scalable architecture, which reduces TAT-C's simulation runtime by approximately 70% compared to a single-machine worker network. The architecture utilizes the Python celery library to perform independent tasks asynchronously. We split a big task, such as revisit time analysis for a mission architecture, into small and independent tasks to process in parallel. The small tasks perform revisit time per each target point in the grid because it provides low network latency. 

Also, we performed a tradespace analysis considering 100 hypothetical architectures. The results show that an architecture with 15 satellites in five orbital planes offers a lower harmonic mean than other architectures with more orbital planes, which increases the cost of a mission. The scalable architecture is an instrumental tool for exploring a wide range of hypothetical mission archictectures that maximize performance at an attainable cost. 

Ultimately, this scalable framework can support the ASPEN tool by autonomously evaluating the refresh or revisit time and data latency of a wide range of architectures. Given that tatc is an open-source tool, the framework built in this study can be built easily from any computer.