In [1]:
from dask.distributed import Client, SSHCluster
import sys
import dask
import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
import matplotlib.pyplot as plt
from dask import delayed

In [2]:
CLUSTER_TYPE ="local"
%env CLUSTER_TYPE $CLUSTER_TYPE

env: CLUSTER_TYPE=local


In [3]:
%%script bash --bg --out script_out

if [[ "$CLUSTER_TYPE" != "docker_cluster" ]]; then
    echo "Launching scheduler and worker"
    
    HOSTIP=`hostname -I | xargs`
    
    echo "dask-scheduler --host $HOSTIP --dashboard-address $HOSTIP:8787"
    
    # dask scheduler 
    dask-scheduler --host $HOSTIP --dashboard-address $HOSTIP:8787 &

    # dask worker
    dask-worker $HOSTIP:8786 --memory-limit 4GB --nworkers 4 --nthreads 2 &

fi

In [4]:
host_ip = !hostname -I | xargs
host_ip = host_ip[0]

from dask.distributed import Client

if CLUSTER_TYPE == "local":
    
    client = Client()

elif CLUSTER_TYPE == "docker_container":
    
    client = Client("{}:8786".format(host_ip))
    
elif CLUSTER_TYPE == "docker_cluster":
    
    # use the provided master
    client = Client("dask-scheduler:8786")
    
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:53844,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:53857,Total threads: 2
Dashboard: http://127.0.0.1:53858/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53847,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-swlk_vqr,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-swlk_vqr

0,1
Comm: tcp://127.0.0.1:53860,Total threads: 2
Dashboard: http://127.0.0.1:53861/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53850,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-bqh43oq0,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-bqh43oq0

0,1
Comm: tcp://127.0.0.1:53866,Total threads: 2
Dashboard: http://127.0.0.1:53867/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53848,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-68xczte3,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-68xczte3

0,1
Comm: tcp://127.0.0.1:53862,Total threads: 2
Dashboard: http://127.0.0.1:53864/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:53849,
Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-en9iugfj,Local directory: /Users/niklai/github/distributed-juno/dask/dask-worker-space/worker-en9iugfj


In [7]:
def load(path):
    return np.load(path, allow_pickle=True)["a"]

@delayed    
def R_yz(theta_rot, phi_rot):
    return np.array([[np.cos(phi_rot) * np.cos(theta_rot), -np.sin(phi_rot) * np.cos(theta_rot), np.sin(theta_rot)], 
                     [np.sin(phi_rot), np.cos(phi_rot), 0], 
                     [-np.sin(theta_rot) * np.cos(phi_rot), np.sin(theta_rot) * np.sin(phi_rot), np.cos(theta_rot)]])

@delayed
def convert_pmt_ids(input_ids, conversion_ids):
    cd_ids  = np.array(conversion_ids["CdID"])
    pmt_ids = np.array(conversion_ids["PMTID"])
    mask    = np.isin(cd_ids, input_ids)
    return pmt_ids[mask]

@delayed
def find_pmt_coord(pmt_positions, data_pmt_id):
    return pmt_positions[
        np.isin(pmt_positions.PMTID, data_pmt_id)
        ].loc[:, ['x', 'y', 'z']].reset_index(drop=True)

In [10]:
data_folder       = "../data/"
pmt_pos_fname     = "PMTPos_CD_LPMT.csv"
pmt_id_conv_fname = "PMT_ID_conversion.csv"
train_data_fname  = "raw_data_train_4.npz"

In [11]:
pmt_positions     = pd.read_csv(data_folder+pmt_pos_fname)
pmt_id_conversion = pd.read_csv(data_folder+pmt_id_conv_fname)
conversion_ids    = pd.read_csv(data_folder+pmt_id_conv_fname)

In [12]:
N_max = 115

error_ls = []
def rotate_single_file(fname):

    data_np = load(data_folder+train_data_fname) # dask.delayed(load)(data_folder+train_data_fname).compute()
    Nevents = 10 # data_np.shape[1]
    rotated = []

    for i in range(Nevents):
        # find non-zero-time hits
        data = da.from_array([data_np[:,i][j] for j in range(3)])
        nonzeros_inds = data[2] != 0.0
        data_pmt_id   = convert_pmt_ids(data[0][nonzeros_inds], conversion_ids)
        pmt_coord     = find_pmt_coord(pmt_positions, data_pmt_id)
        
        tot_charge = dask.delayed(sum)(data[1][nonzeros_inds])
        x_cc       = dask.delayed(sum)(pmt_coord[:,0] * data[1][nonzeros_inds]) / tot_charge
        y_cc       = dask.delayed(sum)(pmt_coord[:,1] * data[1][nonzeros_inds]) / tot_charge
        z_cc       = dask.delayed(sum)(pmt_coord[:,2] * data[1][nonzeros_inds]) / tot_charge
        
        theta_cc   = dask.delayed(da.arctan2)(
            dask.delayed(da.sqrt)((x_cc)**2+(y_cc)**2), z_cc
        )
        phi_cc     = dask.delayed(da.arctan2)(y_cc, x_cc) 
        
        theta_rot = -theta_cc + np.pi/2
        phi_rot   = -phi_cc
        
        # coord_new = np.matmul(R_yz(theta_rot, phi_rot), pmt_coord.T)
        coord_new = dask.delayed(da.matmul)(
            R_yz(theta_rot, phi_rot), pmt_coord.T
        )

        R = dask.delayed(da.sqrt)(dask.delayed(da.sum)(dask.delayed(da.power)(coord_new, 2), axis=0))

        charge_hitt = da.vstack([data[1], data[2]])
        charge_hitt = charge_hitt[:,nonzeros_inds]

        rotated.append(dask.delayed(da.vstack)([coord_new, R, charge_hitt]))
        del coord_new
        del charge_hitt
        del pmt_coord
    
    return rotated

