Skip to content

Commit

Permalink
Add feature to map pairwise distances matrix to disk while computing …
Browse files Browse the repository at this point in the history
…the cluster matrix in ASSET (#498)

* Implemented clustering using mapping of distance matrix to a file
* Unit tests for clustering using array mapped to disk
* Changed temporary file class and set explicit file names

Co-authored-by: Cristiano Köhler <c.koehler@fz-juelich.de>
  • Loading branch information
kohlerca and Cristiano Köhler committed Nov 16, 2022
1 parent c014298 commit dd42af5
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 21 deletions.
165 changes: 144 additions & 21 deletions elephant/asset/asset.py
Expand Up @@ -351,7 +351,8 @@ def _analog_signal_step_interp(signal, times):
# =============================================================================


def _stretched_metric_2d(x, y, stretch, ref_angle, working_memory=None):
def _stretched_metric_2d(x, y, stretch, ref_angle, working_memory=None,
mapped_array_file=None, verbose=False):
r"""
Given a list of points on the real plane, identified by their abscissa `x`
and ordinate `y`, compute a stretched transformation of the Euclidean
Expand Down Expand Up @@ -385,12 +386,37 @@ def _stretched_metric_2d(x, y, stretch, ref_angle, working_memory=None):
ref_angle : float
Reference angle in degrees (i.e., the inclination along which the
stretching factor is 1).
working_memory : int, optional
The sought maximum memory in MiB for temporary distance matrix chunks.
When None (default), no chunking is performed. This parameter is passed
directly to `sklearn.metrics.pairwise_distances_chunked` function, and
it has no influence on the outcome matrix. Instead, it controls the
memory VS speed trade-off.
Default: None
mapped_array_file : file-like, optional
Temporary file, that should be used to store the matrix of stretched
distances when chunking the computations. This is achieved using
`np.memmap`. If `working_memory` is None (no chunking), this parameter
is ignored. This will not impact the results, but the operations will
be slower (than chunking and storing the final matrix in a memory
array). This option should be used when there is not enough memory to
allocate the full stretched distance matrix needed before DBSCAN.
Default: None
verbose : bool, optional
Display progress bars and log messages.
Default: False
Returns
-------
D : (n,n) np.ndarray
Square matrix of distances between all pairs of points.
Raises
------
MemoryError
If there is not enough memory to allocate the matrix to store the
pairwise distances when using chunked computations.
"""
alpha = np.deg2rad(ref_angle) # reference angle in radians

Expand Down Expand Up @@ -438,20 +464,86 @@ def calculate_stretch_mat(theta_mat, D_mat):
stretch_mat = calculate_stretch_mat(theta, D)
else:
start = 0

# Depending on the memory size requested, check how many rows can be
# processed per iteration. Mininum is 1. Working memory size is in MB.
# Size is computed for a float32 matrix. The function
# `pairwise_distances_chunked` returns half of the possible size.
estimated_chunk = max(
((working_memory * 1024 * 1024) // (len(y) * 4)) // 2, 1)

# The number of rows in a chunk cannot be larger than the maximum
estimated_chunk = min(len(x), estimated_chunk)

# Compute the number of iterations needed
it_todo = len(x) // estimated_chunk

# If size is not a multiple, an extra iteration with smaller size
# is needed
last_chunk = len(x) % estimated_chunk
if last_chunk > 0:
it_todo += 1
if verbose:
print(f"Estimated chunk size: {estimated_chunk}; "
f"Dimension: ({len(x)}, {len(y)}), "
f"Number of chunked iterations: {it_todo}")

# x and y sizes are the same
stretch_mat = np.empty((len(x), len(y)), dtype=np.float32)
for D_chunk in pairwise_distances_chunked(
points, working_memory=working_memory):
if mapped_array_file is None:
# Create the distance matrix in memory. Raise exception if
# it is not possible due to insufficient memory.
try:
stretch_mat = np.empty((len(x), len(y)), dtype=np.float32)
except MemoryError:
required_size = (len(x) * len(y) * 4) / (1024 ** 3)
raise MemoryError("Can't allocate array in memory. Specify "
"a temporary disk file to map the array "
"to the disk. Operations will be slower. "
f"The required size is {required_size} GiB")

else:
# Using an array mapped to disk. Store in the file passed as
# parameter
if verbose:
print(f"Creating disk array at '{mapped_array_file.name}'.")

stretch_mat = np.memmap(mapped_array_file, mode='w+',
shape=(len(x), len(y)),
dtype=np.float32)

# Buffer to store the computations per iteration, to avoid
# writing to the file after every single operation
chunk_mat = np.empty((estimated_chunk, len(y)), dtype=np.float32)

for D_chunk in tqdm(
pairwise_distances_chunked(points,
working_memory=working_memory),
desc='Pairwise distances chunked',
total=it_todo, disable=not verbose):

chunk_size = D_chunk.shape[0]

assert (chunk_size == estimated_chunk or
chunk_size == last_chunk) # Safety check

dX = x_array[:, start: start + chunk_size].T - x_array
dY = y_array[:, start: start + chunk_size].T - y_array

theta_chunk = np.arctan2(
dY, dX, out=stretch_mat[start: start + chunk_size, :])
# If not using an array mapped to the disk, the output of
# the theta computations are written directly to the
# stretch_mat. Otherwise, write to the buffer
out = stretch_mat[start: start + chunk_size, :] \
if mapped_array_file is None else chunk_mat[:chunk_size, :]

theta_chunk = np.arctan2(dY, dX, out=out)

# stretch_mat (theta_chunk) is updated in-place here
calculate_stretch_mat(theta_chunk, D_chunk)

# If mapping to file, transfer from the buffer to stretch_mat
if mapped_array_file is not None:
stretch_mat[start: start + chunk_size, :] = theta_chunk

start += chunk_size

# Return the stretched distance matrix
Expand Down Expand Up @@ -1348,6 +1440,7 @@ def synchronous_events_intersection(sse1, sse2, intersection='linkwise'):
if len(sse_new[pixel1]) == 0:
del sse_new[pixel1]
elif intersection == 'pixelwise':
# no action required
pass
else:
raise ValueError(
Expand Down Expand Up @@ -2363,7 +2456,8 @@ def mask_matrices(matrices, thresholds):

@staticmethod
def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors,
stretch, working_memory=None):
stretch, working_memory=None, array_file=None,
keep_file=False, verbose=False):
r"""
Given a matrix `mask_matrix`, replaces its positive elements with
integers representing different cluster IDs. Each cluster comprises
Expand Down Expand Up @@ -2419,10 +2513,29 @@ def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors,
The sought maximum memory in MiB for temporary distance matrix
chunks. When None (default), no chunking is performed. This
parameter is passed directly to
``sklearn.metrics.pairwise_distances_chunked`` function and it
has no influence on the outcome matrix. Instead, it control the
``sklearn.metrics.pairwise_distances_chunked`` function, and it
has no influence on the outcome matrix. Instead, it controls the
memory VS speed trade-off.
Default: None
array_file : str or path-like, optional
Path to a location of a temporary file, that should be used to
store the matrix of stretched distances when chunking the
computations. This is achieved using `np.memmap`. If
`working_memory` is None (no chunking), this parameter is ignored.
This will not impact the results, but the operations will be
slower (than chunking and storing the final matrix in a memory
array). This option should be used when there is not enough memory
to allocate the full stretched distance matrix needed before
DBSCAN.
Default: None
keep_file : bool, optional
Delete the temporary file specified in `array_file` automatically.
This option can be used to access the distance matrix after the
clustering.
Default: False
verbose : bool, optional
Display log messages and progress bars.
Default: False
Returns
-------
Expand All @@ -2448,16 +2561,30 @@ def cluster_matrix_entries(mask_matrix, max_distance, min_neighbors,
# List the significant pixels of mat in a 2-columns array
xpos_sgnf, ypos_sgnf = np.where(mask_matrix > 0)

# Allocate temporary file if requested
mapped_array_file = None
if array_file:
file_path = Path(array_file) if isinstance(array_file, str) \
else array_file
file_dir = file_path.parent
file_name = file_path.stem
mapped_array_file = tempfile.NamedTemporaryFile(
prefix=file_name, dir=file_dir,
delete=not keep_file)

# Compute the matrix D[i, j] of euclidean distances between pixels i
# and j
try:
D = _stretched_metric_2d(
xpos_sgnf, ypos_sgnf, stretch=stretch, ref_angle=45,
working_memory=working_memory
)
working_memory=working_memory,
mapped_array_file=mapped_array_file, verbose=verbose)
except MemoryError as err:
raise MemoryError("Set 'working_memory=100' or another value to "
"chunk the data") from err
"chunk the data. If this does not solve, use the"
" 'array_file' parameter to pass a location for "
"a temporary file to map the array to the disk."
) from err

# Cluster positions of significant pixels via dbscan
core_samples, config = dbscan(
Expand Down Expand Up @@ -2521,18 +2648,14 @@ def extract_synchronous_events(self, cmat, ids=None):
t_stop=self.t_stop_i,
ids=ids)

if self.spiketrains_j is self.spiketrains_i:
if self.spiketrains_j is self.spiketrains_i or self.is_symmetric():
diag_id = 0
tracts_y = tracts_x
else:
if self.is_symmetric():
diag_id = 0
tracts_y = tracts_x
else:
diag_id = None
tracts_y = _transactions(
self.spiketrains_j, bin_size=self.bin_size,
t_start=self.t_start_j, t_stop=self.t_stop_j, ids=ids)
diag_id = None
tracts_y = _transactions(
self.spiketrains_j, bin_size=self.bin_size,
t_start=self.t_start_j, t_stop=self.t_stop_j, ids=ids)

# Reconstruct each worm, link by link
sse_dict = {}
Expand Down
21 changes: 21 additions & 0 deletions elephant/test/test_asset.py
Expand Up @@ -11,6 +11,8 @@
import random
import unittest
import warnings
import tempfile
from pathlib import Path

import neo
import numpy as np
Expand Down Expand Up @@ -221,6 +223,25 @@ def test_cluster_matrix_entries_chunked(self):
stretch=stretch, working_memory=working_memory)
assert_array_equal(cmat, cmat_true)

def test_cluster_matrix_entries_chunked_array_file(self):
np.random.seed(12)
mmat = np.random.randn(100, 100) > 0
max_distance = 2
min_neighbors = 2
stretch = 2
cmat_true = asset.ASSET.cluster_matrix_entries(
mmat, max_distance=max_distance, min_neighbors=min_neighbors,
stretch=stretch)

for working_memory in [1, 10, 100, 1000]:
with tempfile.TemporaryDirectory() as tmpdir:
cmat = asset.ASSET.cluster_matrix_entries(
mmat, max_distance=max_distance,
min_neighbors=min_neighbors, stretch=stretch,
working_memory=working_memory,
array_file=Path(tmpdir) / f"test_dist_{working_memory}")
assert_array_equal(cmat, cmat_true)

def test_pmat_neighbors_gpu(self):
np.random.seed(12)
n_largest = 3
Expand Down

0 comments on commit dd42af5

Please sign in to comment.