Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix debugging for parallelized dfield generation #374

Merged
merged 3 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/benchmark_targets.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
binning_1d: 3.1223518816799785
binning_4d: 9.514051519199997
inv_dfield: 7.265958606239991
inv_dfield: 5.934024921119999
workflow_1d: 18.886161206160004
workflow_4d: 22.608196924320012
74 changes: 9 additions & 65 deletions sed/calibrator/momentum.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import itertools as it
from copy import deepcopy
from datetime import datetime
from multiprocessing import Pool
from typing import Any
from typing import Dict
from typing import List
Expand All @@ -19,13 +18,14 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
import scipy.ndimage as ndi
import xarray as xr
from bokeh.colors import RGB
from bokeh.io import output_notebook
from bokeh.palettes import Category10 as ColorCycle
from IPython.display import display
from joblib import delayed
from joblib import Parallel
from matplotlib import cm
from numpy.linalg import norm
from scipy.interpolate import griddata
Expand All @@ -34,8 +34,6 @@
from symmetrize import sym
from symmetrize import tps

N_CPU = psutil.cpu_count()


class MomentumCorrector:
"""
Expand Down Expand Up @@ -72,10 +70,6 @@ def __init__(

self._config = config

self.num_cores = self._config.get("binning", {}).get("num_cores", N_CPU - 1)
if self.num_cores >= N_CPU:
self.num_cores = N_CPU - 1

self.image: np.ndarray = None
self.img_ndim: int = None
self.slice: np.ndarray = None
Expand Down Expand Up @@ -1226,7 +1220,6 @@ def calc_inverse_dfield(self):
self.cdeform_field,
self.bin_ranges,
self.detector_ranges,
self.num_cores,
)

return self.inverse_dfield
Expand Down Expand Up @@ -1716,7 +1709,6 @@ def apply_corrections(
self.cdeform_field,
self.bin_ranges,
self.detector_ranges,
self.num_cores,
)
self.dfield_updated = False

Expand Down Expand Up @@ -2052,7 +2044,6 @@ def generate_inverse_dfield(
cdeform_field: np.ndarray,
bin_ranges: List[Tuple],
detector_ranges: List[Tuple],
num_cores: int,
) -> np.ndarray:
"""Generate inverse deformation field using inperpolation with griddata.
Assuming the binning range of the input ``rdeform_field`` and ``cdeform_field``
Expand All @@ -2063,7 +2054,6 @@ def generate_inverse_dfield(
cdeform_field (np.ndarray): Column-wise deformation field.
bin_ranges (List[Tuple]): Detector ranges of the binned coordinates.
detector_ranges (List[Tuple]): Ranges of detector coordinates to interpolate to.
num_cores (int): number of cores to use for parallelization.

Returns:
np.ndarray: The calculated inverse deformation field (row/column)
Expand Down Expand Up @@ -2096,49 +2086,7 @@ def generate_inverse_dfield(
rc_position = [] # row/column position in c/rdeform_field
r_dest = [] # destination pixel row position
c_dest = [] # destination pixel column position
compute_i0 = [(cdeform_field.shape[0] * i) // num_cores for i in np.arange(0, num_cores)]
compute_i1 = [(cdeform_field.shape[0] * i) // num_cores for i in np.arange(1, num_cores + 1)]
data = [
(rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1)
for (i0, i1) in zip(compute_i0, compute_i1)
]
with Pool(num_cores) as p:
ret = p.map(generate_lists, data)

for pos, rd, cd in ret:
rc_position += pos
r_dest += rd
c_dest += cd

with Pool(2) as p:
ret = p.map(
griddata_,
[
(np.asarray(rc_position), np.asarray(r_dest), (r_mesh, c_mesh)),
(np.asarray(rc_position), np.asarray(c_dest), (r_mesh, c_mesh)),
],
)

inverse_dfield = np.asarray([ret[0], ret[1]])

return inverse_dfield


def generate_lists(args):
"""Function for paralellizing code with multiprocessing.Pool.map

Args:
args: argument tuple containing (rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1)

Returns:
return tuple of lists (rc_position, r_dest, c_dest)
"""
(rdeform_field, cdeform_field, bin_ranges, bin_step, i0, i1) = args
rc_position = [] # row/column position in c/rdeform_field
r_dest = [] # destination pixel row position
c_dest = [] # destination pixel column position

for i in np.arange(i0, i1):
for i in np.arange(cdeform_field.shape[0]):
for j in np.arange(cdeform_field.shape[1]):
if not np.isnan(rdeform_field[i, j]) and not np.isnan(
cdeform_field[i, j],
Expand All @@ -2155,19 +2103,15 @@ def generate_lists(args):
c_dest.append(
bin_step[1] * j + bin_ranges[1][0],
)
return (rc_position, r_dest, c_dest)


def griddata_(args):
"""Wrapper for griddata to use with multiprocessing.Pool.map
ret = Parallel(n_jobs=2)(
delayed(griddata)(np.asarray(rc_position), np.asarray(arg), (r_mesh, c_mesh))
for arg in [r_dest, c_dest]
)

Args:
args: argument tuple to griddata
inverse_dfield = np.asarray([ret[0], ret[1]])

Returns:
return value of griddata
"""
return griddata(*args)
return inverse_dfield


def load_dfield(file: str) -> Tuple[np.ndarray, np.ndarray]:
Expand Down