Skip to content

Commit

Permalink
ref: cleanup multiprocessing-based rotation in 3D bpg
Browse files Browse the repository at this point in the history
  • Loading branch information
paulmueller committed Feb 13, 2019
1 parent 9e44189 commit f1a848c
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 51 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
- basic 3D backpropagation algorithm (#7)
- 3D backpropagation with a tilted axis of rotation (#9)
- ref: replace asserts with raises (#8)
- ref: multiprocessing-based rotation does not anymore require
a variable (_shared_array) at the top level of the module; As a
result, multiprocessing-rotation should now also work on Windows.
0.2.2
- docs: minor update and add changelog
0.2.1
Expand Down
4 changes: 0 additions & 4 deletions odtbrain/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,3 @@

__author__ = "Paul Müller"
__license__ = "BSD (3 clause)"


# Shared variable used by 3D backpropagation
_shared_array = None
82 changes: 45 additions & 37 deletions odtbrain/_alg3d_bpp.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
"""3D backpropagation algorithm"""
import ctypes
import multiprocessing as mp
import platform

import numexpr as ne
import numpy as np
import pyfftw
import scipy.ndimage

import odtbrain

from . import util


_ncores = mp.cpu_count()
ncores = mp.cpu_count()
mprotate_dict = {}


def _cleanup_worker():
if "X" in mprotate_dict:
mprotate_dict.pop("X")
if "X_shape" in mprotate_dict:
mprotate_dict.pop("X_shape")
if "X_dtype" in mprotate_dict:
mprotate_dict.pop("X_dtype")


def _init_worker(X, X_shape, X_dtype):
"""Initializer for pool for _mprotate"""
# Using a dictionary is not strictly necessary. You can also
# use global variables.
mprotate_dict["X"] = X
mprotate_dict["X_shape"] = X_shape
mprotate_dict["X_dtype"] = X_dtype


def _mprotate(ang, lny, pool, order):
"""Uses multiprocessing to wrap around _rotate
4x speedup on an intel i7-3820 CPU @ 3.60GHz with 8 cores.
The function calls _rotate which accesses the
`odtbrain._shared_array`. Data is rotated in-place.
The function calls _rotate which accesses the `mprotate_dict`.
Data is rotated in-place.
Parameters
----------
Expand All @@ -37,38 +53,29 @@ def _mprotate(ang, lny, pool, order):
"""
targ_args = list()

slsize = np.int(np.floor(lny / _ncores))
slsize = np.int(np.floor(lny / ncores))

for t in range(_ncores):
for t in range(ncores):
ymin = t * slsize
ymax = (t + 1) * slsize
if t == _ncores - 1:
if t == ncores - 1:
ymax = lny
targ_args.append((ymin, ymax, ang, order))

if platform.system() == "Windows":
# Because Windows does not support forking,
# the subprocess does not have access to
# odtbrain._shared_array. We circumvent
# this problem by not using a pool.
# We could copy all the data instead and
# use a _rotate function that accepts the
# array as an argument.
for d in targ_args:
_rotate(d)

else:
pool.map(_rotate, targ_args)
pool.map(_rotate, targ_args)


def _rotate(d):
arr = np.frombuffer(mprotate_dict["X"],
dtype=mprotate_dict["X_dtype"]).reshape(
mprotate_dict["X_shape"])
(ymin, ymax, ang, order) = d
return scipy.ndimage.interpolation.rotate(
odtbrain._shared_array[:, ymin:ymax, :], # input
arr[:, ymin:ymax, :], # input
angle=-ang, # angle
axes=(0, 2), # axes
reshape=False, # reshape
output=odtbrain._shared_array[:, ymin:ymax, :], # output
output=arr[:, ymin:ymax, :], # output
order=order, # order
mode="constant", # mode
cval=0)
Expand All @@ -78,7 +85,7 @@ def backpropagate_3d(uSin, angles, res, nm, lD=0, coords=None,
weight_angles=True, onlyreal=False,
padding=(True, True), padfac=1.75, padval=None,
intp_order=2, dtype=None,
num_cores=_ncores,
num_cores=ncores,
save_memory=False,
copy=True,
count=None, max_count=None,
Expand Down Expand Up @@ -238,9 +245,9 @@ def backpropagate_3d(uSin, angles, res, nm, lD=0, coords=None,
raise ValueError("Parameter `padding` must be boolean tuple.")
if coords is not None:
raise NotImplementedError("Setting coordinates is not yet supported.")
if num_cores > _ncores:
if num_cores > ncores:
raise ValueError("`num_cores` must not exceed number "
+ "of physical cores: {}".format(_ncores))
+ "of physical cores: {}".format(ncores))

# setup dtype
if dtype is None:
Expand Down Expand Up @@ -468,14 +475,14 @@ def backpropagate_3d(uSin, angles, res, nm, lD=0, coords=None,
direction="FFTW_BACKWARD",
flags=["FFTW_MEASURE"])

# assert shared_array.base.base is shared_array_base.get_obj()
shared_array_base = mp.Array(ct_dt_map[dtype], ln * lny * lnx)
_shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
_shared_array = _shared_array.reshape(ln, lny, lnx)
# Setup a shared array
shared_array = mp.RawArray(ct_dt_map[dtype], ln * lny * lnx)
arr = np.frombuffer(shared_array, dtype=dtype).reshape(ln, lny, lnx)

# Initialize the pool with the shared array
odtbrain._shared_array = _shared_array
pool4loop = mp.Pool(processes=num_cores)
pool4loop = mp.Pool(processes=num_cores,
initializer=_init_worker,
initargs=(shared_array, (ln, lny, lnx), dtype))

# filtered projections in loop
filtered_proj = np.zeros((ln, lny, lnx), dtype=dtype_complex)
Expand Down Expand Up @@ -514,7 +521,7 @@ def backpropagate_3d(uSin, angles, res, nm, lD=0, coords=None,

# resize image to original size
# The copy is necessary to prevent memory leakage.
_shared_array[:] = filtered_proj.real
arr[:] = filtered_proj.real

phi0 = np.rad2deg(angles[aa])

Expand All @@ -523,17 +530,18 @@ def backpropagate_3d(uSin, angles, res, nm, lD=0, coords=None,

_mprotate(phi0, lny, pool4loop, intp_order)

outarr.real += _shared_array
outarr.real += arr

if not onlyreal:
_shared_array[:] = filtered_proj_imag
arr[:] = filtered_proj_imag
_mprotate(phi0, lny, pool4loop, intp_order)
outarr.imag += _shared_array
outarr.imag += arr

if count is not None:
count.value += 1

pool4loop.terminate()
pool4loop.join()
_cleanup_worker()

return outarr
25 changes: 15 additions & 10 deletions tests/test_alg3d_bpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def test_3d_backprop_phase32():
r = list()
for p in parameters:
f = odtbrain.backpropagate_3d(sino, angles,
dtype=np.float32, padval=0, **p)
dtype=np.float32,
padval=0,
**p)
r.append(cutout(f))
data32 = np.array(r).flatten().view(np.float32)
data64 = test_3d_backprop_phase()
Expand All @@ -105,21 +107,24 @@ def test_3d_mprotate():
ln = 10
ln2 = 2*ln
initial_array = np.arange(ln2**3).reshape((ln2, ln2, ln2))
shared_array_base = mp.Array(ctypes.c_double, ln2 * ln2 * ln2)
_shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
_shared_array = _shared_array.reshape(ln2, ln2, ln2)
_shared_array[:, :, :] = initial_array
odtbrain._shared_array = _shared_array
# pool must be created after _shared array
pool = mp.Pool(processes=mp.cpu_count())
shared_array = mp.RawArray(ctypes.c_double, ln2 * ln2 * ln2)
arr = np.frombuffer(shared_array).reshape(ln2, ln2, ln2)
arr[:, :, :] = initial_array
_alg3d_bpp.mprotate_dict["X"] = shared_array
_alg3d_bpp.mprotate_dict["X_shape"] = (ln2, ln2, ln2)

pool = mp.Pool(processes=mp.cpu_count(),
initializer=_alg3d_bpp._init_worker,
initargs=(shared_array, (ln2, ln2, ln2), np.dtype(float)))
_alg3d_bpp._mprotate(2, ln, pool, 2)
if WRITE_RES:
write_results(myframe, _shared_array)
assert np.allclose(np.array(_shared_array).flatten().view(
write_results(myframe, arr)
assert np.allclose(np.array(arr).flatten().view(
float), get_results(myframe))


if __name__ == "__main__":
test_3d_backprop_phase32()
# Run all tests
loc = locals()
for key in list(loc.keys()):
Expand Down

0 comments on commit f1a848c

Please sign in to comment.