# Prototyping a Dataflow Implementation of the CARTA System

### Ingesting and storing our image data

In [1]:
from src.backend.image import Image

# Wrap the FITS file in the project's Image class; the pixel data is then
# reached through `i.data` in the cells below.
i = Image('/ramdisk/h_m51_b_s05_drz_sci.fits')

In [None]:
# Time the attribute access on its own; `.compute()` is timed separately in
# the next cell (presumably `i.data` is a lazy Dask array — confirm in Image).
%time i.data

In [None]:
# Time the full evaluation of the image data via `.compute()`.
%time i.data.compute()

In [None]:
import matplotlib.pyplot as plt
# Set the default figure size to (20, 10) for every plot in this notebook.
plt.rcParams.update({"figure.figsize": (20, 10)})

# Display the image with the colour scale clamped to [0, 1].
plt.imshow(i.data, cmap='inferno', vmin=0, vmax=1)

### Computing with Dask's default scheduler

In [None]:
# Second test image, used for the histogram benchmarks below
# (presumably 20000 x 20000 pixels, going by the file name — confirm).
gauss = Image('/ramdisk/image-20000-20000.fits')

In [None]:
# Time only the attribute access; nothing is computed explicitly here.
%time gauss.data

In [None]:
import dask
import dask.array as da

# The histogram range spans the data's own minimum and maximum. Neither is
# computed here: both are passed on to da.histogram as they are.
gauss_range = gauss.data.min(), gauss.data.max()

# Build a 1000-bin histogram of the image; `histo` holds the counts and
# `bins` the bin edges. Evaluation happens later via `.compute()`.
histo, bins = da.histogram(
    gauss.data,
    bins=1000,
    range=gauss_range,
)

In [None]:
# Time merely referencing the histogram object (no `.compute()` here).
%time histo

In [None]:
# Render the Dask task graph behind `histo`.
histo.visualize()

In [None]:
# Time the actual evaluation of the histogram and keep the counts for plotting.
%time histo_values = histo.compute()

In [None]:
# `bins` holds the bin edges (one more entry than there are counts), so each
# bar is drawn from its left edge with the true bin width. plt.bar's default
# of 0.8 data units, centred on x, would not match the bins.
edges = bins.compute()
plt.bar(edges[:-1], histo_values, width=edges[1:] - edges[:-1], align='edge')
plt.show()

### Computing with dask.distributed

In [None]:
from dask.distributed import Client, SSHCluster

# One SSH host per entry: the first runs the scheduler, the rest run workers.
cluster = SSHCluster([
    'localhost',      # scheduler
    '192.168.80.12',  # worker 0
    '192.168.80.13',  # worker 1
    '192.168.80.14',  # worker 2
])

# Attach a client to the cluster. Leaving it as the cell's last expression
# makes the notebook display its summary.
client = Client(cluster)
client

In [None]:
# Hand the array to the distributed client to persist, and keep the returned
# collection on the image object.
# NOTE(review): `histo` was built from the earlier `gauss.data` object, before
# this rebinding — confirm it still benefits from the persisted data.
gauss.data = client.persist(gauss.data)

In [None]:
# Re-time the same histogram evaluation now that a distributed client exists
# (presumably it runs on the cluster rather than the default scheduler — confirm).
%time histo_values = histo.compute()

In [None]:
# `bins` holds the bin edges (one more entry than there are counts), so each
# bar is drawn from its left edge with the true bin width. plt.bar's default
# of 0.8 data units, centred on x, would not match the bins.
edges = bins.compute()
plt.bar(edges[:-1], histo_values, width=edges[1:] - edges[:-1], align='edge')
plt.show()

### Interacting with Dask and CARTA

In [None]:
import logging
import nest_asyncio
import time
from threading import Thread

# Let INFO-level messages through the root logger.
logging.getLogger().setLevel(logging.INFO)
# Patch asyncio so event loops can be nested — presumably needed because the
# notebook kernel already runs one (confirm against the client/server code).
nest_asyncio.apply()

In [None]:
# NOTE(review): this import rebinds the name `Client`, which an earlier cell
# imported from dask.distributed. Re-running that earlier cell after this one
# would construct the wrong class — consider aliasing one of the two.
from src.frontend.client import Client

# Project client pointed at localhost:3002 and flagged as a CARTA client.
carta_client = Client("CartaClient", 'localhost', 3002, is_carta_client=True)

In [None]:
# Register this client as a viewer with the CARTA endpoint
# (exact handshake lives in src.frontend.client — not visible here).
carta_client.register_viewer()

In [None]:
from src.backend.server import Server

# Run `Server('localhost', 3003)` on a background daemon thread so this cell
# returns immediately and the thread does not keep the process alive on exit.
serverThread = Thread(
    target=Server,
    args=('localhost', 3003),
    daemon=True,
)
serverThread.start()

In [None]:
# Same project client class, this time pointed at port 3003 (the port the
# prototype Server thread was started with) and not flagged as a CARTA client.
dask_client = Client("DaskClient", 'localhost', 3003, is_carta_client=False)

In [None]:
clients = [dask_client, carta_client]
stats = []

for client in clients:
    client.open_file('image-20000-20000.fits', '/ramdisk/')
    %time stats.append(client.get_region_statistics())
    time.sleep(1)

In [None]:
from src.test.unit_tests import is_close

# Compare the statistics from the two back ends (stats[0]: dask_client,
# stats[1]: carta_client) using the project's closeness helper.
is_close(stats[0], stats[1])