In [1]:
from dask.distributed import Client, SSHCluster
import sys
import dask
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
import matplotlib.pyplot as plt

from dask import delayed
from time import time

## DASK CLUSTER

## Functions

In [5]:
# ------ ALGORITHM FUNCTIONS
def load(path):
    """Return the array stored under key ``"a"`` in the ``.npz`` archive at ``path``.

    The archive is opened in a ``with`` block so that its file handle is
    closed as soon as the array has been read, rather than staying open
    until the ``NpzFile`` object happens to be garbage-collected.

    Parameters
    ----------
    path : str or path-like
        Location of the ``.npz`` file.

    Returns
    -------
    numpy.ndarray
        The array saved under the name ``"a"``.

    Raises
    ------
    KeyError
        If the archive has no entry named ``"a"``.
    """
    # NOTE(review): allow_pickle=True unpickles object arrays, which can
    # execute arbitrary code -- only use this on trusted files.
    with np.load(path, allow_pickle=True) as archive:
        return archive["a"]

def R_yz(theta_rot, phi_rot):
    """Return the 3x3 matrix for a z-rotation followed by a y-rotation.

    The result equals ``R_y(theta_rot) @ R_z(phi_rot)``: applied to a column
    vector it first rotates by ``phi_rot`` about the z axis and then by
    ``theta_rot`` about the y axis. Angles are in radians.
    """
    cos_t, sin_t = np.cos(theta_rot), np.sin(theta_rot)
    cos_p, sin_p = np.cos(phi_rot), np.sin(phi_rot)

    row_x = [cos_p * cos_t, -sin_p * cos_t, sin_t]
    row_y = [sin_p, cos_p, 0]
    row_z = [-sin_t * cos_p, sin_t * sin_p, cos_t]
    return np.array([row_x, row_y, row_z])

def convert_pmt_ids(input_ids, conversion_ids):
    """Translate ``CdID`` values into the matching ``PMTID`` values.

    Parameters
    ----------
    input_ids : array-like
        Ids to look up in the ``"CdID"`` column.
    conversion_ids : mapping or DataFrame
        Must provide the columns ``"CdID"`` and ``"PMTID"``.

    Returns
    -------
    numpy.ndarray
        ``PMTID`` of every table row whose ``CdID`` occurs in ``input_ids``.
        The result follows the row order of ``conversion_ids`` (not the
        order of ``input_ids``), and ids missing from the table are dropped.
    """
    wanted = np.isin(np.array(conversion_ids["CdID"]), input_ids)
    return np.array(conversion_ids["PMTID"])[wanted]

def find_pmt_coord(pmt_positions, data_pmt_id):
    """Return the ``x, y, z`` columns of the rows whose ``PMTID`` is requested.

    Parameters
    ----------
    pmt_positions : pandas.DataFrame
        Table with the columns ``PMTID``, ``x``, ``y`` and ``z``.
    data_pmt_id : array-like
        PMT ids to select.

    Returns
    -------
    numpy.ndarray, shape (n_selected, 3)
        Coordinates in the row order of ``pmt_positions``.
    """
    selected = np.isin(pmt_positions["PMTID"], data_pmt_id)
    return pmt_positions.loc[selected, ["x", "y", "z"]].to_numpy()

# ------ DASK FUNCTIONS
def load_bag(path, Nevents):
    """Load the first ``Nevents`` events and return them as a list of arrays.

    The array returned by ``load(path)`` is indexed as ``[row, event]``.
    For every event, rows 0, 1 and 2 are stacked vertically with
    ``np.vstack`` into a single array (presumably of shape ``(3, n)`` --
    this relies on each entry being a 1-D array; TODO confirm).
    """
    events = load(path)[:, :Nevents]

    bag = []
    for ev in range(events.shape[1]):
        rows = [events[row, ev] for row in range(3)]
        bag.append(np.vstack(rows))
    return bag


# ------ DISTRIBUTED ALGORITHM FUNCTIONS
def rotate_ev(data):
    """Rotate one event so that its charge-weighted centre lies on the +x axis.

    Reads the module-level tables ``conversion_ids`` and ``pmt_positions``.

    Parameters
    ----------
    data : numpy.ndarray
        Indexed as ``data[0]`` (ids handed to ``convert_pmt_ids``),
        ``data[1]`` (charges) and ``data[2]`` (second per-hit quantity,
        presumably the hit time -- TODO confirm). Entries whose ``data[2]``
        is exactly ``0.0`` are ignored.

    Returns
    -------
    numpy.ndarray, shape (6, n_hits)
        Rows 0-2 are the rotated x, y, z coordinates, row 3 is the distance
        of each rotated point from the origin, rows 4 and 5 are ``data[1]``
        and ``data[2]`` of the kept entries. An event without any kept
        entry yields an empty ``(6, 0)`` array.
    """
    hit_mask = data[2] != 0.0
    if not np.any(hit_mask):
        # No hits: the total charge would be 0 and the centroid 0 / 0.
        return np.empty((6, 0))

    charges = data[1][hit_mask]
    pmt_ids = convert_pmt_ids(data[0][hit_mask], conversion_ids)
    pmt_coord = find_pmt_coord(pmt_positions, pmt_ids)
    # NOTE(review): both lookups return rows in table order, so pairing them
    # with `charges` assumes the event lists each PMT once and in the same
    # order as the tables -- confirm against the data.

    # Charge-weighted centroid, computed with numpy reductions instead of the
    # builtin sum(), which iterates over the arrays element by element.
    tot_charge = np.sum(charges)
    x_cc, y_cc, z_cc = np.sum(pmt_coord * charges[:, np.newaxis], axis=0) / tot_charge

    # Spherical angles of the centroid.
    theta_cc = np.arctan2(np.hypot(x_cc, y_cc), z_cc)
    phi_cc = np.arctan2(y_cc, x_cc)

    # Undo the azimuth, then tilt the centroid from polar angle theta_cc to pi/2.
    theta_rot = -theta_cc + np.pi / 2
    phi_rot = -phi_cc
    coord_new = np.matmul(R_yz(theta_rot, phi_rot), pmt_coord.T)

    radius = np.linalg.norm(coord_new, axis=0)

    return np.vstack([coord_new, radius, charges, data[2][hit_mask]])



def mapping_single_event(rotated_ev):
    """Project one rotated event onto a 230 x 124 image with two channels.

    Parameters
    ----------
    rotated_ev : numpy.ndarray, shape (6, n_hits)
        Concrete (already computed) array laid out as returned by
        ``rotate_ev``: rows 0-2 are x, y, z, row 3 is the radius and rows
        4-5 are the two per-hit values written to the image channels.

    Returns
    -------
    numpy.ndarray, shape (230, 124, 2)
        Zero wherever no hit was mapped. An event without hits gives an
        all-zero image.

    Notes
    -----
    * The z range of the event is split into 124 evenly spaced levels; a hit
      is drawn in every level whose z is closer than one level spacing, so
      the same hit can appear in two neighbouring levels.
    * If all hits share the same z the spacing is 0 and nothing is drawn.
    * When several hits land on the same pixel, the last one written wins.
    """
    n_max = 115          # scale for the number of pixels per half turn
    n_phi_bins = 230     # size of the first image axis
    n_z_bins = 124       # size of the second image axis (z levels)
    ix_offset = 57       # constant shift applied after rounding

    image_mat = np.zeros((n_phi_bins, n_z_bins, 2))

    coord_new = rotated_ev[:3]
    if coord_new.shape[1] == 0:
        # min()/max() below are undefined for an event without hits.
        return image_mat

    charge_hitt = rotated_ev[4:].T
    mean_radius = rotated_ev[3].mean()
    z_hits = coord_new[2]

    z_levels, step = np.linspace(z_hits.min(), z_hits.max(), n_z_bins, retstep=True)

    for j, z in enumerate(z_levels):
        in_band = np.abs(z_hits - z) < step
        if not np.any(in_band):
            continue
        band = coord_new[:, in_band]

        # Pixels available at this level shrink with the circle radius at height z.
        rz = mean_radius**2 - z**2
        n_eff = 0 if rz < 0 else n_max * np.sqrt(rz) / mean_radius

        ix = np.around(n_eff * (np.arctan2(band[1], band[0]) / np.pi) + (n_max / 2)) + ix_offset
        ix = ix.astype(np.int32)
        # Wrap indices that run past the last column back to the start.
        ix[ix >= n_phi_bins] -= n_phi_bins

        image_mat[ix, j] = charge_hitt[in_band]

    return image_mat

## Load mapping data

In [6]:
# Folder holding the input files (relative path, resolved against the
# current working directory).
data_folder = "../data/"

# File names that get appended to `data_folder`.
pmt_pos_fname = "PMTPos_CD_LPMT.csv"
pmt_id_conv_fname = "PMT_ID_conversion.csv"
train_data_fname = "raw_data_train_4.npz"

In [7]:
# Lookup tables read by rotate_ev() through these module-level names.
pmt_positions = pd.read_csv(data_folder + pmt_pos_fname)

# The id-conversion CSV is parsed only once; `conversion_ids` is an
# independent copy so that both names can still be modified separately.
pmt_id_conversion = pd.read_csv(data_folder + pmt_id_conv_fname)
conversion_ids = pmt_id_conversion.copy()

## Distributed Processing

In [13]:
# Number of events to take from the training file; passed to load_bag() as
# its `Nevents` argument in the timing cell.
Nevents = 50

In [14]:
# Time each stage of the pipeline and the whole run with wall-clock time().
time_0  = time()


# Stage 1: load_bag() runs eagerly here (file read + per-event stacking), then
# the resulting list is wrapped in a Dask bag with 16 partitions.
start   = time()
data_db = db.from_sequence(load_bag(data_folder + train_data_fname, Nevents=Nevents), npartitions=16)
stop    = time()

load_time = stop-start
print("Load time:\t", load_time)

# Stage 2: NOTE(review): db.map is lazy in Dask, so this interval covers only
# building the task graph; rotate_ev itself runs inside compute() below.
start   = time()
rotated = db.map(rotate_ev, data_db)
stop    = time()

rotation_time = stop-start
print("Rotation time:\t", rotation_time)

# Stage 3: same caveat as above -- only graph construction is timed here.
start   = time()
mapped  = db.map(mapping_single_event, rotated)
stop    = time()

projection_time = stop-start
print("Mapping time:\t", projection_time)

# Stage 4: executes the whole graph (rotation + mapping) and gathers the
# resulting images into a local list.
start   = time()
images  = mapped.compute()
stop    = time()

compute_time = stop-start
print("Compute time:\t", compute_time)



# Wall-clock time for all stages together.
time_1 = time()
total_time = time_1 - time_0
print("\n\nTotal time:\t", total_time)

Load time:	 3.674474000930786
Rotation time:	 0.013355016708374023
Mapping time:	 0.0846560001373291
Compute time:	 0.4426698684692383


Total time:	 4.215909719467163


## Ideas

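Vary the following parameters:
*   `n_workers`
*   `n_threads`
*   `n_partitions`

Then, for each combination:
1. study the total time
2. study each function separately by inserting an intermediate `compute()` between the steps
3. ...? (TODO: further measurements still to be decided)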

## Grid search

In [16]:
# Candidate cluster layouts for the grid search: the i-th entry of `threads`
# goes with the i-th entry of `workers` (threads = cpu_cores - workers + 1).
cpu_cores = 8
workers = list(range(1, cpu_cores + 1))
threads = [cpu_cores + 1 - n_workers for n_workers in workers]
# partitions = []

## Shutdown cluster

In [38]:
# NOTE(review): `client` is not created in any visible cell (the
# "DASK CLUSTER" section is empty), so this raises NameError on a fresh
# kernel unless a dask.distributed Client is created first.
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [15]:
# NOTE(review): in notebook order this cell comes after client.shutdown(),
# but its execution counter (15) is lower than the shutdown cell's (38), so
# it was presumably run earlier. Move it before the shutdown cell or drop
# it so that "Restart & Run All" works -- confirm.
client.restart()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:65458,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: 15 minutes ago,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:49637,Total threads: 2
Dashboard: http://127.0.0.1:49640/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:65468,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-g9fif3s8,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-g9fif3s8

0,1
Comm: tcp://127.0.0.1:49630,Total threads: 2
Dashboard: http://127.0.0.1:49631/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:65467,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-62cjrv_m,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-62cjrv_m

0,1
Comm: tcp://127.0.0.1:49636,Total threads: 2
Dashboard: http://127.0.0.1:49638/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:65465,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-oe8oz143,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-oe8oz143

0,1
Comm: tcp://127.0.0.1:49633,Total threads: 2
Dashboard: http://127.0.0.1:49634/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:65466,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-z_lhesq3,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-z_lhesq3
