In [1]:
import os
import pandas as pd
import fitsio
import time
import ray
import numpy as np

In [2]:
def extract_coincidentals(spikes_list, coords_id, idx):
    # Spikes coordinates at given wavelength index
    spikes_w = spikes_list[idx]
    # Associated neighbour coordinates
    nb_pixels = coords_id[spikes_w[0, :], :]
    # Sublist of spikes data that will excludes the one serving as template
    spikes_sublist = spikes_list[:idx] + spikes_list[idx + 1:]
    # Coincidental cross-referencing.
    mask_w_arr = np.array([np.isin(nb_pixels, coords_id[spikes[0, :], :]).any(axis=1) for spikes in spikes_sublist])
    select_pixels = mask_w_arr.any(axis=0)
    coords_w = spikes_w[0, select_pixels]
    w_tables = np.insert(mask_w_arr[:, select_pixels], idx, True, axis=0)
    # Retrieve intensity values for the selected coordinates
    intensities = spikes_w[1:, select_pixels]
    arr_w = np.concatenate([coords_w[np.newaxis, ...], intensities, w_tables], axis=0)
    arr_w = np.insert(arr_w, 3, idx, axis=0)

    return arr_w

In [16]:
@ray.remote
def process_group(coords, fpaths, group_n):
    spikes_list = [fitsio.read(os.path.join(os.environ['SPIKESDATA'], f)) for f in fpaths]
    group_data = np.concatenate([extract_coincidentals(spikes_list, coords, i) for i in range(7)], axis=1)
    column_names = ['coords', 'int1', 'int2', 'wref', 'w0', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6']
    coincidental_spikes_df = pd.DataFrame(group_data.T, columns=column_names)
    coincidental_spikes_df['GroupNumber'] = group_n
    return coincidental_spikes_df

In [4]:
nx, ny = 4096, 4096
# List of relative 2D coordinates for 8-neighbour connectiviy (9-element list). 1st one is the origin pixel.
coords_8nb = np.array([[0, 0], [-1, 0], [-1, -1], [0, -1], [1, -1], [1, 0], [1, 1], [0, 1], [-1, 1]])
# Array of 2D coordinates for a 4096 x 4096 array. Matrix convention is kept. [rows, cols] = [y-axis, x-axis]
coords_1d = np.arange(nx * ny)
coordy, coordx = np.unravel_index(coords_1d, [ny, nx]) # also possible by raveling a meshgrid() output
coords2d = np.array([coordy, coordx])
# Create the array of 2D coordinates of 8-neighbours associated with each pixel.
# pixel 0 has 8 neighbour + itself, pixel 1 has 8 neighbour + itself, etc...
coords2d_8nb = coords2d[np.newaxis, ...] + coords_8nb[..., np.newaxis]
# Handle off-edges coordinates by clipping to the edges, operation done in-place. Here, square detector assumed.
# to per-axis clipping if that ever changes for another instrument.
np.clip(coords2d_8nb, 0, nx-1, out=coords2d_8nb)
# Convert to 1D coordinates.
lookup_coords = np.array([coords2d_8nb[i, 0, :] * nx + coords2d_8nb[i, 1, :] for i in range(len(coords_8nb))],
                     dtype='int32', order='C').T

In [5]:
ncpus = 60
ray.init(num_cpus=ncpus)

2020-02-12 15:50:19,852	INFO resource_spec.py:212 -- Starting Ray with 335.89 GiB memory available for workers and up to 18.63 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


{'node_ip_address': '10.1.222.15',
 'redis_address': '10.1.222.15:59236',
 'object_store_address': '/tmp/ray/session_2020-02-12_15-50-19_850563_13292/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-02-12_15-50-19_850563_13292/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2020-02-12_15-50-19_850563_13292'}

In [6]:
spikes_df = pd.read_parquet(os.path.join(os.environ['SPIKESDATA'], 'spikes_df_2010_filtered.parquet'), engine='pyarrow')
spikes_df2 = spikes_df.set_index(['GroupNumber', 'Time'])
tintervals = pd.interval_range(start=pd.Timestamp('2010-05-13 00:00:00', tz='UTC'), end=pd.Timestamp('2011-01-01 00:00:00', tz='UTC'), freq='D', closed='left')

tinterval = tintervals[0]
groups = spikes_df['GroupNumber'].loc[(spikes_df['Time'] >= tinterval.left) & (spikes_df['Time'] < tinterval.right)].unique()

In [7]:
fpaths_ = [spikes_df2['Path'].loc[group_n] for group_n in groups]

In [8]:
fpaths_id = ray.put(fpaths_)
coords_id = ray.put(lookup_coords)

In [14]:
%time res = ray.get([process_group.remote(coords_id, fpaths, g) for g, fpaths in enumerate(fpaths_)])

CPU times: user 24.3 s, sys: 5.44 s, total: 29.7 s
Wall time: 4min 1s


In [17]:
%time res = ray.get([process_group.remote(coords_id, fpaths, g) for g, fpaths in enumerate(fpaths_)])

CPU times: user 26.9 s, sys: 5.7 s, total: 32.6 s
Wall time: 3min 58s
