# The use-case for V1: Creating a mock LSST catalog with ADAM

In [None]:
import adam
import numpy as np
import pandas as pd
from astropy.coordinates import SkyCoord
from astropy.time import Time
from lsst.opsim import Survey, lsstFoV

The general logic I'd propose is that we have (likely immutable) datasets -- such as catalogs and ephemerides -- and transformations on those datasets -- such as propagation, ephemerides computation, or even upload. Importantly, transformations create _new_ datasets; they don't modify existing ones in place. I.e., the pattern is always:

```python
    new_dataset = adam.some_transform(old_dataset, other_args)
```

This pattern (coming from functional languages) makes it easier to reason about and build scalable, robust, distributed workflows.
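
To make the pattern concrete, here is a minimal local sketch. The `Dataset` class, its `uri` and `rows` fields, and the body of `some_transform` are hypothetical stand-ins invented for illustration; they are not part of any existing `adam` API.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Dataset:
    """Hypothetical stand-in for an immutable dataset handle."""
    uri: str              # where the data lives (e.g., an object-store URL)
    rows: tuple = ()      # tiny in-memory payload, for illustration only


def some_transform(old: Dataset, scale: float) -> Dataset:
    """Return a *new* dataset; the input is never modified."""
    new_rows = tuple(r * scale for r in old.rows)
    return Dataset(uri=old.uri + "/scaled", rows=new_rows)


old_dataset = Dataset(uri="mem://catalog", rows=(1.0, 2.0, 3.0))
new_dataset = some_transform(old_dataset, scale=2.0)

# The original is untouched, and attempts to mutate it fail loudly.
try:
    old_dataset.rows = ()
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because inputs never change, a failed or repeated step can simply be re-run from the same input handle, which is what makes this style convenient for distributed workflows.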

Consider everything below pseudocode & pardon my typos and outright mistakes.

## Version 1: Workflow-agnostic variant

This API concept is similar to what we have today; it hides the details of the server-side workflow behind our custom `adam` API.

First: Load the orbit catalog from a local file and upload it to ADAM. `orbits` below is an opaque handle to the uploaded catalog (e.g., it could be an S3 URL internally). It is this handle that is passed around when working with other ADAM functions (e.g., propagation or ephemerides computation).

In [None]:
orbits = adam.import_orbits_from_file("mpcorb.dat")

Load the LSST survey definition -- this Survey object has member array variables that list the times and directions of all LSST survey pointings (about 3 million over 10 years). Extract the days (in Modified Julian Dates) on which there are any observations.
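
As a rough illustration of the "unique days" step, the sketch below does the same thing with the standard library only. The visit epochs are made-up values, not taken from any survey file, and `math.floor` plus a set stand in for `np.floor` and `np.unique`.

```python
import math

# Made-up visit epochs in MJD (illustrative values only).
visit_mjd = [59853.98, 59853.99, 59854.02, 59854.31, 59856.10, 59856.12]

# floor(MJD) labels the day each visit falls on; the set removes duplicates.
nights_mjd = sorted({math.floor(m) for m in visit_mjd})
```

Note that an MJD day rolls over at 0h UT, so for a site whose night straddles UT midnight a single observing night can map to two labels. That seems harmless when the result is only used as a coarse time grid.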

In [None]:
lsst = Survey.from_txt('../../baseline.csv.gz')
mjd_raw = np.unique(np.floor(lsst.epochs.mjd))
nights = Time(mjd_raw, format='mjd', scale=lsst.epochs.scale) # convert to proper astropy.Time objects

Compute the on-sky locations of asteroids in all the nights when LSST observed (one per night). This is the place for ADAM to shine, as we're asking it to propagate ~10M asteroids to ~3000 nights (which will result in ~30Bn rows of ephemerides).

Note that the input here is `orbits` (a handle to the uploaded dataset), and the output is `ephems_grid` (a handle to the newly generated ephemerides dataset). Nothing gets downloaded to the client at this point.
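
A back-of-the-envelope estimate shows why keeping this dataset server-side matters. The row count follows from the numbers quoted above; the per-row size is purely an assumption for illustration, since the actual output schema is not specified here.

```python
n_asteroids = 10_000_000   # ~10M orbits (from the text)
n_nights = 3_000           # ~3000 nights (from the text)
n_rows = n_asteroids * n_nights

# ASSUMPTION: 10 float64 columns per row (e.g., time, ra, dec, and a state
# vector); the real schema may well differ.
bytes_per_row = 10 * 8
size_tb = n_rows * bytes_per_row / 1e12
```

Under that assumption the grid comes to roughly 2.4 TB uncompressed, which is not something one wants to pull down to a laptop.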

In [None]:
%%time
ephems_grid = adam.ephemerides(orbits, time=nights, obscode="I11", output_state_vectors=True)

Now we take the generated ephemerides, and for each night look for objects that fall inside LSST observations made within that night.

We do that by asking adam to run our user-defined function -- `test_in_FoV` -- on chunks of the (large, 30Bn-row) ephemerides dataset -- a [`map` operation](https://en.wikipedia.org/wiki/Map_(higher-order_function)). This function is [cloudpickled](https://github.com/cloudpipe/cloudpickle) over to server side, and executed on chunks which are passed to it as Pandas DataFrames. It returns chunks that (when all concatenated) form the output dataset.

Note: I assume that chunking is arbitrary, i.e., it's not guaranteed that the ADAM runtime will chunk the dataset on a per-night (or any other) basis. This could be made more intelligent later (e.g., with `groupby`-type functionality).
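
The consequence of arbitrary chunking is that the mapped function has to give the same overall answer no matter where the chunk boundaries fall. The sketch below illustrates this locally with plain lists; `chunked`, `local_map`, and `keep_even` are hypothetical helpers standing in for the runtime's chunking, for `adam.map`, and for a per-row function like `test_in_FoV`.

```python
def chunked(rows, sizes):
    """Split `rows` into consecutive chunks with the given sizes."""
    out, start = [], 0
    for n in sizes:
        out.append(rows[start:start + n])
        start += n
    return out


def local_map(chunks, func):
    """Apply `func` to each chunk and concatenate the returned chunks."""
    result = []
    for chunk in chunks:
        result.extend(func(chunk))
    return result


def keep_even(chunk):
    # A per-row filter: its output does not depend on chunk boundaries.
    return [row for row in chunk if row % 2 == 0]


rows = list(range(20))
by_fives = local_map(chunked(rows, [5, 5, 5, 5]), keep_even)
uneven = local_map(chunked(rows, [1, 7, 2, 10]), keep_even)
```

Both chunkings give the same result because the function treats each row independently. A function that needed, say, all rows of one night together would not have this property, which is where the `groupby`-type functionality would come in.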

In [None]:
def test_in_FoV(ephem_chunk, lsst, approximate):
    # Given the (ra, dec, time) for a set of asteroids, the function below returns
    # a table of candidate visits that may have observed those asteroids. The table has the
    # following columns: (visit_id, visit_time, asteroid_index)

    c_visits = lsst.is_observed(ephem_chunk['ra'], ephem_chunk['dec'], ephem_chunk['time'], approximate=approximate)

    # now we'll add the asteroid information to each row, so we can use it later
    # see https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html#set-logic-on-the-other-axes for
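    # what the pd.concat line does.
    ephem_states = ephem_chunk["desig x y z vx vy vz epoch".split()]
    ast_data = ephem_states.iloc[c_visits['asteroid_index']] # this extracts the state vector -- we'll use it later to compute the exact positions
    # reset both indexes so that the column-wise concat pairs rows by position, not by index label
    c_visits = pd.concat([c_visits.reset_index(drop=True), ast_data.reset_index(drop=True)], axis=1)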
    c_visits["time"] = c_visits["visit_time"] # make the "time" column correspond to visit time -- this will be used by adam.ephemerides to compute exact positions

    return c_visits

candidates = adam.map(ephems_grid, lambda chunk: test_in_FoV(chunk, lsst, approximate=True))

Like before, the `candidates` object above is a handle to the server-side dataset. This dataset should now have some ~2Bn rows (we know this from LSST simulations).

These are objects that are _near_ LSST observations, but not guaranteed to have been observed. So the next step is to compute their exact positions at individual visit times, and check if they were within the field of view. We do that by calling `adam.ephemerides`, passing it the `candidates` dataset. Each row of this dataset has the state vector for the asteroid (x, y, z, vx, vy, vz, epoch) and the `time` for which the ephemeris is desired. `adam.ephemerides` reads this and returns the ephemeris computed at that time.
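
The coarse-then-exact idea can be sketched locally as follows. Everything here is a toy: the field-of-view radius, the motion margin, the linear-motion `position_at` function (a stand-in for a real ephemeris computation), and the three objects are all assumptions chosen for illustration.

```python
import math

FOV_RADIUS_DEG = 1.75   # ASSUMPTION: circular field of view of this radius
MARGIN_DEG = 1.0        # ASSUMPTION: upper bound on sky motion within a night


def separation_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation in degrees (haversine formula)."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    a = (math.sin((dec2 - dec1) / 2) ** 2
         + math.cos(dec1) * math.cos(dec2) * math.sin((ra2 - ra1) / 2) ** 2)
    return math.degrees(2 * math.asin(math.sqrt(a)))


def position_at(obj, t):
    """Toy 'exact ephemeris': linear sky motion from the nightly grid point."""
    dt = t - obj["t0"]
    return obj["ra0"] + obj["dra"] * dt, obj["dec0"] + obj["ddec"] * dt


visit = {"ra": 10.0, "dec": 0.0, "t": 0.5}    # pointing centre; time in days
objects = [
    {"name": "A", "t0": 0.0, "ra0": 10.5, "dec0": 0.0, "dra": 0.2, "ddec": 0.0},
    {"name": "B", "t0": 0.0, "ra0": 12.0, "dec0": 0.0, "dra": 0.8, "ddec": 0.0},
    {"name": "C", "t0": 0.0, "ra0": 20.0, "dec0": 0.0, "dra": 0.1, "ddec": 0.0},
]

# Pass 1 (approximate): compare the once-per-night position with a padded radius.
candidates = [o for o in objects
              if separation_deg(o["ra0"], o["dec0"], visit["ra"], visit["dec"])
              <= FOV_RADIUS_DEG + MARGIN_DEG]

# Pass 2 (exact): recompute the position at the visit time, use the true radius.
observed = [o["name"] for o in candidates
            if separation_deg(*position_at(o, visit["t"]), visit["ra"], visit["dec"])
            <= FOV_RADIUS_DEG]
```

Object C is rejected cheaply in the first pass, B survives it but is rejected once its position at the visit time is known, and only A ends up in the catalog. The padded radius has to be at least as large as the largest motion expected within a night, otherwise the first pass could drop real detections.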

In [None]:
ephems_exact = adam.ephemerides(candidates, obscode="I11")

Now filter the catalog by running `adam.map` again, this time with `approximate=False` for `test_in_FoV`.

In [None]:
mock_catalog = adam.map(ephems_exact, lambda chunk: test_in_FoV(chunk, lsst, approximate=False))

Download the generated catalog as a dataframe. This is the first time we're downloading something sizable back. This dataset should be ~1.6Bn rows in size.

In [None]:
df = mock_catalog.toDF()

## Version 2: If using Spark as the workflow manager

This experiment exposes the underlying workflow manager and relies on it entirely for the parallelization. In this variant, the `adam` Python library is just a thin wrapper around integrators plus various utility functions. Below is what the user experience may look like.

If we were using Spark to manage the workflow, the most natural (persistent) dataset storage format becomes Parquet. The temporary datasets become Spark DataFrames.

In [None]:
import adam
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.time import Time
from lsst.opsim import Survey, lsstFoV
import lsst.astutils

import databricks.koalas as ks

In [None]:
mpcorb = lsst.astutils.loadMPCORB("mpcorb.dat")
orbits = ks.from_pandas(mpcorb)

In [None]:
lsst = Survey.from_txt('../../baseline.csv.gz')
mjd_raw = np.unique(np.floor(lsst.epochs.mjd))
nights = Time(mjd_raw, format='mjd', scale=lsst.epochs.scale) # convert to proper astropy.Time objects

In [None]:
def compute_mock_catalog(orbits, lsst, nights):
    # this is essentially the same code as before, but the key difference is that these are now
    # computations that are performed _on the machine_ where this function is running.
    # Spark schedules `compute_mock_catalog` to run on a specific node, and then it runs locally
    # w/o calling to remote APIs. E.g., adam.ephemerides would probably directly invoke 
    # pyorb or STK, etc., and the returned value is an actual Pandas dataframe.
    #
    # So it's _very_ different compared to "Version 1" computing model (potentially simpler in
    # some ways, and more complex in others.)
    #

    # approximate ephemerides on a grid, for this subset of orbits
    ephems_grid = adam.ephemerides(orbits, time=nights, obscode="I11", output_state_vectors=True)

    # figure out who's likely to have been observed
    c_visits = lsst.is_observed(ephems_grid['ra'], ephems_grid['dec'], ephems_grid['time'], approximate=True)

    # compute the exact locations, at the time of the visit, of the potentially observed asteroids
    states = ephems_grid["desig x y z vx vy vz epoch".split()].iloc[c_visits["asteroid_index"]]
    ephems_exact = adam.ephemerides(states, time=c_visits["visit_time"], time_per_row=True, obscode="I11")

    # intersect with observations
    mock_catalog = lsst.is_observed(ephems_exact['ra'], ephems_exact['dec'], ephems_exact['time'], approximate=False)

    return mock_catalog

result = (
    orbits
        .groupby(orbits["rowid"] % 10000)            # a hack to parallelize over 10000 chunks (there's a better way to do this)
        .apply(compute_mock_catalog, lsst, nights)   # for each chunk, do full computation within your function
        .to_pandas()                                 # download the result as a pandas DataFrame
)


The above has the benefit that no large intermediate datasets are ever created, as the entire pipeline executes in the same function. The flexibility is significantly greater, as full-blown map-reduce workflows can be implemented. It could run natively on Google's Dataproc. Would an Apache Beam implementation look similar?
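
The partition-then-run-everything-locally structure can be mimicked without Spark. In the sketch below a thread pool stands in for the cluster, and `pipeline` is a hypothetical placeholder whose three steps only imitate the shape of `compute_mock_catalog` (coarse grid, candidate selection, final catalog); none of it calls real `adam` or Spark code.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

N_CHUNKS = 4   # the notebook uses 10000; kept small here for illustration


def pipeline(chunk):
    """Hypothetical per-chunk pipeline: every intermediate stays local."""
    grid = [(rowid, rowid * 10) for rowid in chunk]      # stand-in for coarse ephemerides
    candidates = [g for g in grid if g[1] % 20 == 0]     # stand-in for the approximate match
    return [rowid for rowid, _ in candidates]            # stand-in for the final catalog


rowids = list(range(12))
groups = defaultdict(list)
for rowid in rowids:
    groups[rowid % N_CHUNKS].append(rowid)   # same key as groupby(rowid % N)

# Each partition runs the whole pipeline independently of the others.
with ThreadPoolExecutor(max_workers=N_CHUNKS) as pool:
    parts = list(pool.map(pipeline, groups.values()))

result = sorted(r for part in parts for r in part)
```

Only the small per-partition outputs are ever gathered; the `grid` and `candidates` intermediates exist only inside each call and are discarded when it returns.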

N.b., we could run the above with Cloud Functions as well (given the pleasing parallelism)...