def mapping_single_event(rotated_ev):
        ####################
        # rotated_ev must be computed
        ####################
    coord_new   = rotated_ev[:3]
    charge_hitt = rotated_ev[4:, ].T
    R           = rotated_ev[3,].mean()

    z_levels, step = da.linspace(coord_new[2,].min(), coord_new[2,].max(), 124, retstep=True)
    z_levels       = z_levels.persist()
    image_mat      = da.zeros((230,124,2))

    for j, z in enumerate(z_levels):
        mask = (da.abs(coord_new[2,] - z) < step)         #(np.abs(pmt_pos.z - z) < delta)
        if(not da.any(mask)): continue
        masked = coord_new[:,mask]

        Rz = (R**2 - z**2)
        Neff = 0 if Rz < 0 else N_max * da.sqrt(Rz) / R
        #ix = np.zeros(np.sum(mask), dtype=np.int32)
        ix = da.around( Neff * (da.arctan2(masked[1,], masked[0,]) / np.pi) + (N_max / 2) ) + 57
        ix = ix.astype(np.int32)
        #ix = ix.compute()
        if(da.any(ix >= 230)):
            ix[ix >= 230] = ix[ix >= 230] - 230

        image_mat[ix, j,] = charge_hitt[ix, ]

            # if np.isnan(mat[ix, i+1]):
            #     mat[ix, i+1] = row.id
            # else:
            #     mat[ix, 123 if i else i] = row.id

    del rotated_ev
    return image_mat
    

In [13]:
rotated_one_file = rotate_single_file(train_data_fname) # qui dovremo fare un client.map quando mettiamo più files

rotation_future  = client.map(dask.compute, rotated_one_file)

### !!!! ERRORE SUL GATHER

rotated          = client.gather(rotation_future)
persisted_rot    = [rotated[i][0].persist() for i in range(len(rotated))]

  (Delayed('vstack-934e51b9-7c7a-456c-a02a-c0407d971094'),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
Function:  execute_task
args:      ((<built-in function getitem>,               x          y           z
0      316.0782  -882.0791  19365.0000
1      941.0836  1016.1627  19365.0000
2     -274.3375 -1908.0597  19338.1592
3    -1915.1156   219.7390  19338.1592
4     2213.9154  1066.1655  19278.0244
...         ...        ...         ...
2524  -744.5171 -1167.8695 -19365.0000
2525 -1273.1379   545.3989 -19365.0000
2526  -686.8696  -637.3219 -19365.0000
2527   895.3717   276.1856 -19365.0000
2528  -926.5345   139.6526 -19365.0000

[2529 rows x 3 columns], (<class 'tuple'>, [(<class 'slice'>, None, None, None), 2])))
kwargs:    {}
Exception: 'TypeError("

TypeError: '(slice(None, None, None), 2)' is an invalid key

Function:  compute
args:      (Delayed('vstack-8d88e35f-8458-46b0-b407-fe148870fa4d'))
kwargs:    {}
Exception: 'TypeError("\'(slice(None, None, None), 2)\' is an invalid key")'

Function:  compute
args:      (Delayed('vstack-73eae8fb-e9de-424b-b1c1-ecf8780d5f16'))
kwargs:    {}
Exception: 'TypeError("\'(slice(None, None, None), 2)\' is an invalid key")'

Function:  compute
args:      (Delayed('vstack-a6a610fc-46db-4d5e-8e1d-299376774a6c'))
kwargs:    {}
Exception: 'TypeError("\'(slice(None, None, None), 2)\' is an invalid key")'

Function:  compute
args:      (Delayed('vstack-9af0c92f-edb7-424d-9257-7d6e41399123'))
kwargs:    {}
Exception: 'TypeError("\'(slice(None, None, None), 2)\' is an invalid key")'



In [None]:
mapping_future   = client.map(mapping_single_event, persisted_rot)
mapped           = client.gather(mapping_future)
mapped

In [None]:
mapped_future    = client.map(dask.compute, mapped)
mapped_computed  = client.gather(mapped_future)
mapped_images    = [mapped_computed[i][0] for i in range(len(mapped_computed))]
mapped_images[0].shape

In [None]:
ev = 6

image = mapped_images[ev][:,:,0].T
image[image == 0] = np.NaN

fig, ax = plt.subplots(1, figsize=(20,10))
ax.imshow(image)
plt.show()

In [14]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
