In [1]:
import os
import pandas as pd
import numpy as np
import fitsio
import dask
from dask import delayed

In [2]:
def extract_coincidentals(spikes_list, idx):
    
    # Spikes coordinates at given wavelength index
    spikes_w = spikes_list[idx]
    # Associated neighbour coordinates
    nb_pixels = index_8nb[spikes_w[0, :], :]
    # Sublist of spikes data that will excludes the one serving as template
    spikes_sublist = spikes_list[:idx]+spikes_list[idx+1:]
    # Coincidental cross-referencing. 
    mask_w_arr = np.array([np.isin(nb_pixels, index_8nb[spikes[0,:], :]).any(axis=1) for spikes in spikes_sublist])
    select_pixels = mask_w_arr.any(axis=0)
    coords_w = spikes_w[0, select_pixels] 
    w_tables = np.insert(mask_w_arr[:, select_pixels], idx, True, axis=0)
    # Retrieve intensity values for the selected coordinates
    intensities = spikes_w[ 1:, select_pixels]
    arr_w = np.concatenate([coords_w[np.newaxis,...], intensities, w_tables], axis=0)
    arr_w = np.insert(arr_w, 3, idx, axis=0)
    
    return arr_w

def extract_all_coincidentals(spikes_list):
    column_names = ['coords' , 'int1', 'int2', 'wref', 'w0', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6']
    group_data = np.concatenate([extract_coincidentals(spikes_list, i) for i in range(7)], axis=1)
    coincidental_spikes_df = pd.DataFrame(group_data.T, columns=column_names)
    return coincidental_spikes_df


def process_group(fpaths):
    spikes_list = [fitsio.read(os.path.join(data_dir, f)) for f in fpaths]
    df_group = extract_all_coincidentals(spikes_list)
    return df_group

In [3]:
data_dir = os.environ['SPIKESDATA']
spikes_df = pd.read_parquet(os.path.join(data_dir, 'spikes_df_2010.parquet'), engine='pyarrow')
spikes_df2 = spikes_df.set_index(['GroupNumber', 'Time'])

In [4]:
path_Series = spikes_df2['Path']
path_Series.head()

GroupNumber  Time                            
0            2010-05-13 00:00:02.090000+00:00    2010/05/13/2010-05-13T00:00:02.09Z_0193.spikes...
             2010-05-13 00:00:03.570000+00:00    2010/05/13/2010-05-13T00:00:03.57Z_0094.spikes...
             2010-05-13 00:00:05.070000+00:00    2010/05/13/2010-05-13T00:00:05.07Z_0335.spikes...
             2010-05-13 00:00:06.580000+00:00    2010/05/13/2010-05-13T00:00:06.58Z_0171.spikes...
             2010-05-13 00:00:08.080000+00:00    2010/05/13/2010-05-13T00:00:08.08Z_0211.spikes...
Name: Path, dtype: object

In [5]:
tintervals = pd.interval_range(start=spikes_df['Time'].iloc[0], end=spikes_df['Time'].iloc[-1], freq='10D', closed='left')

In [6]:
tselect = (spikes_df['Time'] >= tintervals[0].left) & (spikes_df['Time'] < tintervals[0].right)
groups = spikes_df['GroupNumber'].loc[tselect].unique()
groups

array([    0,     1,     2, ..., 71998, 71999, 72000])

In [7]:
spikes_df2['Path'].loc[groups[0:3]]

GroupNumber  Time                            
0            2010-05-13 00:00:02.090000+00:00    2010/05/13/2010-05-13T00:00:02.09Z_0193.spikes...
             2010-05-13 00:00:03.570000+00:00    2010/05/13/2010-05-13T00:00:03.57Z_0094.spikes...
             2010-05-13 00:00:05.070000+00:00    2010/05/13/2010-05-13T00:00:05.07Z_0335.spikes...
             2010-05-13 00:00:06.580000+00:00    2010/05/13/2010-05-13T00:00:06.58Z_0171.spikes...
             2010-05-13 00:00:08.080000+00:00    2010/05/13/2010-05-13T00:00:08.08Z_0211.spikes...
             2010-05-13 00:00:09.580000+00:00    2010/05/13/2010-05-13T00:00:09.58Z_0304.spikes...
             2010-05-13 00:00:11.080000+00:00    2010/05/13/2010-05-13T00:00:11.08Z_0131.spikes...
1            2010-05-13 00:00:14.080000+00:00    2010/05/13/2010-05-13T00:00:14.08Z_0193.spikes...
             2010-05-13 00:00:15.580000+00:00    2010/05/13/2010-05-13T00:00:15.58Z_0094.spikes...
             2010-05-13 00:00:17.080000+00:00    2010/05/13/201

In [8]:
################################################################################################
# Pre-compute the 8-connectivity lookup table. This will be shared across parallel workers.
################################################################################################
# List of relative 2D coordinates for 8-neighbour connectiviy (9-element list). 1st one is the origin pixel.
coords_8nb = np.array([[0, 0], [-1, 0], [-1, -1], [0, -1], [1, -1], [1, 0], [1, 1], [0, 1], [-1, 1]])
# Array of 2D coordinates for a 4096 x 4096 array. Matrix convention is kept. [rows, cols] = [y-axis, x-axis]
ny, nx = [4096, 4096]
coords_1d = np.arange(nx * ny)
coordy, coordx = np.unravel_index(coords_1d, [ny, nx]) # also possible by raveling a meshgrid() output
coords2d = np.array([coordy, coordx])
# Create the array of 2D coordinates of 8-neighbours associated with each pixel.
# pixel 0 has 8 neighbour + itself, pixel 1 has 8 neighbour + itself, etc...
coords2d_8nb = coords2d[np.newaxis, ...] + coords_8nb[..., np.newaxis]
# Handle off-edges coordinates by clipping to the edges, operation done in-place. Here, square detector assumed. Update
# to per-axis clipping if that ever changes for another instrument.
np.clip(coords2d_8nb, 0, nx-1, out=coords2d_8nb)
# Convert to 1D coordinates.
index_8nb = np.array([coords2d_8nb[i, 0, :] * nx + coords2d_8nb[i, 1, :] for i in range(len(coords_8nb))],
                     dtype='int32', order='C').T
index_8nb.shape

(16777216, 9)

In [9]:
groups = spikes_df2.index.get_level_values('GroupNumber').unique()

In [14]:
paths_list_groups = [path_Series.loc[group_n] for group_n in groups[0:24]]
paths_list_groups[0]

Time
2010-05-13 00:00:02.090000+00:00    2010/05/13/2010-05-13T00:00:02.09Z_0193.spikes...
2010-05-13 00:00:03.570000+00:00    2010/05/13/2010-05-13T00:00:03.57Z_0094.spikes...
2010-05-13 00:00:05.070000+00:00    2010/05/13/2010-05-13T00:00:05.07Z_0335.spikes...
2010-05-13 00:00:06.580000+00:00    2010/05/13/2010-05-13T00:00:06.58Z_0171.spikes...
2010-05-13 00:00:08.080000+00:00    2010/05/13/2010-05-13T00:00:08.08Z_0211.spikes...
2010-05-13 00:00:09.580000+00:00    2010/05/13/2010-05-13T00:00:09.58Z_0304.spikes...
2010-05-13 00:00:11.080000+00:00    2010/05/13/2010-05-13T00:00:11.08Z_0131.spikes...
Name: Path, dtype: object

In [11]:
from dask.distributed import Client

client = Client(n_workers=4)

In [17]:
spikes_groups = [[fitsio.read(os.path.join(data_dir, f)) for f in files] for files in paths_list_groups]

In [19]:
%time data_groups = [extract_all_coincidentals(spikes_list) for spikes_list in spikes_groups]

CPU times: user 20.7 s, sys: 137 ms, total: 20.8 s
Wall time: 20.4 s


In [20]:
delayed_spikes = [delayed(extract_all_coincidentals)(spikes_list) for spikes_list in spikes_groups] 

In [None]:
%time r = dask.compute(*delayed_spikes)

  ([array([[    9362,     9706,    10170, ..., 16767 ... dtype=int32)],)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


In [16]:
r[1]

Unnamed: 0,coords,int1,int2,wref,w0,w1,w2,w3,w4,w5,w6
0,59972,56,8,0,1,0,0,0,1,0,0
1,60593,324,2,0,1,0,0,1,0,0,0
2,132225,764,10,0,1,0,0,0,0,1,0
3,158038,42,10,0,1,0,0,0,1,0,0
4,158039,45,10,0,1,0,0,0,1,0,0
5,161842,296,9,0,1,0,0,1,0,0,0
6,169588,622,13,0,1,0,0,0,1,0,0
7,169589,501,14,0,1,0,0,0,1,0,0
8,194379,158,10,0,1,1,0,0,0,0,1
9,198475,230,11,0,1,1,0,0,0,0,1


In [20]:
client.close()