# RA, Dec with Dask+Holoviews for DC2 Run 2.2i DR6 Object Table
### Michael Wood-Vasey (@wmwv)
### Last Verified to Run: 2021-05-28 by MWV

Inspect the Run 2.2i DR6 RA, Dec distribution.
Demonstrate how to use Dask and Holoviews to visualize the distribution.

#### Run 2.2i DR6d as of 2020-08-12 includes  
  * 166 tracts

Logistics:

1. These tests were conducted on NERSC through the https://jupyter.nersc.gov interface.  
Note: To enable re-rastering when zooming, use the JupyterLab Classic interface.
You can launch this from an active JupyterHub Notebook by selecting "Help->Launch Classic Notebook".
  * Assuming that you are currently reading this Notebook in JupyterHub and have an active kernel.
  * You can select the "Running" tab and then select the Notebook you want.
  * You could instead browse through the full filesystem path under the "Files" tab to find your Notebook, but that's a lot more clicking.  You may want to take this approach to launch some other Notebook that's not currently running under JupyterHub.

2. Requires:
```
healpy
holoviews
datashader
bokeh
pyarrow >= 0.13.1
```

Up-to-date versions of each of these are available in the `desc-python-bleed` kernel.

3. This was run using the `desc-python-bleed` kernel
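
To confirm that whichever kernel you are running actually provides the packages listed under "Requires", a quick check along these lines may help.  This is only a sketch using the standard-library `importlib.metadata`; the helper name `installed_versions` is made up for illustration, and it reports versions without enforcing the `pyarrow >= 0.13.1` bound.

```python
from importlib import metadata

# Package names taken from the "Requires" list above.
REQUIRED = ["healpy", "holoviews", "datashader", "bokeh", "pyarrow"]


def installed_versions(packages):
    """Return {package: version string, or None if it is not installed}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(REQUIRED).items():
    print(f"{name:12s} {version or 'NOT INSTALLED'}")
```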

We directly use the DPDD Parquet files.

## Import Needed Modules

In [None]:
import math
import os

import numpy as np
import pandas as pd

import astropy.units as u
import healpy as hp

import scipy.interpolate

In [None]:
import colorcet

import dask
from dask.distributed import Client

from bokeh.models import HoverTool
import dask.array as da
import dask.dataframe as dd
import datashader as ds
import holoviews as hv
from holoviews.operation import histogram
from holoviews.operation.datashader import datashade, shade, dynspread, rasterize
from holoviews.plotting.util import process_cmap
from holoviews.streams import RangeXY

In [None]:
hv.extension('bokeh')

In [None]:
cmap = 'viridis'

## Start our Dask Cluster


We're only going to load the RA, Dec, so we don't need that much memory for the final product.
There's a 42 GB limit on memory directly in the JupyterHub environment, which we will succeed in staying under.


### Start a local Dask Cluster

In [None]:
client = Client()

In [None]:
client

## Load Data

In [None]:
# This is the central root for all data
# You can look this up in GCRCatalogs.site_config.site_rootdir.yaml for NERSC and IN2P3
# As of 2021-05-28 on NERSC this is:
desc_data_dir = "/global/cfs/cdirs/lsst/shared"

# If you have a local copy of the data you can set your own base directory here.
# You will need a set of the DPDD Object Table parquet files (~112 GB) for the main part
# And then the star truth table (1.3 GB) for the final part at the end.
# desc_data_dir = ""

In [None]:
data_release = "dr6"

run_data_dir = f"DC2-prod/Run2.2i/dpdd/Run2.2i-{data_release}/object_dpdd_only"
data_path = os.path.join(desc_data_dir, run_data_dir)

In [None]:
print(data_path)

In [None]:
columns = ['ra', 'dec']

In [None]:
ddf = dd.read_parquet(data_path, columns=columns, engine='pyarrow', kwargs={'dataset': {'use_legacy_dataset': False}})

## Object Density in RA, Dec

DC2 Run 2.x WFD and DDF regions
https://docs.google.com/document/d/18nNVImxGioQ3tcLFMRr67G_jpOzCIOdar9bjqChueQg/view
https://github.com/LSSTDESC/DC2_visitList/blob/master/DC2visitGen/notebooks/DC2_Run2_regionCoords_WFD.ipynb

| Location          | RA (degrees) | Dec (degrees) | RA (degrees) | Dec (degrees) |
|:----------------- |:------------ |:------------- |:------------ |:------------- |
| Region            | WFD          | WFD           | DDF          | DDF           |
| Center            | 61.856114    | -35.79        | 53.125       | -28.100       |
| North-East Corner | 71.462228    | -27.25        | 53.764       | -27.533       |
| North-West Corner | 52.250000    | -27.25        | 52.486       | -27.533       |
| South-West Corner | 49.917517    | -44.33        | 52.479       | -28.667       |
| South-East Corner | 73.794710    | -44.33        | 53.771       | -28.667       |

(Note that the order of the rows above is different than in the DC2 papers.  The order of the rows above goes around the perimeter in order.)

In [None]:
dc2_run2x_wfd = [[71.462228, -27.25], [52.250000, -27.25], [49.917517, -44.33], [73.794710, -44.33]]
dc2_run2x_ddf = [[53.764, -27.533], [52.486, -27.533], [52.479, -28.667], [53.771, -28.667]]

In [None]:
dc2_run2x_wfd_df = pd.DataFrame({'ra': [coord[0] for coord in dc2_run2x_wfd] + [dc2_run2x_wfd[0][0]],
                                 'dec': [coord[1] for coord in dc2_run2x_wfd] + [dc2_run2x_wfd[0][1]]})
dc2_run2x_ddf_df = pd.DataFrame({'ra': [coord[0] for coord in dc2_run2x_ddf] + [dc2_run2x_ddf[0][0]],
                                 'dec': [coord[1] for coord in dc2_run2x_ddf] + [dc2_run2x_ddf[0][1]]})

In [None]:
def overlay_dc2_region(ra_dec, dc2_run2x_wfd_df=dc2_run2x_wfd_df, dc2_run2x_ddf_df=dc2_run2x_ddf_df):
    # This region isn't quite a polygon.  The sides should be curved.
    wfd_region = hv.Path(dc2_run2x_wfd_df).opts(color='red')
    ddf_region = hv.Path(dc2_run2x_ddf_df).opts(color='orange')
    ra_dec = ra_dec * wfd_region * ddf_region

    max_delta_ra = dc2_run2x_wfd_df['ra'][3] - dc2_run2x_wfd_df['ra'][2]
    delta_dec = dc2_run2x_wfd_df['dec'][1] - dc2_run2x_wfd_df['dec'][3]
    grow_buffer = 0.05

    # Notice that these are specified in increasing RA left->right
    # We rely on the invert_xaxis True above to flip this in the display
    # It's important to get this right because these ranges are used for data selection
    # and then the range is flipped in the display.
    ra_dec.opts(xlim=(dc2_run2x_wfd_df['ra'][2] - max_delta_ra * grow_buffer,
                dc2_run2x_wfd_df['ra'][3] + max_delta_ra * grow_buffer))
    ra_dec.opts(ylim=(dc2_run2x_wfd_df['dec'][3] - delta_dec * grow_buffer,
                dc2_run2x_wfd_df['dec'][1] + delta_dec * grow_buffer))

    return ra_dec

In [None]:
def overlay_dc2_region_ddf(ra_dec, dc2_run2x_wfd_df=dc2_run2x_wfd_df, dc2_run2x_ddf_df=dc2_run2x_ddf_df):
    # This region isn't quite a polygon.  The sides should be curved.
    ddf_region = hv.Path(dc2_run2x_ddf_df).opts(color='orange')
    ra_dec = ra_dec * ddf_region

    max_delta_ra = dc2_run2x_wfd_df['ra'][3] - dc2_run2x_wfd_df['ra'][2]
    delta_dec = dc2_run2x_wfd_df['dec'][1] - dc2_run2x_wfd_df['dec'][3]
    grow_buffer = 0.05

    # Notice that these are specified in increasing RA left->right
    # We rely on the invert_xaxis True above to flip this in the display
    # It's important to get this right because these ranges are used for data selection
    # and then the range is flipped in the display.
    ra_dec.opts(xlim=(dc2_run2x_wfd_df['ra'][2] - max_delta_ra * grow_buffer,
                dc2_run2x_wfd_df['ra'][3] + max_delta_ra * grow_buffer))
    ra_dec.opts(ylim=(dc2_run2x_wfd_df['dec'][3] - delta_dec * grow_buffer,
                dc2_run2x_wfd_df['dec'][1] + delta_dec * grow_buffer))

    return ra_dec

In [None]:
def plot_ra_dec(df, dc2_run2x_wfd_df=dc2_run2x_wfd_df, dc2_run2x_ddf_df=dc2_run2x_ddf_df,
                show_dc2_region=True, cmap="bmy", bins=100, cmin=10):
    """Show rasterized RA, Dec object density.
    
    We're just doing this on a rectilinear grid.
    The distortion is noticeable from the lowest to highest Dec: the density changes because the bin area changes."""
    points_ra_dec = hv.Points(df, kdims=[hv.Dimension('ra', soft_range=(dc2_run2x_wfd[2][0], dc2_run2x_wfd[3][0])),
                                         hv.Dimension('dec', soft_range=(dc2_run2x_wfd[3][1], dc2_run2x_wfd[1][1]))])
    # We have to define the colormap here now, because the opts aren't passed through the datashade->Points.
    # See, e.g., https://github.com/holoviz/holoviews/issues/4125
    ra_dec = datashade(points_ra_dec, cmap=process_cmap(cmap, provider="colorcet"), precompute=True)
    ra_dec = ra_dec.opts(invert_xaxis=True)  # Flip to East left
#    ra_dec = ra_dec.opts(precompute=True)
    if show_dc2_region:
        ra_dec = overlay_dc2_region(ra_dec, dc2_run2x_wfd_df=dc2_run2x_wfd_df, dc2_run2x_ddf_df=dc2_run2x_ddf_df)
   
    return ra_dec
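
The distortion mentioned in the docstring can be estimated with a little arithmetic.  On a rectilinear RA, Dec grid a bin of fixed angular size covers a sky area proportional to cos(Dec), so a uniform sky density shows up as more objects per bin toward the north of the WFD region.  The sketch below only uses the north and south Dec edges from the region table; the function name is made up for illustration.

```python
import math


def relative_bin_area(dec_deg):
    """Sky area of a small fixed (dRA, dDec) bin, relative to the same bin at Dec = 0."""
    return math.cos(math.radians(dec_deg))


# North and south Dec edges of the WFD region, from the table above.
dec_north, dec_south = -27.25, -44.33
ratio = relative_bin_area(dec_north) / relative_bin_area(dec_south)
print(f"A bin at Dec {dec_north} covers {ratio:.2f}x the sky area of one at Dec {dec_south}")
```

That ratio comes out at roughly 1.24, which is the size of the density gradient you should expect from the projection alone.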

In [None]:
ra_dec = plot_ra_dec(ddf)

In [None]:
ra_dec.opts(width=800, height=700)

For examples of specifying hover-over tools in Bokeh, see:

https://holoviews.org/user_guide/Plotting_with_Bokeh.html

https://docs.bokeh.org/en/latest/docs/user_guide/tools.html

https://holoviz.org/tutorial/Large_Data.html

In [None]:
(73.79471 + 49.917517)/2, (-44.33 + -27.25)/2

In [None]:
dc2_run2x_wfd

Fake up the axis labels to approximate the RA, Dec values.  Would be nice to put on the curved lines eventually.

In [None]:
# hv.save(foo, 'DC2_Run2.2i_DR6c_ra_dec.png', fmt='png')

The overall object density distribution looks good.

Notes:
* If you are viewing this through a direct JupyterLab connection (Jupyter Classic Notebook, or separately on your own machine or setup), the plot will re-raster as you zoom in and out.  This functionality is not available within the JupyterHub environment.  JupyterHub doesn't allow the JavaScript callbacks in the browser back to the server that are necessary to do the re-rastering.
* We explicitly excluded the tracts that overlap the DDF region (orange square upper-right corner).
* There are also a few patches that failed within the main region.
* The saved files are significantly cropped.  I don't understand what's going on.

See the input visit coverage map here:  
https://github.com/LSSTDESC/ImageProcessingPipelines/issues/97#issuecomment-498303504


In [None]:
dc2_run2x_wfd_center = [(dc2_run2x_wfd_df['ra'][0] + dc2_run2x_wfd_df['ra'][1])/2,
                        (dc2_run2x_wfd_df['dec'][0] + dc2_run2x_wfd_df['dec'][2])/2]                         
mollweide = hp.projector.MollweideProj(rot=(dc2_run2x_wfd_center[0], dc2_run2x_wfd_center[1]))

In [None]:
def make_linear_interpolator(mollweide):
    X, Y = mollweide._MollweideProj__molldata
    return scipy.interpolate.interp1d(X, Y, bounds_error=False, fill_value=(Y[0], Y[-1]))

In [None]:
lininterp = make_linear_interpolator(mollweide)
rotmat = mollweide.rotator._matrix

In [None]:
# dir2vec from healpy.rotator
# Rewritten to work for Dask
def dir2vec(theta, phi):
    lon, lat = theta, phi
    theta, phi = np.pi / 2 - da.radians(lat), da.radians(lon)
    ct, st, cp, sp = da.cos(theta), da.sin(theta), da.cos(phi), da.sin(phi)
    vx, vy, vz = st * cp, st * sp, ct
    return vx, vy, vz

def vec2dir(vx, vy, vz):
    r = da.sqrt(vx ** 2 + vy ** 2 + vz ** 2)
    theta = da.arccos(vz / r)
    phi = da.arctan2(vy, vx)
    
    return theta, phi

def vec2xy(vx, vy, vz, mollweide):
    rotmat = mollweide.rotator._matrix
    
    # MWV: I think we have to trigger this computing the lengths to get the partitions aligned
    # because while the Dask Series contain the number of partitions,
    #   it does not have the size of the partitions.
    coords = da.stack([vx.to_dask_array(lengths=True),
                       vy.to_dask_array(lengths=True),
                       vz.to_dask_array(lengths=True)])
    vxp, vyp, vzp = da.tensordot(rotmat, coords, axes=(1, 0))

    theta, phi = vec2dir(vxp, vyp, vzp)
        
    phi = (phi + np.pi) % (2 * np.pi) - np.pi
    lat = (np.pi / 2) - theta
    
    phi = phi.to_dask_dataframe(index=vx.index)
    lat = lat.to_dask_dataframe(index=vx.index)

    # Wrap the result of the SciPy interpolation function as a Dask Array
    A = dd.map_partitions(lininterp, lat, meta=('A', 'float64'))
    
    flip = mollweide._flip

    x = flip * (2 / np.pi) * phi * da.cos(A)
    y = da.sin(A)

    return x, y
    
def moll_ang2xy(theta, phi, mollweide):
    vx, vy, vz = dir2vec(theta, phi)
    return vec2xy(vx, vy, vz, mollweide)
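
For reference, here is a small pure-Python sketch of the projection those functions implement.  As far as I can tell, the table wrapped by `make_linear_interpolator` maps latitude to the Mollweide auxiliary angle `A`; the sketch instead solves `2*A + sin(2*A) = pi*sin(lat)` directly by bisection.  It assumes no rotation and no east-left flip, and the function names are made up for illustration, so treat it as a cross-check rather than as HealPy's implementation.

```python
import math


def mollweide_aux_angle(lat_rad, n_iter=60):
    """Solve 2*A + sin(2*A) = pi*sin(lat) for A by bisection on [-pi/2, pi/2]."""
    target = math.pi * math.sin(lat_rad)
    lo, hi = -math.pi / 2, math.pi / 2
    for _ in range(n_iter):
        mid = 0.5 * (lo + hi)
        # The left-hand side increases monotonically with A, so bisection is safe.
        if 2 * mid + math.sin(2 * mid) < target:
            lo = mid
        else:
            hi = mid
    return 0.5 * (lo + hi)


def mollweide_xy(lon_deg, lat_deg):
    """Unrotated, unflipped Mollweide with x in [-2, 2] and y in [-1, 1]."""
    lon = math.radians(lon_deg)
    aux = mollweide_aux_angle(math.radians(lat_deg))
    return (2 / math.pi) * lon * math.cos(aux), math.sin(aux)


for lon, lat in [(0.0, 0.0), (180.0, 0.0), (0.0, 90.0), (30.0, -35.79)]:
    x, y = mollweide_xy(lon, lat)
    print(f"lon={lon:6.1f} lat={lat:6.2f} -> x={x:+.4f} y={y:+.4f}")
```

The `x` and `y` expressions are the same ones used at the end of `vec2xy`, apart from the `flip` factor.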

In [None]:
def fill_in_sides_from_corners(x, y, n=100):
    edges_x = []
    edges_y = []
    for start, end in zip(x[:-1], x[1:]):
        edges_x.extend(np.linspace(start, end, n))
    for start, end in zip(y[:-1], y[1:]):
        edges_y.extend(np.linspace(start, end, n))
        
    return edges_x, edges_y

In [None]:
def get_overlay(df, mollweide, color='red', **kwargs):
    """Project the region outline given by the 'ra', 'dec' corners in `df` into Mollweide x, y."""
    edges_ra, edges_dec = fill_in_sides_from_corners(df['ra'], df['dec'], **kwargs)
    edges_x, edges_y = mollweide.ang2xy(edges_ra, edges_dec, lonlat=True)
    
    return hv.Path((edges_x, edges_y)).opts(color=color)

In [None]:
wfd_outline = get_overlay(dc2_run2x_wfd_df, mollweide).opts(color='red')
ddf_outline = get_overlay(dc2_run2x_ddf_df, mollweide).opts(color='orange')

In [None]:
def plot_ra_dec_mollweide(df, dc2_run2x_wfd_df=dc2_run2x_wfd_df, dc2_run2x_ddf_df=dc2_run2x_ddf_df,
                          show_dc2_region=True, cmap="bmy", cmin=10,
                          also_return_mollweide=False):
    """Use a Mollweide projection to get equal-area densities in the aggregation.
    
    also_return_mollweide: [bool]  Return both the holoviews map and the mollweide projection object as a tuple
    """

    dc2_run2x_wfd_center = [(dc2_run2x_wfd_df['ra'][0] + dc2_run2x_wfd_df['ra'][1])/2,
                            (dc2_run2x_wfd_df['dec'][0] + dc2_run2x_wfd_df['dec'][2])/2]                         
    mollweide = hp.projector.MollweideProj(rot=(dc2_run2x_wfd_center[0], dc2_run2x_wfd_center[1]))
    x, y = moll_ang2xy(df['ra'], df['dec'], mollweide=mollweide)
    ddf = df.assign(x=x, y=y)
    
    points_ra_dec = hv.Points(ddf, ['x', 'y'])

    # We have to define the colormap here now, because the opts aren't passed through the datashade->Points.
    # See, e.g., https://github.com/holoviz/holoviews/issues/4125
#    ra_dec = datashade(points_ra_dec, cmap=process_cmap(cmap, provider="colorcet"))
    ra_dec = rasterize(points_ra_dec, width=1080, height=1080)
    
    if also_return_mollweide:
        return ra_dec, mollweide
    else:
        return ra_dec

In [None]:
# ra_dec_moll = plot_ra_dec_mollweide(good, show_dc2_region=False)
ra_dec_moll, mollweide = plot_ra_dec_mollweide(ddf, also_return_mollweide=True)

In [None]:
def generate_ra_dec_tick_labels(mollweide, n_ra=14, n_dec=10, ra_range=(74, 48), dec_range=(-45, -27)):
    major_ticks_ra = np.linspace(ra_range[0], ra_range[1], n_ra)
    major_ticks_dec = np.linspace(dec_range[0], dec_range[1], n_dec)
    # If you set the Dec to be the rotation center you get Delta x steps that are constant
    left_ra = np.zeros(n_dec) + dc2_run2x_wfd_df['ra'][0]  # It doesn't matter what this is, because it doesn't affect Dec.
    bottom_dec = np.zeros(n_ra) + dc2_run2x_wfd_df['dec'][2]
    major_ticks_x, _ = mollweide.ang2xy(major_ticks_ra, bottom_dec, lonlat=True)
    # RA doesn't matter for Dec
    _, major_ticks_y = mollweide.ang2xy(left_ra, major_ticks_dec, lonlat=True)

    major_ticks_and_labels_x = [(x, f"{ra:0.0f}") for x, ra in zip(major_ticks_x, major_ticks_ra)]
    major_ticks_and_labels_y = [(y, f"{dec:0.0f}") for y, dec in zip(major_ticks_y, major_ticks_dec)]
    
    return major_ticks_and_labels_x, major_ticks_and_labels_y


def decorate_ra_dec_plot(ra_dec_plot, mollweide=mollweide):
    major_ticks_and_labels_x, major_ticks_and_labels_y = generate_ra_dec_tick_labels(mollweide)
    ra_dec_plot = ra_dec_plot.opts(xlabel='RA', ylabel='Dec', xticks=major_ticks_and_labels_x, yticks=major_ticks_and_labels_y)
    ra_dec_plot = ra_dec_plot.opts(hv.opts.Image(colorbar=True, clim=(10, None), clipping_colors={'min': 'gray'},
                                   cmap=process_cmap("viridis", provider="matplotlib")))
    ra_dec_plot = ra_dec_plot.opts(width=480, height=400)

    return ra_dec_plot

In [None]:
ra_dec_moll = decorate_ra_dec_plot(ra_dec_moll, mollweide)

In [None]:
dc2_ra_dec_coverage = ra_dec_moll * wfd_outline * ddf_outline

In [None]:
dc2_ra_dec_coverage

In [None]:
hv.save(dc2_ra_dec_coverage, "dc2_ra_dec_coverage.html", backend="bokeh")

Cool!  We did the projection, it ran across our workers, and we can even zoom in and out and it will dynamically rebin.

But watch what happens with the Dask Dashboard when you zoom in and out.

That's right, `read_parquet` is called across 166 partitions.  It's re-reading the data each time!  Now, this is all in various caches and isn't as painful as one might fear, but still, this seems very much not what we wanted.

The key is that Dask doesn't know that you're not done.  It assumed that when you showed the plot (or slightly more specifically, once `rasterize` calculated the aggregate values) you were done and didn't need the data any more, so it just dropped it.  Zooming around triggers a re-read.

So next let's persist the data in memory to avoid the re-reading.

In [None]:
ddf = ddf.persist()

You should see blocks of `read_parquet` in the Dask Dashboard.
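
The difference between recomputing and persisting can be seen in a toy example.  The sketch below uses `dask.delayed` with the synchronous scheduler instead of a DataFrame on a cluster, and a made-up `load` function stands in for `read_parquet`; it only counts how often the "read" runs.

```python
import dask
from dask import delayed

calls = {"load": 0}


def load():
    """Stand-in for an expensive read such as `read_parquet`."""
    calls["load"] += 1
    return list(range(10))


with dask.config.set(scheduler="synchronous"):
    lazy = delayed(load)()

    # Each compute re-runs the whole graph, including the load.
    delayed(sum)(lazy).compute()
    delayed(sum)(lazy).compute()
    calls_without_persist = calls["load"]

    # persist() runs the load once and keeps the result attached to the graph.
    kept = lazy.persist()
    delayed(sum)(kept).compute()
    delayed(sum)(kept).compute()
    calls_with_persist = calls["load"] - calls_without_persist

print(f"loads without persist: {calls_without_persist}, with persist: {calls_with_persist}")
```

The two un-persisted computes each trigger a load, while the persisted version should load only once no matter how many times it is reused.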

In [None]:
new_ra_dec_moll, mollweide = plot_ra_dec_mollweide(ddf, also_return_mollweide=True)

In [None]:
new_ra_dec_moll = decorate_ra_dec_plot(new_ra_dec_moll, mollweide)

In [None]:
new_ra_dec_moll

Here we've stripped out some of the formatting and tick label rewriting for simplicity.

Huh.  Zooming in and out is not particularly faster.  If we look at the Dask processing, we see that's because there's still a lot of computation in the geometric projection.

Let's see if pulling out and persisting those columns makes things faster.

In [None]:
x, y = moll_ang2xy(ddf['ra'], ddf['dec'], mollweide=mollweide)
ddf = ddf.assign(x=x, y=y)

In [None]:
ddf = ddf.persist()

In [None]:
ddf

In [None]:
ddf.dtypes

In [None]:
points_ra_dec_moll = hv.Points(ddf, ['x', 'y'])
persisted_ra_dec_moll = rasterize(points_ra_dec_moll, width=1080, height=1080)

In [None]:
persisted_ra_dec_moll = decorate_ra_dec_plot(persisted_ra_dec_moll, mollweide)

In [None]:
persisted_ra_dec_moll

And now the zooming is much faster because everything is in memory.  There's no activity in the Dask Dashboard because there are no calculations being done through Dask.  The only really notable thing is that with an extra two columns ('x', 'y') we now have 16 GB of memory used across the workers instead of 8 GB for ('ra', 'dec').

Wait, as long as we're just persisting all of the data from the 'ra', 'dec' columns and then the 'x', 'y' projected columns to memory, do we even need to use Dask?

YES.  The projection calculation is compute-intensive enough that Dask does a better job of using the available CPUs.  The plain Pandas calculation runs through `numpy`, which uses at most a few threads effectively in its calls to the backend OpenBLAS/Intel MKL/ATLAS library that actually does the calculation.  Dask, in contrast, can distribute separate Python processes across the workers, each of which can use the same level of multi-threading as the single process.

And, yes, the calculation we're doing could probably be better optimized to let numpy recognize the array operations.  But the more general point is that it's easy to write functions that work fine on 1-4 threads.  Diving into the full details of what in `numpy` is parallelized, across how many threads, and to what level in different backends is a rich exploratory experience.  But the overhead of using multiple processes to do this in parallel is generally small; Dask is one easy way to build and schedule the work graph to do this work.

In [None]:
%%timeit
x, y = moll_ang2xy(ddf['ra'], ddf['dec'], mollweide=mollweide)
print(x.mean(), y.mean())  # Trigger the computation

If we do this all just in straight Pandas we would do:

In [None]:
%%timeit
df = pd.read_parquet(data_path, columns=columns, engine='pyarrow')

The read is fast enough because `pyarrow` automatically uses the available CPUs to do parallel reads.  It looks up how many available cores it has with `pyarrow.cpu_count()` and uses all of them.

However, if either `OMP_NUM_THREADS` or `OMP_THREAD_LIMIT` is set, it will use that value.  So if you are already in a multi-processing setup, you might find that these have already been set to as low as 1, and this `pandas.read_parquet` may in fact be somewhat slower.
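
A quick way to see whether your session has been restricted is to look at those environment variables next to the core count the operating system reports.  This is a small standard-library sketch; the helper name is made up, and it uses the standard OpenMP spelling `OMP_THREAD_LIMIT`.

```python
import os


def thread_limit_hints(environ=None):
    """Report the OpenMP-style variables discussed above, plus the OS core count."""
    environ = os.environ if environ is None else environ
    return {
        "OMP_NUM_THREADS": environ.get("OMP_NUM_THREADS"),
        "OMP_THREAD_LIMIT": environ.get("OMP_THREAD_LIMIT"),
        "os.cpu_count()": os.cpu_count(),
    }


for name, value in thread_limit_hints().items():
    print(f"{name}: {value if value is not None else 'not set'}")
```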

In [None]:
import pyarrow
pyarrow.cpu_count()

And now we have to actually run the command.  Variables assigned inside a `%%timeit` cell are not available outside of it.

In [None]:
df = pd.read_parquet(data_path, columns=columns, engine='pyarrow')
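
The same scoping behavior can be seen outside of IPython with the standard-library `timeit` module, which also runs the timed statement inside its own function.  This is only an analogy to the `%%timeit` magic, with a trivial sum standing in for the expensive read.

```python
import timeit

# Time an assignment; `timed_value` only exists inside timeit's own function.
elapsed = timeit.timeit("timed_value = sum(range(1000))", number=100)
print(f"{elapsed:.4f} s for 100 runs")

# The assigned name did not leak into this namespace...
leaked = "timed_value" in globals()
print(f"timed_value defined after timing: {leaked}")

# ...so run the statement once more, outside the timer, to keep the result.
timed_value = sum(range(1000))
```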

Let's try the projection calculation:

In [None]:
%%timeit
df['x'], df['y'] = mollweide.ang2xy(df['ra'], df['dec'], lonlat=True)

Ouch!  I got 48 seconds on my 3GHz 8-core Xeon E5 desktop.

Well, maybe that's just because the HealPy projection function isn't optimized well for such large arrays.  That seems somewhat unlikely, as that's essentially a key HealPy use case, and we already know it's not obviously true because we've just read that code above.  The Dask-aware transformation code above is essentially the code taken from HealPy and it's all reasonably good array-based numpy.  In particular, `numpy.tensordot` is _really_ optimized in most underlying OpenBLAS/MKL implementations.

Now trig functions can be expensive, and there are some more compact ways of writing some of the operations that would reduce the number of trig function calls, but there's nothing really particularly obviously wrong with the original HealPy code.

But still, we should compare apples to apples and write out the transformation functions just like we did above for the Dask-aware case.

In [None]:
# dir2vec from healpy.rotator
# Plain NumPy/Pandas version of the Dask-aware functions above
def pd_dir2vec(theta, phi):
    lon, lat = theta, phi
    theta, phi = np.pi / 2 - np.radians(lat), np.radians(lon)
    ct, st, cp, sp = np.cos(theta), np.sin(theta), np.cos(phi), np.sin(phi)
    vx, vy, vz = st * cp, st * sp, ct
    return vx, vy, vz

def pd_vec2dir(vx, vy, vz):
    r = np.sqrt(vx ** 2 + vy ** 2 + vz ** 2)
    theta = np.arccos(vz / r)
    phi = np.arctan2(vy, vx)
    
    return theta, phi

def pd_vec2xy(vx, vy, vz, mollweide):
    rotmat = mollweide.rotator._matrix
    
    # Unlike the Dask version, there are no partitions to align here,
    # so we can pass the three components to tensordot directly.
    coords = [vx, vy, vz]
    vxp, vyp, vzp = np.tensordot(rotmat, coords, axes=(1, 0))

    theta, phi = pd_vec2dir(vxp, vyp, vzp)
        
    phi = (phi + np.pi) % (2 * np.pi) - np.pi
    lat = (np.pi / 2) - theta
    
    # Evaluate the SciPy interpolation function directly on the NumPy array
    A = lininterp(lat)
    
    flip = mollweide._flip

    x = flip * (2 / np.pi) * phi * np.cos(A)
    y = np.sin(A)

    return x, y
    
def pd_moll_ang2xy(theta, phi, mollweide):
    vx, vy, vz = pd_dir2vec(theta, phi)
    return pd_vec2xy(vx, vy, vz, mollweide)

In [None]:
%%timeit
df['x'], df['y'] = pd_moll_ang2xy(df['ra'], df['dec'], mollweide=mollweide)

So, 36 seconds is a little better, but this is still _much_ slower than the 2.5 seconds when using all of the available cores with Dask.

And it really is this calculation that is the dominant time.  The HoloViews and datashader `Points` and `rasterize` are very fast because they don't actually do the aggregation/binning calculation yet.

In [None]:
%%timeit
pd_points_ra_dec_moll = hv.Points(df, ['x', 'y'])
pd_ra_dec_moll = rasterize(pd_points_ra_dec_moll, width=1080, height=1080)

In [None]:
pd_points_ra_dec_moll = hv.Points(df, ['x', 'y'])
pd_ra_dec_moll = rasterize(pd_points_ra_dec_moll, width=1080, height=1080)
pd_ra_dec_moll = decorate_ra_dec_plot(pd_ra_dec_moll, mollweide)

And then the actual aggregation happens when we display or zoom the plot and only takes a few seconds.

In [None]:
pd_ra_dec_moll

Dask is, somewhat obviously, absolutely necessary when you're dealing with a dataset that doesn't fit in memory.

But it's also a really useful way of easily constructing and executing work across multiple cores, even if we're just on one machine.

## Calculate the Area Covered

In [None]:
def calculate_area(df, threshold=0.25, nside=1024, verbose=False):
    """Calculate the area covered by a catalog with 'ra', 'dec'
    
    Parameters:
    --
    df: DataFrame, dict-like with 'ra', 'dec' keys
    threshold:  float
        Fraction of median value required to count a pixel.
    nside:  int
        Healpix NSIDE.  NSIDE=1024 is ~12 sq arcmin/pixel, NSIDE=4096 is 0.74 sq. arcmin/pixel
        Increasing nside will decrease calculated area as holes become better resolved 
        and relative Poisson fluctuations in number counts become more significant.
    verbose:  bool
        Print details on nside, number of significant pixels, and area/pixel.
        
    Returns:
    --
    area:  Astropy Quantity.
    """
    import healpy as hp

    # MWV: The following line of code makes me sad, but 
    # We need to make a matching DataFrame for nside to satisfy conservative
    # Pandas 1.2.4 requirement that all ufuncs have arguments of the same type.
    # `ang2pix` takes `nside`, `ra`, `dec` and so each of those need to be of the same type.
    # That means we need to take our simple int nside and convert it to a Dask Series.
    # We explicitly base it off the df['ra'] so that the partitions are automatically propagated
    # And then set the value with 'nside' and cast to int.
    nside_ds = (nside + 0 * df['ra']).astype(int)

    indices = hp.ang2pix(nside_ds, df['ra'], df['dec'], lonlat=True)
    idx, counts = np.unique(indices, return_counts=True)
    
    # Take `threshold` (25% by default) of the median value of the non-zero counts/pixel
    threshold_counts = threshold * np.median(counts)

    if verbose:
        print(f'Median {np.median(counts)} objects/pixel')
        print(f'Only count pixels with more than {threshold_counts} objects')

    significant_pixels, = np.where(counts > threshold_counts)
    area_pixel = hp.nside2pixarea(nside, degrees=True) * u.deg**2

    if verbose:
        print(f'Pixel size ~ {hp.nside2resol(nside, arcmin=True) * u.arcmin:0.2g}')
        print(f'nside: {nside}, area/pixel: {area_pixel:0.4g}, num significant pixels: {len(significant_pixels)}')

    area = len(significant_pixels) * area_pixel

    if verbose:
        print(f'Total area: {area:0.7g}')
    
    return area
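
The pixel areas quoted in the docstring follow from the HEALPix layout, which divides the sphere into `12 * nside**2` equal-area pixels.  The sketch below reproduces those numbers with plain arithmetic and then applies the same threshold rule to a handful of invented pixel counts; the counts are illustrative only and are not taken from the catalog.

```python
import math
import statistics

FULL_SKY_SQ_DEG = 4 * math.pi * (180 / math.pi) ** 2  # about 41253 sq. deg


def healpix_pixel_area_sq_arcmin(nside):
    """Area of one HEALPix pixel; the sphere is split into 12 * nside**2 equal pixels."""
    n_pixels = 12 * nside**2
    return FULL_SKY_SQ_DEG * 3600 / n_pixels


for nside in (1024, 4096):
    print(f"nside={nside}: {healpix_pixel_area_sq_arcmin(nside):.2f} sq. arcmin/pixel")

# Toy thresholding: keep pixels with more than 25% of the median occupancy.
toy_counts = [40, 38, 42, 3, 41, 39]  # invented counts per occupied pixel
threshold_counts = 0.25 * statistics.median(toy_counts)
n_significant = sum(count > threshold_counts for count in toy_counts)
toy_area = n_significant * healpix_pixel_area_sq_arcmin(1024)
print(f"{n_significant} significant pixels, {toy_area:.1f} sq. arcmin")
```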

This is, in fact, a somewhat expensive calculation.  It takes about 20 seconds.

In [None]:
%%timeit
area_dc2 = calculate_area(ddf)

In [None]:
%%timeit
area_dc2 = calculate_area(df)

There's no real difference between the `ddf` and the `df` because the HealPy `ang2pix` call itself is not Dask aware.  So all of the processing is single-processor in either case.

In some runs I saw worse performance with `ddf`.

In [None]:
area_dc2 = calculate_area(ddf)
print(f'DC2 Run 2.2i area: {area_dc2:0.2f}')

In [None]:
print(f'Average density: {len(df)/area_dc2.to("arcmin**2")}')

## Are the divots above the bright stars?

There are clear pixels in the above map that have a notably lower density.  Are these from bright stars in those regions that are masking out a number of objects?  Not just saturated stars, but really bright ones.  This would be at a smaller resolution than the pixel size of the density map.

Let's check the truth catalog with GCR Catalogs:
`dc2_truth_run2.2i_star_truth_summary`

In [None]:
star_truth_catalog_name = "dc2_truth_run2.2i_star_truth_summary.yaml"

star_truth_dir = os.path.join(desc_data_dir, "DC2-prod", "Run2.2i", "truth", "startruth")
star_truth_file = "star_truth_summary.parquet"
star_truth_filepath = os.path.join(star_truth_dir, star_truth_file)

In [None]:
import GCRCatalogs
GCRCatalogs.load_catalog("dc2_truth_run2.2i_star_truth_summary").list_all_quantities()

In [None]:
def convert_nanoJansky_to_mag(flux):
    """Convert calibrated nanoJansky flux to AB mag.
    """
    #pylint: disable=C0103
    AB_mag_zp_wrt_Jansky = 8.90  # Definition of AB
    # 9 is from nano=10**(-9)
    #pylint: disable=C0103
    AB_mag_zp_wrt_nanoJansky = 2.5 * 9 + AB_mag_zp_wrt_Jansky

    return -2.5 * np.log10(flux) + AB_mag_zp_wrt_nanoJansky

Check that the conversion makes sense:

In [None]:
convert_nanoJansky_to_mag(1e6)
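
A few more reference points make the check easier to read.  The sketch below re-implements the same formula with the standard library only (the function name is made up so that it doesn't clash with the one above) and evaluates it at round flux values, including the 3631 Jy flux that defines AB magnitude 0.

```python
import math


def nanojansky_to_ab_mag(flux_njy):
    """AB magnitude from a flux in nJy, using the 8.90 mag zero point at 1 Jy."""
    return -2.5 * math.log10(flux_njy) + (8.90 + 2.5 * 9)


reference_fluxes = [("1 nJy", 1.0), ("1 mJy", 1e6), ("1 Jy", 1e9), ("3631 Jy", 3631e9)]
for label, flux_njy in reference_fluxes:
    print(f"{label:>8s}: {nanojansky_to_ab_mag(flux_njy):6.2f} mag")
```

So 1 nJy corresponds to 31.4 mag, 1 mJy (the value checked above) to 16.4 mag, and 3631 Jy lands at 0 mag to within the rounding of the 8.90 zero point.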

The entire Star Truth summary file is only 1.3 GB and we're just pulling out 5 columns.  So we can just do this directly in a Pandas DataFrame.

In [None]:
all_stars = pd.read_parquet(star_truth_filepath,
                            columns=['ra', 'dec', 'flux_g', 'flux_r', 'flux_i'])

In [None]:
for f in 'g', 'r', 'i':
    all_stars[f"mag_{f}"] = convert_nanoJansky_to_mag(all_stars[f"flux_{f}"])
    
all_stars["g-r"] = all_stars["mag_g"] - all_stars["mag_r"]
all_stars["r-i"] = all_stars["mag_r"] - all_stars["mag_i"]

In [None]:
color_color_points = hv.Points(all_stars, kdims=["g-r", "r-i"])
color_color = datashade(color_color_points)

color_mag_points = hv.Points(all_stars, kdims=["g-r", "mag_r"])
color_mag = datashade(color_mag_points)

In [None]:
color_color + color_mag

There are two surprising features of the left plot:
1. The streaks up and to the right.
    These come from the finite set of M-star models, which are then reddened by different amounts of dust, leading to the streaking.  The same reddening affects the other stars as well, but the hotter stars are distributed along a line parallel to the reddening vector (an interesting piece of astrophysics that's the subject of a lecture in Astro classes).
2. The curving downward.
    These are various cool white dwarf models, some a bit more theoretical than observed.

In [None]:
# Crude simple selection of stars in area.
# We're being generous instead of doing the geometry precisely

min_ra, max_ra = dc2_run2x_wfd[2][0], dc2_run2x_wfd[3][0]
min_dec, max_dec = dc2_run2x_wfd[2][1], dc2_run2x_wfd[0][1]

stars = all_stars[(min_ra < all_stars["ra"]) & (all_stars["ra"] < max_ra) &
                  (min_dec < all_stars["dec"]) & (all_stars["dec"] < max_dec)]

In [None]:
bright_stars = stars[stars["mag_r"] < 10]

In [None]:
len(bright_stars)

In [None]:
x, y = pd_moll_ang2xy(bright_stars['ra'], bright_stars['dec'], mollweide=mollweide)
bright_stars_ra_dec = hv.Points((x, y))

In [None]:
# We can use the same xticks, yticks <-> RA, Dec mapping as from above.
major_ticks_and_labels_x, major_ticks_and_labels_y = generate_ra_dec_tick_labels(mollweide)
bright_stars_ra_dec = bright_stars_ra_dec.opts(xlabel='RA', ylabel='Dec',
                                               xticks=major_ticks_and_labels_x, yticks=major_ticks_and_labels_y)

In [None]:
bright_stars_ra_dec.opts(color='red')

And we can again use composition in HoloViews to display these stars on top of our previous RA, Dec coverage map.  

In [None]:
persisted_ra_dec_moll * bright_stars_ra_dec

You can see how the star selection was a bit generous rather than getting the geometry exactly right.

Navigating around with the Bokeh UI tools, I conclude that the bright stars aren't obviously responsible for the major visible divots at the large scale.

But it was useful to think about how to extract items from the truth catalog and overlay them on the density plot.