From 5d41991a07328a23610bc090afc00445bbbda7d6 Mon Sep 17 00:00:00 2001 From: rettigl Date: Sun, 26 Jan 2025 20:59:34 +0100 Subject: [PATCH 1/4] change dtype of normalized data to that of unnormalized data use sed binning for histogram computation --- src/sed/binning/binning.py | 8 +------- src/sed/core/processor.py | 2 ++ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/sed/binning/binning.py b/src/sed/binning/binning.py index 00103a8f..18e9e7f0 100644 --- a/src/sed/binning/binning.py +++ b/src/sed/binning/binning.py @@ -479,13 +479,7 @@ def normalization_histogram_from_timed_dataframe( Returns: xr.DataArray: Calculated normalization histogram. """ - bins = df[axis].map_partitions( - pd.cut, - bins=bin_centers_to_bin_edges(bin_centers), - ) - - histogram = df[axis].groupby([bins]).count().compute().values * time_unit - # histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) * time_unit + histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) * time_unit data_array = xr.DataArray( data=histogram, diff --git a/src/sed/core/processor.py b/src/sed/core/processor.py index 2345f206..2e602ec5 100644 --- a/src/sed/core/processor.py +++ b/src/sed/core/processor.py @@ -2283,6 +2283,8 @@ def compute( ) # if the axes are named correctly, xarray figures out the normalization correctly self._normalized = self._binned / self._normalization_histogram + # Set datatype of binned data + self._normalized.data = self._normalized.data.astype(self._binned.data.dtype) self._attributes.add( self._normalization_histogram.values, name="normalization_histogram", From c0053ffe57db6c89677cad745d04eab489295ddd Mon Sep 17 00:00:00 2001 From: rettigl Date: Sun, 26 Jan 2025 21:16:52 +0100 Subject: [PATCH 2/4] add test for dtype --- tests/test_processor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_processor.py b/tests/test_processor.py index 3cfe9bcf..853cd1c3 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -1008,6 +1008,8 @@ def test_compute_with_normalization() -> None: processor.binned.data, (processor.normalized * processor.normalization_histogram).data, ) + # check dtype + assert processor.normalized.dtype == processor.binned.dtype # bin only second dataframe partition result2 = processor.compute( bins=bins, From ae464dd28f82e295d2643ae79a2f7e68c4e4abfb Mon Sep 17 00:00:00 2001 From: rettigl Date: Mon, 27 Jan 2025 15:39:45 +0100 Subject: [PATCH 3/4] pass config parameters to histogram calculation --- src/sed/binning/binning.py | 4 +++- src/sed/core/processor.py | 12 ++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/sed/binning/binning.py b/src/sed/binning/binning.py index 18e9e7f0..9599d284 100644 --- a/src/sed/binning/binning.py +++ b/src/sed/binning/binning.py @@ -465,6 +465,7 @@ def normalization_histogram_from_timed_dataframe( axis: str, bin_centers: np.ndarray, time_unit: float, + **kwds, ) -> xr.DataArray: """Get a normalization histogram from a timed dataframe. @@ -475,11 +476,12 @@ def normalization_histogram_from_timed_dataframe( histogram. bin_centers (np.ndarray): Bin centers used for binning of the axis. time_unit (float): Time unit the data frame entries are based on. + **kwds: Additional keyword arguments passed to the bin_dataframe function. Returns: xr.DataArray: Calculated normalization histogram. """ - histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers]) * time_unit + histogram = bin_dataframe(df, axes=[axis], bins=[bin_centers], **kwds) * time_unit data_array = xr.DataArray( data=histogram, diff --git a/src/sed/core/processor.py b/src/sed/core/processor.py index 2e602ec5..2da8b0fe 100644 --- a/src/sed/core/processor.py +++ b/src/sed/core/processor.py @@ -2377,6 +2377,12 @@ def get_normalization_histogram( axis, self._binned.coords[axis].values, self._config["dataframe"]["timed_dataframe_unit_time"], + hist_mode=self.config["binning"]["hist_mode"], + mode=self.config["binning"]["mode"], + pbar=self.config["binning"]["pbar"], + n_cores=self.config["core"]["num_cores"], + threads_per_worker=self.config["binning"]["threads_per_worker"], + threadpool_api=self.config["binning"]["threadpool_API"], ) else: self._normalization_histogram = normalization_histogram_from_timed_dataframe( @@ -2384,6 +2390,12 @@ def get_normalization_histogram( axis, self._binned.coords[axis].values, self._config["dataframe"]["timed_dataframe_unit_time"], + hist_mode=self.config["binning"]["hist_mode"], + mode=self.config["binning"]["mode"], + pbar=self.config["binning"]["pbar"], + n_cores=self.config["core"]["num_cores"], + threads_per_worker=self.config["binning"]["threads_per_worker"], + threadpool_api=self.config["binning"]["threadpool_API"], ) return self._normalization_histogram From 6cc76551f021548f6b6ac5dad7fbbc4910ede152 Mon Sep 17 00:00:00 2001 From: rettigl Date: Fri, 7 Feb 2025 19:22:23 +0100 Subject: [PATCH 4/4] select dataframe before binning call --- src/sed/core/processor.py | 59 +++++++++++++++------------------------ 1 file changed, 23 insertions(+), 36 deletions(-) diff --git a/src/sed/core/processor.py b/src/sed/core/processor.py index 2da8b0fe..3ea0eda9 100644 --- a/src/sed/core/processor.py +++ b/src/sed/core/processor.py @@ -2355,48 +2355,35 @@ def get_normalization_histogram( if isinstance(df_partitions, int): df_partitions = list(range(0, min(df_partitions, self._dataframe.npartitions))) + if use_time_stamps or self._timed_dataframe is None: if df_partitions is not None: - self._normalization_histogram = normalization_histogram_from_timestamps( - self._dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["columns"]["timestamp"], - ) + dataframe = self._dataframe.partitions[df_partitions] else: - self._normalization_histogram = normalization_histogram_from_timestamps( - self._dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["columns"]["timestamp"], - ) + dataframe = self._dataframe + self._normalization_histogram = normalization_histogram_from_timestamps( + df=dataframe, + axis=axis, + bin_centers=self._binned.coords[axis].values, + time_stamp_column=self._config["dataframe"]["columns"]["timestamp"], + ) else: if df_partitions is not None: - self._normalization_histogram = normalization_histogram_from_timed_dataframe( - self._timed_dataframe.partitions[df_partitions], - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], - hist_mode=self.config["binning"]["hist_mode"], - mode=self.config["binning"]["mode"], - pbar=self.config["binning"]["pbar"], - n_cores=self.config["core"]["num_cores"], - threads_per_worker=self.config["binning"]["threads_per_worker"], - threadpool_api=self.config["binning"]["threadpool_API"], - ) + timed_dataframe = self._timed_dataframe.partitions[df_partitions] else: - self._normalization_histogram = normalization_histogram_from_timed_dataframe( - self._timed_dataframe, - axis, - self._binned.coords[axis].values, - self._config["dataframe"]["timed_dataframe_unit_time"], - hist_mode=self.config["binning"]["hist_mode"], - mode=self.config["binning"]["mode"], - pbar=self.config["binning"]["pbar"], - n_cores=self.config["core"]["num_cores"], - threads_per_worker=self.config["binning"]["threads_per_worker"], - threadpool_api=self.config["binning"]["threadpool_API"], - ) + timed_dataframe = self._timed_dataframe + self._normalization_histogram = normalization_histogram_from_timed_dataframe( + df=timed_dataframe, + axis=axis, + bin_centers=self._binned.coords[axis].values, + time_unit=self._config["dataframe"]["timed_dataframe_unit_time"], + hist_mode=self.config["binning"]["hist_mode"], + mode=self.config["binning"]["mode"], + pbar=self.config["binning"]["pbar"], + n_cores=self.config["core"]["num_cores"], + threads_per_worker=self.config["binning"]["threads_per_worker"], + threadpool_api=self.config["binning"]["threadpool_API"], + ) return self._normalization_histogram