-
Notifications
You must be signed in to change notification settings - Fork 23
LAMA to Dask migration: Data.concatenate
#425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c2b5446
6092a28
7ac8ea3
bd341ac
95125db
f923514
708a4bb
846e5be
adbf979
42e0536
ad36b62
dc9592c
6836a71
9f8672b
4b34129
33bcdb7
051238d
4f61852
52f9580
aa5e43f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3401,6 +3401,7 @@ def _set_subspace(self, *args, **kwargs): | |
| raise NotImplementedError("'cf.Data._set_subspace' is unavailable.") | ||
|
|
||
| @classmethod | ||
| @daskified(_DASKIFIED_VERBOSE) | ||
| def concatenate(cls, data, axis=0, _preserve=True): | ||
| """Join a sequence of data arrays together. | ||
|
|
||
|
|
@@ -3418,14 +3419,11 @@ def concatenate(cls, data, axis=0, _preserve=True): | |
| default is 0. Note that scalar arrays are treated as if | ||
| they were one dimensional. | ||
|
|
||
| .. note:: If the axis specified is cyclic, it will become | ||
| non-cyclic in the output. | ||
|
|
||
| _preserve: `bool`, optional | ||
| If False then the time taken to do the concatenation is | ||
| reduced at the expense of changing the input data arrays | ||
| given by the *data* parameter in place and **these in | ||
| place changes will render the input data arrays | ||
| unusable**. Therefore, only set to False if it is 100% | ||
| certain that the input data arrays will not be accessed | ||
| again. By default the input data arrays are preserved. | ||
| Deprecated at version TODODASK. | ||
|
|
||
| :Returns: | ||
|
|
||
|
|
@@ -3468,235 +3466,48 @@ def concatenate(cls, data, axis=0, _preserve=True): | |
| "Can't concatenate: Must provide at least two data arrays" | ||
| ) | ||
|
|
||
| data0 = data[0] | ||
| data = data[1:] | ||
|
|
||
| if _preserve: | ||
| data0 = data0.copy() | ||
| else: | ||
| # If data0 appears more than once in the input data arrays | ||
| # then we need to copy it | ||
| for d in data: | ||
| if d is data0: | ||
| data0 = data0.copy() | ||
| break | ||
| # --- End: if | ||
|
|
||
| # Turn a scalar array into a 1-d array | ||
| ndim = data0._ndim | ||
| if not ndim: | ||
| data0.insert_dimension(inplace=True) | ||
| ndim = 1 | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # Check that the axis, shapes and units of all of the input | ||
| # data arrays are consistent | ||
| # ------------------------------------------------------------ | ||
| if axis < 0: | ||
| axis += ndim | ||
| if not 0 <= axis < ndim: | ||
| raise ValueError( | ||
| "Can't concatenate: Invalid axis specification: Expected " | ||
| "-{0}<=axis<{0}, got axis={1}".format(ndim, axis) | ||
| ) | ||
| data0 = data[0].copy() | ||
|
|
||
| shape0 = data0._shape | ||
| processed_data = [] | ||
| units0 = data0.Units | ||
| axis_p1 = axis + 1 | ||
| for data1 in data: | ||
| shape1 = data1._shape | ||
| if ( | ||
| shape0[axis_p1:] != shape1[axis_p1:] | ||
| or shape0[:axis] != shape1[:axis] | ||
| ): | ||
| raise ValueError( | ||
| "Can't concatenate: All the input array axes except " | ||
| "for the concatenation axis must have the same size" | ||
| ) | ||
| for index, data1 in enumerate(data): | ||
| copied = False # to avoid making two copies in a given case | ||
|
|
||
| # Turn any scalar array into a 1-d array | ||
| if not data1.ndim: | ||
| data1 = data1.copy() | ||
| copied = True | ||
| data1.insert_dimension(inplace=True) | ||
sadielbartholomew marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Check and conform, if necessary, the units of all inputs | ||
| if not units0.equivalent(data1.Units): | ||
| raise ValueError( | ||
| "Can't concatenate: All the input arrays must have " | ||
| "equivalent units" | ||
| ) | ||
| # --- End: for | ||
|
|
||
| for i, data1 in enumerate(data): | ||
| if _preserve: | ||
| data1 = data1.copy() | ||
| else: | ||
| # If data1 appears more than once in the input data | ||
| # arrays then we need to copy it | ||
| for d in data[i + 1 :]: | ||
| if d is data1: | ||
| data1 = data1.copy() | ||
| break | ||
| # --- End: if | ||
|
|
||
| # Turn a scalar array into a 1-d array | ||
| if not data1._ndim: | ||
| data1.insert_dimension(inplace=True) | ||
|
|
||
| shape1 = data1._shape | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 1. Make sure that the internal names of the axes match | ||
| # ------------------------------------------------------------ | ||
| axis_map = {} | ||
| if data1._pmsize < data0._pmsize: | ||
| for axis1, axis0 in zip(data1._axes, data0._axes): | ||
| axis_map[axis1] = axis0 | ||
|
|
||
| data1._change_axis_names(axis_map) | ||
| else: | ||
| for axis1, axis0 in zip(data1._axes, data0._axes): | ||
| axis_map[axis0] = axis1 | ||
|
|
||
| data0._change_axis_names(axis_map) | ||
| # --- End: if | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # Find the internal name of the concatenation axis | ||
| # ------------------------------------------------------------ | ||
| Paxis = data0._axes[axis] | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 2. Make sure that the aggregating axis is an axis of the | ||
| # partition matrix of both arrays and that the partition | ||
| # matrix axes are the same in both arrays (although, for | ||
| # now, they may have different orders) | ||
| # | ||
| # Note: | ||
| # | ||
| # a) This may involve adding new partition matrix axes to | ||
| # either or both of data0 and data1. | ||
| # | ||
| # b) If the aggregating axis needs to be added it is inserted | ||
| # as the outer (slowest varying) axis to reduce the | ||
| # likelihood of having to (expensively) transpose the | ||
| # partition matrix. | ||
| # ------------------------------------------------------------ | ||
| for f, g in zip((data0, data1), (data1, data0)): | ||
|
|
||
| g_pmaxes = g.partitions.axes | ||
| if Paxis in g_pmaxes: | ||
| g_pmaxes = g_pmaxes[:] | ||
| g_pmaxes.remove(Paxis) | ||
|
|
||
| f_partitions = f.partitions | ||
| f_pmaxes = f_partitions.axes | ||
| for pmaxis in g_pmaxes[::-1] + [Paxis]: | ||
| if pmaxis not in f_pmaxes: | ||
| f_partitions.insert_dimension(pmaxis, inplace=True) | ||
|
|
||
| # if Paxis not in f_partitions.axes: | ||
| # f_partitions.insert_dimension(Paxis, inplace=True) | ||
| # --- End: for | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 3. Make sure that aggregating axis is the outermost (slowest | ||
| # varying) axis of the partition matrix of data0 | ||
| # ------------------------------------------------------------ | ||
| ipmaxis = data0.partitions.axes.index(Paxis) | ||
| if ipmaxis: | ||
| data0.partitions.swapaxes(ipmaxis, 0, inplace=True) | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 4. Make sure that the partition matrix axes of data1 are in | ||
| # the same order as those in data0 | ||
| # ------------------------------------------------------------ | ||
| pmaxes1 = data1.partitions.axes | ||
| ipmaxes = [ | ||
| pmaxes1.index(pmaxis) for pmaxis in data0.partitions.axes | ||
| ] | ||
| data1.partitions.transpose(ipmaxes, inplace=True) | ||
|
|
||
| # -------------------------------------------------------- | ||
| # 5. Create new partition boundaries in the partition | ||
| # matrices of data0 and data1 so that their partition | ||
| # arrays may be considered as different slices of a | ||
| # common, larger hyperrectangular partition array. | ||
| # | ||
| # Note: | ||
| # | ||
| # * There is no need to add any boundaries across the | ||
| # concatenation axis. | ||
| # -------------------------------------------------------- | ||
| boundaries0 = data0.partition_boundaries() | ||
| boundaries1 = data1.partition_boundaries() | ||
|
|
||
| for dim in data0.partitions.axes[1:]: | ||
|
|
||
| # Still here? Then see if there are any partition matrix | ||
| # boundaries to be created for this partition dimension | ||
| bounds0 = boundaries0[dim] | ||
| bounds1 = boundaries1[dim] | ||
|
|
||
| symmetric_diff = set(bounds0).symmetric_difference(bounds1) | ||
| if not symmetric_diff: | ||
| # The partition boundaries for this partition | ||
| # dimension are already the same in data0 and data1 | ||
| continue | ||
|
|
||
| # Still here? Then there are some partition boundaries to | ||
| # be created for this partition dimension in data0 and/or | ||
| # data1. | ||
| for f, g, bf, bg in ( | ||
| (data0, data1, bounds0, bounds1), | ||
| (data1, data0, bounds1, bounds0), | ||
| ): | ||
| extra_bounds = [i for i in bg if i in symmetric_diff] | ||
| f.add_partitions(extra_bounds, dim) | ||
| # --- End: for | ||
| # --- End: for | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 6. Concatenate data0 and data1 partition matrices | ||
| # ------------------------------------------------------------ | ||
| # if data0._flip != data1._flip: | ||
| if data0._flip() != data1._flip(): | ||
| data0._move_flip_to_partitions() | ||
| data1._move_flip_to_partitions() | ||
|
|
||
| matrix0 = data0.partitions.matrix | ||
| matrix1 = data1.partitions.matrix | ||
|
|
||
| new_pmshape = list(matrix0.shape) | ||
| new_pmshape[0] += matrix1.shape[0] | ||
|
|
||
| # Initialise an empty partition matrix with the new shape | ||
| new_matrix = np.empty(new_pmshape, dtype=object) | ||
|
|
||
| # Insert the data0 partition matrix | ||
| new_matrix[: matrix0.shape[0]] = matrix0 | ||
|
|
||
| # Insert the data1 partition matrix | ||
| new_matrix[matrix0.shape[0] :] = matrix1 | ||
|
|
||
| data0.partitions.matrix = new_matrix | ||
|
|
||
| # Update the location map of the partition matrix of data0 | ||
| data0.partitions.set_location_map((Paxis,), (axis,)) | ||
| elif not units0.equals(data1.Units): # conform for consistency | ||
| if not copied: | ||
| data1 = data1.copy() | ||
| data1.Units = units0 | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # 7. Update the size, shape and dtype of data0 | ||
| # ------------------------------------------------------------ | ||
| # original_shape0 = data0._shape | ||
| processed_data.append(data1) | ||
|
|
||
| data0._size += data1._size | ||
| # Get data as dask arrays and apply concatenation operation | ||
| dxs = [] | ||
| for data1 in processed_data: | ||
| dxs.append(data1.to_dask_array()) | ||
|
|
||
| shape0 = list(shape0) | ||
| shape0[axis] += shape1[axis] | ||
| data0._shape = tuple(shape0) | ||
| data0._set_dask(da.concatenate(dxs, axis=axis)) | ||
|
|
||
| dtype0 = data0.dtype | ||
| dtype1 = data1.dtype | ||
| if dtype0 != dtype1: | ||
| data0.dtype = np.result_type(dtype0, dtype1) | ||
| # Manage cyclicity of axes: if join axis was cyclic, it is no longer | ||
| axis = data0._parse_axes(axis)[0] | ||
| if axis in data0.cyclic(): | ||
| logger.warning( | ||
| f"Concatenating along a cyclic axis ({axis}) therefore the " | ||
| f"axis has been set as non-cyclic in the output." | ||
| ) | ||
| data0.cyclic(axes=axis, iscyclic=False) | ||
|
|
||
| # ------------------------------------------------------------ | ||
| # Done | ||
| # ------------------------------------------------------------ | ||
| return data0 | ||
|
|
||
| def _move_flip_to_partitions(self): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can nuke this LAMA-only method.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure. Do you think there are any (or many) more to nuke? If there are, I would prefer to do it in another PR for separation of concerns. Otherwise, for one (or maybe two), I can nuke them here as suggested.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't know, but |
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.