# Multi-Node Rendering with Dask-MPI and Dask.Dataframe
Similar to the multi-node Dask.Array example, we take the work we did to render a dataframe on a single node and distribute it with Dask. We start the cluster the same way as in that example: one `mpiexec` call starts the scheduler process, and a second starts all of the workers and connects them to it. The `runCluster.sh` script shows the full commands. When invoking the scheduler, we set an option that writes a scheduler file, `scheduler.json`, to the local directory. This file contains the details the client needs to connect to the scheduler process.
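The scheduler file is plain JSON. As a rough illustration, assuming it carries an `address` entry such as `"tcp://10.0.0.5:8786"` (which is what Dask scheduler files normally contain), the connection details can be read with the standard library alone. `read_scheduler_address` is a hypothetical helper. In the notebook itself, `Client(scheduler_file=...)` performs this lookup for you.

```python
import json
from pathlib import Path


def read_scheduler_address(path="scheduler.json"):
    """Return the scheduler address stored in a Dask scheduler file.

    Assumption: the file is JSON with an "address" key. Client(scheduler_file=...)
    does this internally, so this helper is for illustration only.
    """
    info = json.loads(Path(path).read_text())
    address = info.get("address")
    if not address:
        raise ValueError(f"{path} has no 'address' entry")
    return address
```

Reading the file yourself is mainly useful for debugging. For example, you can check that the workers and the notebook agree on where the scheduler is listening.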

```python
from dask.distributed import Client, wait

client = Client(scheduler_file='scheduler.json', set_as_default=True)  # connect to the Dask scheduler

N = len(client.scheduler_info()['workers'])  # number of workers in the cluster
print("Connected to cluster with", N, "workers")
```

Dask.Dataframe can read the source CSV file directly. We call `repartition` and `persist` to load the data into `N` partitions, one per worker. We then wait for the load to finish and call `client.rebalance()` to spread the partitions evenly across the cluster. For convenience, we also store the names of the columns that we intend to use as 3D coordinates.

```python
import dask.dataframe as dd

# Read the CSV, split it into N partitions, and keep them in worker memory
df = dd.read_csv("/data/Gaia/ds.eq.csv").repartition(npartitions=N).persist()
wait(df)            # block until every partition has been loaded
client.rebalance()  # even out memory use across the workers

# Names of the columns we want to use as the (x, y, z) position
posCols = ['x[pc]', 'y[pc]', 'z[pc]']
```
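`repartition(npartitions=N)` chooses its own partition boundaries, and the resulting partitions need not be equal in size. As a simplified mental model only (this is not Dask's algorithm), an even split of `n_rows` rows into `N` contiguous blocks looks like the sketch below. The first `n_rows % N` blocks get one extra row. `even_partition_bounds` is a hypothetical helper.

```python
def even_partition_bounds(n_rows, n_parts):
    """Split row indices [0, n_rows) into n_parts contiguous half-open ranges.

    Sizes differ by at most one row. Illustrative only; Dask picks its own
    boundaries when repartitioning.
    """
    if n_parts <= 0:
        raise ValueError("n_parts must be positive")
    base, extra = divmod(n_rows, n_parts)
    bounds = []
    start = 0
    for i in range(n_parts):
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


# 10 rows spread over 4 workers: rank r would own rows bounds[r]
print(even_partition_bounds(10, 4))  # → [(0, 3), (3, 6), (6, 8), (8, 10)]
```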

Once the data is loaded and any other preprocessing is done, we can start our `PVRenderActor`s and do some rendering. Each `PVRenderActor` keeps its own internal state, which lets us initialize the rendering context once rather than starting from scratch for every frame. We call `client.map` over `N` integers with `actor=True` to create one `PVRenderActor` per worker. `client.map` is asynchronous: it submits the work and immediately returns a list of futures. `client.gather` blocks until those futures complete and returns a list of handles to the actors on each node.

```python
from ipyparaview import PVRenderActor

renderers = client.gather(client.map(PVRenderActor, range(N), actor=True))
```
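An actor is a stateful object that lives on one worker. Calling its methods returns futures instead of blocking. The toy class below mimics that behaviour in a single process using `concurrent.futures`. It shows that the expensive setup happens once per actor and that every later call reuses the stored state. Running calls one at a time on a single thread is a simplifying assumption. `ToyRenderActor`, `init_count`, and `render_frame` are hypothetical names and are not part of ipyparaview or Dask.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyRenderActor:
    """Single-process stand-in for a stateful render actor (hypothetical).

    Setup happens once in __init__; later calls reuse that state. Calls go
    through a one-thread executor, so they run in order and return
    concurrent.futures.Future objects, loosely like actor method calls.
    """
    init_count = 0  # class-level counter: shows setup runs once per actor

    def __init__(self, rank):
        self.rank = rank
        ToyRenderActor.init_count += 1
        self._frames = 0  # per-actor state that persists across calls
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _render(self, label):
        self._frames += 1
        return f"rank {self.rank}: frame {self._frames} ({label})"

    def render_frame(self, label):
        # Returns a future immediately; the work runs on the executor thread
        return self._executor.submit(self._render, label)

    def close(self):
        self._executor.shutdown(wait=True)


actors = [ToyRenderActor(rank) for rank in range(2)]
futures = [a.render_frame("first") for a in actors]  # non-blocking calls
print([f.result() for f in futures])  # → ['rank 0: frame 1 (first)', 'rank 1: frame 1 (first)']
print(actors[0].render_frame("second").result())  # → rank 0: frame 2 (second)
for a in actors:
    a.close()
```

In the notebook, `client.map(..., actor=True)` plays the role of the constructor loop, and `client.gather` plays the role of collecting `.result()`.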

`PVRenderActor` is generic: it only sets up common state, so we still need to configure the ParaView pipeline for our specific case. The setup is mostly the same as in the single-node Dataframe example, with one additional first step: each actor fetches its own partition of the dataframe.

```python
# Define a function for remote execution that sets up the ParaView state on each actor
def workerState(self, df, posCols):
    # Fetch the dataframe partition for this rank and convert it to a vtkTable
    import PVGeo
    pdf = df.get_partition(self.rank).compute()
    vtkt = PVGeo.interface.data_frame_to_table(pdf)
    print("Rank", self.rank, "has", pdf.shape[0], "stars")

    # Wrap the vtkTable in a producer, then apply a TableToPoints filter
    stars = self.pvs.TrivialProducer()
    stars.GetClientSideObject().SetOutput(vtkt)
    stars.UpdatePipeline()
    ttp = self.pvs.TableToPoints(Input=stars)
    ttp.XColumn = posCols[0]
    ttp.YColumn = posCols[1]
    ttp.ZColumn = posCols[2]

    # Configure the actor's existing render view
    self.renV.ViewSize = [800, 500]
    self.renV.CenterOfRotation = [-42.81136055702743, 20.561263821238143, -8.375310213352522]
    self.renV.CameraFocalPoint = self.renV.CenterOfRotation
    self.renV.CameraPosition = [-19.004408839051408, 78.59672562652541, -351.72329889073706]
    self.renV.CameraViewUp = [0, 1, 0]
    self.renV.Background = [0.32, 0.34, 0.43]

    # Create the color lookup table used to set star color
    plxmasLUT = self.pvs.GetColorTransferFunction('plxmas')
    plxmasLUT.RGBPoints = [5.800048770935931, 0.0, 0.0, 0.0,
                           26.651891986128916, 0.901960784314, 0.0, 0.0,
                           47.5037352013219, 0.901960784314, 0.901960784314, 0.0,
                           57.92965680891839, 1.0, 1.0, 1.0]
    plxmasLUT.ColorSpace = 'RGB'
    plxmasLUT.NanColor = [0.0, 0.498039215686, 1.0]
    plxmasLUT.ScalarRangeInitialized = 1.0

    # Create a Glyph filter that draws a sphere for each star
    glyph = self.pvs.Glyph(Input=ttp, GlyphType='Sphere')
    glyph.ScaleFactor = 29.995732905481237
    glyph.GlyphTransform = 'Transform2'
    glyph.GlyphMode = 'All Points'
    glyph.GlyphType.Radius = 0.025
    glyph.GlyphType.ThetaResolution = 180
    glyph.GlyphType.PhiResolution = 90

    # Save glyphDisplay as actor state so we can modify it later
    self.glyphDisplay = self.pvs.Show(glyph, self.renV)
    self.glyphDisplay.Representation = 'Surface'
    self.glyphDisplay.ColorArrayName = ['POINTS', 'plx[mas]']
    self.glyphDisplay.LookupTable = plxmasLUT

    # Finally, show the color legend
    plxmasLUTColorBar = self.pvs.GetScalarBar(plxmasLUT, self.renV)
    plxmasLUTColorBar.Title = 'plx[mas]'
    plxmasLUTColorBar.Visibility = 1

# Submit the setup function to every actor first, then block until each call finishes.
# Actor method calls return actor futures, so we wait on them with .result().
setup_calls = [r.run(workerState, [df, posCols]) for r in renderers]
for call in setup_calls:
    call.result()
```
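`RGBPoints` is a flat list of control points, four numbers each: a data value followed by red, green, and blue in [0, 1]. The sketch below does plain linear interpolation in RGB space, matching `ColorSpace = 'RGB'` above, to show how a star's `plx[mas]` value maps to a color. `lut_color` is a hypothetical helper. It ignores ParaView details such as NaN color, above/below-range colors, and per-point midpoint/sharpness settings.

```python
def lut_color(rgb_points, value):
    """Map a scalar to an (r, g, b) color from a flat [x, r, g, b, ...] list.

    Linear interpolation in RGB space between neighbouring control points;
    values outside the range clamp to the end colors. Points must be sorted by x.
    """
    if not rgb_points or len(rgb_points) % 4 != 0:
        raise ValueError("rgb_points must hold groups of four numbers")
    # Regroup the flat list into (x, r, g, b) tuples
    pts = [tuple(rgb_points[i:i + 4]) for i in range(0, len(rgb_points), 4)]
    if value <= pts[0][0]:
        return pts[0][1:]
    if value >= pts[-1][0]:
        return pts[-1][1:]
    for (x0, *c0), (x1, *c1) in zip(pts, pts[1:]):
        if x0 <= value <= x1:
            if x1 == x0:
                return tuple(c1)
            t = (value - x0) / (x1 - x0)  # fractional position between points
            return tuple(a + t * (b - a) for a, b in zip(c0, c1))
    raise ValueError("control points must be sorted by data value")


# The plx[mas] control points used in workerState
plxmas_points = [5.800048770935931, 0.0, 0.0, 0.0,
                 26.651891986128916, 0.901960784314, 0.0, 0.0,
                 47.5037352013219, 0.901960784314, 0.901960784314, 0.0,
                 57.92965680891839, 1.0, 1.0, 1.0]
print(lut_color(plxmas_points, 26.651891986128916))  # → (0.901960784314, 0.0, 0.0)
```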

And that's it! We can create a `PVDisplay` widget and display it.

```python
from ipyparaview.widgets import PVDisplay

w = PVDisplay(renderers)
w
```

If we want to modify any state, such as the variable used to color the output, we have to modify it on the workers. In the single-node case we changed a local state variable directly. Here, the callback that we pass to `interact` instead sends a function to each actor for remote execution and then asks the widget to re-render. It takes a couple of extra lines of boilerplate, but otherwise works the same.

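```python
from ipywidgets import interact

# Every column that is not a position coordinate is a candidate for coloring
varlist = [n for n in df.columns if n not in posCols]

def setVar(v):
    # Runs remotely on each actor: recolor the glyphs by column v
    def workerSetVar(self, v):
        self.pvs.ColorBy(self.glyphDisplay, ('POINTS', v))
        self.glyphDisplay.RescaleTransferFunctionToDataRange(True, False)
        vLUT = self.pvs.GetColorTransferFunction(v)
        vLUT.ApplyPreset('Black-Body Radiation', True)

    # Update every actor, wait for all of them, then redraw the widget
    calls = [r.run(workerSetVar, [v]) for r in renderers]
    for call in calls:
        call.result()
    w.render()

interact(setVar, v=varlist)
```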
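The pattern inside `setVar` has three steps: start the same function on every actor without blocking, wait for all of them, and only then trigger a single re-render. The sketch below reproduces that ordering in one process, using thread-pool futures as stand-ins for actor calls. `FakeActor`, `broadcast`, `worker_set_var`, and the `renders` list are hypothetical and are not ipyparaview API.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeActor:
    """Hypothetical stand-in for a remote actor: run() returns a future."""

    def __init__(self, rank, pool):
        self.rank = rank
        self.color_var = None
        self._pool = pool

    def run(self, f, args):
        return self._pool.submit(f, self, *args)


def broadcast(actors, f, args):
    """Start f on every actor first, then block until all have finished."""
    futures = [a.run(f, args) for a in actors]  # all calls in flight at once
    return [fut.result() for fut in futures]    # re-raises any worker error


def worker_set_var(self, v):
    self.color_var = v  # stands in for ColorBy / ApplyPreset on the worker
    return self.rank


renders = []


def set_var(actors, v):
    broadcast(actors, worker_set_var, [v])
    renders.append(v)  # stands in for w.render(); runs after every update


with ThreadPoolExecutor(max_workers=4) as pool:
    actors = [FakeActor(r, pool) for r in range(3)]
    set_var(actors, "plx[mas]")
    print([a.color_var for a in actors], renders)  # → ['plx[mas]', 'plx[mas]', 'plx[mas]'] ['plx[mas]']
```

Submitting every call before collecting any result lets the workers update in parallel. Rendering only after all results are in keeps the displayed frame from mixing old and new colorings.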