diff --git a/datashader/compiler.py b/datashader/compiler.py index c7a4690c0..78227f29f 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -59,8 +59,26 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False): # List of base reductions (actually computed) bases = list(unique(concat(r._build_bases(cuda) for r in reds))) dshapes = [b.out_dshape(schema, antialias) for b in bases] + + # Information on how to perform second stage aggregation of antialiased lines, + # including whether antialiased lines self-intersect or not as we need a single + # value for this even for a compound reduction. This is by default True, but + # is False if a single constituent reduction requests it. + if antialias: + self_intersect, antialias_stage_2 = make_antialias_stage_2(reds, bases) + if cuda: + import cupy + array_module = cupy + else: + array_module = np + antialias_stage_2 = antialias_stage_2(array_module) + else: + self_intersect = False + antialias_stage_2 = False + # List of tuples of (append, base, input columns, temps) - calls = [_get_call_tuples(b, d, schema, cuda, antialias) for (b, d) in zip(bases, dshapes)] + calls = [_get_call_tuples(b, d, schema, cuda, antialias, self_intersect) + for (b, d) in zip(bases, dshapes)] # List of unique column names needed cols = list(unique(concat(pluck(2, calls)))) # List of temps needed @@ -72,17 +90,6 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False): combine = make_combine(bases, dshapes, temps, antialias) finalize = make_finalize(bases, agg, schema, cuda) - if antialias: - antialias_stage_2 = make_antialias_stage_2(reds, bases) - if cuda: - import cupy - array_module = cupy - else: - array_module = np - antialias_stage_2 = antialias_stage_2(array_module) - else: - antialias_stage_2 = False - return create, info, append, combine, finalize, antialias_stage_2 @@ -96,10 +103,10 @@ def traverse_aggregation(agg): yield agg -def _get_call_tuples(base, dshape, schema, cuda, antialias): +def _get_call_tuples(base, dshape, schema, cuda, antialias, self_intersect): # Comments refer to usage in make_append() return ( - base._build_append(dshape, schema, cuda, antialias), # func + base._build_append(dshape, schema, cuda, antialias, self_intersect), # func (base,), # bases base.inputs, # cols base._build_temps(cuda), # temps @@ -217,6 +224,9 @@ def finalize(bases, cuda=False, **kwargs): def make_antialias_stage_2(reds, bases): # Only called if antialias is True. + + # Prefer a single-stage antialiased aggregation, but if any requested + # reduction requires two stages then force use of two for all reductions. self_intersect = True for red in reds: if red._antialias_requires_2_stages(): @@ -226,4 +236,4 @@ def make_antialias_stage_2(reds, bases): def antialias_stage_2(array_module): return tuple(zip(*concat(b._antialias_stage_2(self_intersect, array_module) for b in bases))) - return antialias_stage_2 + return self_intersect, antialias_stage_2 diff --git a/datashader/core.py b/datashader/core.py index 963511a03..04eb53811 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -438,7 +438,10 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None, if isinstance(non_cat_agg, rd.by): non_cat_agg = non_cat_agg.reduction - if not isinstance(non_cat_agg, (rd.any, rd.count, rd.max, rd.min, rd.sum)): + if not isinstance(non_cat_agg, ( + rd.any, rd.count, rd.max, rd.min, rd.sum, rd.summary, rd._sum_zero, + rd.first, rd.last, rd.mean + )): raise NotImplementedError( f"{type(non_cat_agg)} reduction not implemented for antialiased lines") diff --git a/datashader/enums.py b/datashader/enums.py index 5797dcfcb..8aed80596 100644 --- a/datashader/enums.py +++ b/datashader/enums.py @@ -9,3 +9,5 @@ class AntialiasCombination(Enum): SUM_2AGG = 2 MIN = 3 MAX = 4 + FIRST = 5 + LAST = 6 diff --git a/datashader/glyphs/line.py b/datashader/glyphs/line.py index 70c231fc2..47c7f7c23 100644 --- a/datashader/glyphs/line.py +++ b/datashader/glyphs/line.py @@ -5,9 +5,10 @@ from datashader.enums import AntialiasCombination from datashader.glyphs.points import _PointLike, _GeometryLike -from datashader.utils import (isnull, isreal, ngjit, nanmax_in_place, - nanmin_in_place, nansum_in_place, parallel_fill) -from numba import cuda +from datashader.utils import ( + isnull, isreal, nanfirst_in_place, nanlast_in_place, nanmax_in_place, + nanmin_in_place, nansum_in_place, ngjit, parallel_fill) +from numba import cuda, literal_unroll import numba.types as nb_types @@ -31,12 +32,26 @@ def _two_stage_agg(antialias_stage_2): # Not using antialiased lines, doesn't matter what is returned. return False, False - if len(antialias_stage_2[0]) > 1: - raise NotImplementedError("Currently only support single antialiased reductions") - comb = antialias_stage_2[0][0] + # A single combination in (SUM_2AGG, FIRST, LAST, MIN) means that a 2-stage + # aggregation will be used, otherwise use a 1-stage aggregation that is + # faster. + use_2_stage_agg = False + for comb in antialias_stage_2[0]: + if comb in (AntialiasCombination.SUM_2AGG, AntialiasCombination.MIN, + AntialiasCombination.FIRST, AntialiasCombination.LAST): + use_2_stage_agg = True + break + + # Boolean overwrite flag is used in _full_antialias() is True to overwrite + # pixel values (using max of previous and new values) or False for the more + # complicated correction algorithm. Prefer overwrite=True for speed, but + # any SUM_1AGG implies overwrite=False. + overwrite = True + for comb in antialias_stage_2[0]: + if comb == AntialiasCombination.SUM_1AGG: + overwrite = False + break - overwrite = comb != AntialiasCombination.SUM_1AGG - use_2_stage_agg = comb in (AntialiasCombination.SUM_2AGG, AntialiasCombination.MIN) return overwrite, use_2_stage_agg @@ -1037,10 +1052,41 @@ def _combine_in_place(accum_agg, other_agg, antialias_combination): nanmax_in_place(accum_agg, other_agg) elif antialias_combination == AntialiasCombination.MIN: nanmin_in_place(accum_agg, other_agg) + elif antialias_combination == AntialiasCombination.FIRST: + nanfirst_in_place(accum_agg, other_agg) + elif antialias_combination == AntialiasCombination.LAST: + nanlast_in_place(accum_agg, other_agg) else: nansum_in_place(accum_agg, other_agg) +@ngjit +def _aa_stage_2_accumulate(aggs_and_copies, antialias_combinations): + k = 0 + # Numba access to heterogeneous tuples is only permitted using literal_unroll. + for agg_and_copy in literal_unroll(aggs_and_copies): + _combine_in_place(agg_and_copy[1], agg_and_copy[0], antialias_combinations[k]) + k += 1 + + +@ngjit +def _aa_stage_2_clear(aggs_and_copies, antialias_zeroes): + k = 0 + # Numba access to heterogeneous tuples is only permitted using literal_unroll. + for agg_and_copy in literal_unroll(aggs_and_copies): + parallel_fill(agg_and_copy[0], antialias_zeroes[k]) + k += 1 + + +@ngjit +def _aa_stage_2_copy_back(aggs_and_copies): + k = 0 + # Numba access to heterogeneous tuples is only permitted using literal_unroll. + for agg_and_copy in literal_unroll(aggs_and_copies): + agg_and_copy[0][:] = agg_and_copy[1][:] + k += 1 + + def _build_extend_line_axis0_multi(draw_segment, expand_aggs_and_cols, use_2_stage_agg): @ngjit @@ -1085,18 +1131,17 @@ def extend_cpu_antialias_2agg(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, plot_start, antialias_stage_2, *aggs_and_cols): """Aggregate along a line formed by ``xs`` and ``ys``""" n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - plot_start, antialias_stage_2, aggs, *aggs_and_cols) + plot_start, antialias_stage_2, aggs_and_accums, *aggs_and_cols) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - plot_start, antialias_stage_2, aggs, *aggs_and_cols): + plot_start, antialias_stage_2, aggs_and_accums, *aggs_and_cols): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) nrows, ncols = xs.shape for j in range(ncols): @@ -1107,18 +1152,13 @@ def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, if ncols == 1: return - if j == 0: - accum_aggs = [aggs[k].copy() for k in range(n_aggs)] - else: - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) if j < ncols - 1: - for k in range(n_aggs): - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) + + _aa_stage_2_copy_back(aggs_and_accums) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] @cuda.jit @expand_aggs_and_cols @@ -1181,18 +1221,17 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2 def extend_cpu_antialias_2agg(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols): n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols) + antialias_stage_2, aggs_and_accums, *aggs_and_cols) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols): + antialias_stage_2, aggs_and_accums, *aggs_and_cols): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) ncols = xs.shape[1] for i in range(xs.shape[0]): @@ -1203,18 +1242,12 @@ def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, if xs.shape[0] == 1: return - if i == 0: - accum_aggs = [aggs[k].copy() for k in range(n_aggs)] - else: - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) if i < xs.shape[0] - 1: - for k in range(n_aggs): - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] + _aa_stage_2_copy_back(aggs_and_accums) @cuda.jit @expand_aggs_and_cols @@ -1277,18 +1310,17 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2 def extend_cpu_antialias_2agg(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols): n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols) + antialias_stage_2, aggs_and_accums, *aggs_and_cols) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols): + antialias_stage_2, aggs_and_accums, *aggs_and_cols): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) ncols = ys.shape[1] for i in range(ys.shape[0]): @@ -1301,18 +1333,12 @@ def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, if ys.shape[0] == 1: return - if i == 0: - accum_aggs = [aggs[k].copy() for k in range(n_aggs)] - else: - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) if i < ys.shape[0] - 1: - for k in range(n_aggs): - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] + _aa_stage_2_copy_back(aggs_and_accums) @cuda.jit @expand_aggs_and_cols @@ -1376,18 +1402,17 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2 def extend_cpu_antialias_2agg(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols): n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols) + antialias_stage_2, aggs_and_accums, *aggs_and_cols) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - antialias_stage_2, aggs, *aggs_and_cols): + antialias_stage_2, aggs_and_accums, *aggs_and_cols): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) ncols = xs.shape[1] for i in range(xs.shape[0]): @@ -1401,18 +1426,12 @@ def cpu_antialias_2agg_impl(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, if xs.shape[0] == 1: return - if i == 0: - accum_aggs = [aggs[k].copy() for k in range(n_aggs)] - else: - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) if i < xs.shape[0] - 1: - for k in range(n_aggs): - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] + _aa_stage_2_copy_back(aggs_and_accums) @cuda.jit @expand_aggs_and_cols @@ -1517,23 +1536,22 @@ def extend_cpu_antialias_2agg( y_flat = ys.flat_array n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) extend_cpu_numba_antialias_2agg( sx, tx, sy, ty, xmin, xmax, ymin, ymax, x_start_i, x_flat, - y_start_i, y_flat, antialias_stage_2, aggs, *aggs_and_cols + y_start_i, y_flat, antialias_stage_2, aggs_and_accums, *aggs_and_cols ) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def extend_cpu_numba_antialias_2agg( sx, tx, sy, ty, xmin, xmax, ymin, ymax, x_start_i, x_flat, - y_start_i, y_flat, antialias_stage_2, aggs, *aggs_and_cols + y_start_i, y_flat, antialias_stage_2, aggs_and_accums, *aggs_and_cols ): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) nrows = len(x_start_i) x_flat_len = len(x_flat) @@ -1582,18 +1600,12 @@ def extend_cpu_numba_antialias_2agg( if nrows == 1: return - if i == 0: - accum_aggs = [aggs[k].copy() for k in range(n_aggs)] - else: - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) if i < nrows - 1: - for k in range(n_aggs): - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] + _aa_stage_2_copy_back(aggs_and_accums) if use_2_stage_agg: return extend_cpu_antialias_2agg @@ -1720,30 +1732,24 @@ def extend_cpu_antialias_2agg( eligible_inds = np.arange(0, len(geometry), dtype='uint32') n_aggs = len(antialias_stage_2[0]) - aggs = aggs_and_cols[:n_aggs] + aggs_and_accums = tuple((agg, agg.copy()) for agg in aggs_and_cols[:n_aggs]) extend_cpu_numba_antialias_2agg( sx, tx, sy, ty, xmin, xmax, ymin, ymax, values, missing, offsets0, offsets1, eligible_inds, - closed_rings, antialias_stage_2, aggs, *aggs_and_cols + closed_rings, antialias_stage_2, aggs_and_accums, *aggs_and_cols ) @ngjit - #@expand_aggs_and_cols + @expand_aggs_and_cols def extend_cpu_numba_antialias_2agg( sx, tx, sy, ty, xmin, xmax, ymin, ymax, values, missing, offsets0, offsets1, eligible_inds, - closed_rings, antialias_stage_2, aggs, *aggs_and_cols + closed_rings, antialias_stage_2, aggs_and_accums, *aggs_and_cols ): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None antialias_combinations, antialias_zeroes = antialias_stage_2 - n_aggs = len(antialias_combinations) - - # This uses a different approach to the other line classes because the - # use of eligible_inds means we don't know which is the last polygon - # or how many there are without walking the eligible_inds. - accum_aggs = [aggs[m].copy() for m in range(n_aggs)] for i in eligible_inds: if missing[i]: @@ -1789,12 +1795,10 @@ def extend_cpu_numba_antialias_2agg( segment_start, segment_end, x0, x1, y0, y1, 0.0, 0.0, buffer, *aggs_and_cols) - for k in range(n_aggs): - _combine_in_place(accum_aggs[k], aggs[k], antialias_combinations[k]) - parallel_fill(aggs[k], antialias_zeroes[k]) + _aa_stage_2_accumulate(aggs_and_accums, antialias_combinations) + _aa_stage_2_clear(aggs_and_accums, antialias_zeroes) - for k in range(n_aggs): - aggs[k][:] = accum_aggs[k][:] + _aa_stage_2_copy_back(aggs_and_accums) if use_2_stage_agg: return extend_cpu_antialias_2agg diff --git a/datashader/reductions.py b/datashader/reductions.py index 54664b7d0..4aec1c14c 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -279,7 +279,7 @@ def _build_create(self, required_dshape): else: raise NotImplementedError(f"Unexpected dshape {dshape}") - def _build_append(self, dshape, schema, cuda, antialias): + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): if cuda: if antialias and self.column is None: return self._append_no_field_antialias_cuda @@ -360,8 +360,8 @@ def __init__(self, column=None, self_intersect=True): def _antialias_requires_2_stages(self): return not self.self_intersect - def _build_append(self, dshape, schema, cuda, antialias): - if antialias and not self.self_intersect: + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): + if antialias and not self_intersect: # append functions specific to antialiased lines without self_intersect if cuda: if self.column is None: @@ -375,7 +375,7 @@ def _build_append(self, dshape, schema, cuda, antialias): return self._append_antialias_not_self_intersect # Fall back to base class implementation - return super()._build_append(dshape, schema, cuda, antialias) + return super()._build_append(dshape, schema, cuda, antialias, self_intersect) def _hashable_inputs(self): # Reductions with different self_intersect attributes much have different hashes otherwise @@ -560,8 +560,8 @@ def _build_bases(self, cuda=False): return bases return tuple(by(self.categorizer, base) for base in bases) - def _build_append(self, dshape, schema, cuda, antialias): - return self.reduction._build_append(dshape, schema, cuda, antialias) + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): + return self.reduction._build_append(dshape, schema, cuda, antialias, self_intersect) def _build_combine(self, dshape, antialias): return self.reduction._build_combine(dshape, antialias) @@ -689,8 +689,13 @@ class _sum_zero(FloatingReduction): ---------- column : str Name of the column to aggregate over. Column data type must be numeric. - ``NaN`` values in the column are skipped. """ + def _antialias_stage_2(self, self_intersect, array_module): + if self_intersect: + return ((AntialiasCombination.SUM_1AGG, 0),) + else: + return ((AntialiasCombination.SUM_2AGG, 0),) + def _build_create(self, required_dshape): return self._create_float64_zero @@ -699,10 +704,24 @@ def _build_create(self, required_dshape): @ngjit def _append(x, y, agg, field): if not isnull(field): - if isnull(agg[y, x]): - agg[y, x] = field - else: - agg[y, x] += field + # agg[y, x] cannot be null as initialised to zero. + agg[y, x] += field + + @staticmethod + @ngjit + def _append_antialias(x, y, agg, field, aa_factor): + value = field*aa_factor + if not isnull(value): + # agg[y, x] cannot be null as initialised to zero. + agg[y, x] += value + + @staticmethod + @ngjit + def _append_antialias_not_self_intersect(x, y, agg, field, aa_factor): + value = field*aa_factor + if not isnull(value) and value > agg[y, x]: + # agg[y, x] cannot be null as initialised to zero. + agg[y, x] = value # GPU append functions @staticmethod @@ -729,8 +748,8 @@ def __init__(self, column=None, self_intersect=True): def _antialias_requires_2_stages(self): return not self.self_intersect - def _build_append(self, dshape, schema, cuda, antialias): - if antialias and not self.self_intersect: + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): + if antialias and not self_intersect: if cuda: raise NotImplementedError("SelfIntersectingOptionalFieldReduction") else: @@ -739,7 +758,7 @@ def _build_append(self, dshape, schema, cuda, antialias): else: return self._append_antialias_not_self_intersect - return super()._build_append(dshape, schema, cuda, antialias) + return super()._build_append(dshape, schema, cuda, antialias, self_intersect) def _hashable_inputs(self): # Reductions with different self_intersect attributes much have different hashes otherwise @@ -824,11 +843,11 @@ class m2(FloatingReduction): Name of the column to aggregate over. Column data type must be numeric. ``NaN`` values in the column are skipped. """ - def _build_append(self, dshape, schema, cuda, antialias): + def _build_append(self, dshape, schema, cuda, antialias, self_intersect): if cuda: raise ValueError("""\ The 'std' and 'var' reduction operations are not yet supported on the GPU""") - return super(m2, self)._build_append(dshape, schema, cuda, antialias) + return super(m2, self)._build_append(dshape, schema, cuda, antialias, self_intersect) def _build_create(self, required_dshape): return self._create_float64_zero @@ -1032,12 +1051,25 @@ class first(Reduction): def out_dshape(self, in_dshape, antialias): return dshape(Option(ct.float64)) + def _antialias_requires_2_stages(self): + return True + + def _antialias_stage_2(self, self_intersect, array_module): + return ((AntialiasCombination.FIRST, array_module.nan),) + @staticmethod @ngjit def _append(x, y, agg, field): if not isnull(field) and isnull(agg[y, x]): agg[y, x] = field + @staticmethod + @ngjit + def _append_antialias(x, y, agg, field, aa_factor): + value = field*aa_factor + if isnull(agg[y, x]) or value > agg[y, x]: + agg[y, x] = value + @staticmethod def _combine(aggs): raise NotImplementedError("first is not implemented for dask DataFrames") @@ -1065,12 +1097,25 @@ class last(Reduction): def out_dshape(self, in_dshape, antialias): return dshape(Option(ct.float64)) + def _antialias_requires_2_stages(self): + return True + + def _antialias_stage_2(self, self_intersect, array_module): + return ((AntialiasCombination.LAST, array_module.nan),) + @staticmethod @ngjit def _append(x, y, agg, field): if not isnull(field): agg[y, x] = field + @staticmethod + @ngjit + def _append_antialias(x, y, agg, field, aa_factor): + value = field*aa_factor + if isnull(agg[y, x]) or value > agg[y, x]: + agg[y, x] = value + @staticmethod def _combine(aggs): raise NotImplementedError("last is not implemented for dask DataFrames") @@ -1128,6 +1173,15 @@ class summary(Expr): >>> import datashader as ds >>> red = ds.summary(mean_a=ds.mean('a'), sum_b=ds.sum('b')) + + Notes + ----- + A single pass of the source dataset using antialiased lines can either be + performed using a single-stage aggregation (e.g. ``self_intersect=True``) + or two stages (``self_intersect=False``). If a ``summary`` contains a + ``count`` or ``sum`` reduction with ``self_intersect=False``, or any of + ``first``, ``last`` or ``min``, then the antialiased line pass will be + performed in two stages. """ def __init__(self, **kwargs): ks, vs = zip(*sorted(kwargs.items())) diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index 23aa2a0fc..80b2a5090 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -1872,6 +1872,17 @@ def test_line_antialias(): agg = cvs.line(agg=ds.min("value"), **kwargs) assert_eq_ndarray(agg.data, 3*line_antialias_sol_0, close=True) + agg = cvs.line(agg=ds.first("value"), **kwargs) + assert_eq_ndarray(agg.data, 3*line_antialias_sol_0, close=True) + + agg = cvs.line(agg=ds.last("value"), **kwargs) + assert_eq_ndarray(agg.data, 3*line_antialias_sol_0, close=True) + + agg = cvs.line(agg=ds.mean("value"), **kwargs) + # Sum = 3*count so mean is 3 everywhere that there is any fraction of an antialiased line + sol = np.where(line_antialias_sol_0 > 0, 3.0, np.nan) + assert_eq_ndarray(agg.data, sol, close=True) + # Second line only, doesn't self-intersect kwargs = dict(source=line_antialias_df, x="x1", y="y1", line_width=1) agg = cvs.line(agg=ds.any(), **kwargs) @@ -1895,6 +1906,16 @@ def test_line_antialias(): agg = cvs.line(agg=ds.min("value"), **kwargs) assert_eq_ndarray(agg.data, 3*line_antialias_sol_1, close=True) + agg = cvs.line(agg=ds.first("value"), **kwargs) + assert_eq_ndarray(agg.data, 3*line_antialias_sol_1, close=True) + + agg = cvs.line(agg=ds.last("value"), **kwargs) + assert_eq_ndarray(agg.data, 3*line_antialias_sol_1, close=True) + + agg = cvs.line(agg=ds.mean("value"), **kwargs) + sol_mean = np.where(line_antialias_sol_1 > 0, 3.0, np.nan) + assert_eq_ndarray(agg.data, sol_mean, close=True) + # Both lines. kwargs = dict(source=line_antialias_df, x=["x0", "x1"], y=["y0", "y1"], line_width=1) agg = cvs.line(agg=ds.any(), **kwargs) @@ -1902,26 +1923,104 @@ def test_line_antialias(): assert_eq_ndarray(agg.data, sol_max, close=True) agg = cvs.line(agg=ds.count(self_intersect=False), **kwargs) - sol_sum = nansum(line_antialias_sol_0, line_antialias_sol_1) - assert_eq_ndarray(agg.data, sol_sum, close=True) + sol_count = nansum(line_antialias_sol_0, line_antialias_sol_1) + assert_eq_ndarray(agg.data, sol_count, close=True) agg = cvs.line(agg=ds.count(self_intersect=True), **kwargs) - sol_sum_intersect = nansum(line_antialias_sol_0_intersect, line_antialias_sol_1) - assert_eq_ndarray(agg.data, sol_sum_intersect, close=True) + sol_count_intersect = nansum(line_antialias_sol_0_intersect, line_antialias_sol_1) + assert_eq_ndarray(agg.data, sol_count_intersect, close=True) agg = cvs.line(agg=ds.sum("value", self_intersect=False), **kwargs) - assert_eq_ndarray(agg.data, 3*sol_sum, close=True) + assert_eq_ndarray(agg.data, 3*sol_count, close=True) agg = cvs.line(agg=ds.sum("value", self_intersect=True), **kwargs) - assert_eq_ndarray(agg.data, 3*sol_sum_intersect, close=True) + assert_eq_ndarray(agg.data, 3*sol_count_intersect, close=True) agg = cvs.line(agg=ds.max("value"), **kwargs) assert_eq_ndarray(agg.data, 3*sol_max, close=True) - agg = cvs.line(source=line_antialias_df, x=["x0", "x1"], y=["y0", "y1"], agg=ds.min("value"), line_width=1) + agg = cvs.line(agg=ds.min("value"), **kwargs) sol_min = nanmin(line_antialias_sol_0, line_antialias_sol_1) assert_eq_ndarray(agg.data, 3*sol_min, close=True) + agg = cvs.line(agg=ds.first("value"), **kwargs) + sol_first = 3*np.where(np.isnan(line_antialias_sol_0), line_antialias_sol_1, line_antialias_sol_0) + assert_eq_ndarray(agg.data, sol_first, close=True) + + agg = cvs.line(agg=ds.last("value"), **kwargs) + sol_last = 3*np.where(np.isnan(line_antialias_sol_1), line_antialias_sol_0, line_antialias_sol_1) + assert_eq_ndarray(agg.data, sol_last, close=True) + + agg = cvs.line(agg=ds.mean("value"), **kwargs) + sol_mean = np.where(sol_count>0, 3.0, np.nan) + assert_eq_ndarray(agg.data, sol_mean, close=True) + + +def test_line_antialias_summary(): + kwargs = dict(source=line_antialias_df, x=["x0", "x1"], y=["y0", "y1"], line_width=1) + + x_range = y_range = (-0.1875, 1.1875) + cvs = ds.Canvas(plot_width=11, plot_height=11, x_range=x_range, y_range=y_range) + + # Precalculate expected solutions + sol_count = nansum(line_antialias_sol_0, line_antialias_sol_1) + sol_count_intersect = nansum(line_antialias_sol_0_intersect, line_antialias_sol_1) + sol_min = 3*nanmin(line_antialias_sol_0, line_antialias_sol_1) + sol_first = 3*np.where(np.isnan(line_antialias_sol_0), line_antialias_sol_1, line_antialias_sol_0) + sol_last = 3*np.where(np.isnan(line_antialias_sol_1), line_antialias_sol_0, line_antialias_sol_1) + + # Summary of count and sum using self_intersect=True + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=True), + sum=ds.sum("value", self_intersect=True), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count_intersect, close=True) + assert_eq_ndarray(agg["sum"].data, 3*sol_count_intersect, close=True) + + # Summary of count and sum using self_intersect=False + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=False), + sum=ds.sum("value", self_intersect=False), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count, close=True) + assert_eq_ndarray(agg["sum"].data, 3*sol_count, close=True) + + # Summary of count/sum with mix of self_intersect will force self_intersect=False for both + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=True), + sum=ds.sum("value", self_intersect=False), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count, close=True) + assert_eq_ndarray(agg["sum"].data, 3*sol_count, close=True) + + # min, first and last also force use of self_intersect=False + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=True), + min=ds.min("value"), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count, close=True) + assert_eq_ndarray(agg["min"].data, sol_min, close=True) + + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=True), + first=ds.first("value"), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count, close=True) + assert_eq_ndarray(agg["first"].data, sol_first, close=True) + + agg = cvs.line( + agg=ds.summary( + count=ds.count("value", self_intersect=True), + last=ds.last("value"), + ), **kwargs) + assert_eq_ndarray(agg["count"].data, sol_count, close=True) + assert_eq_ndarray(agg["last"].data, sol_last, close=True) + line_antialias_nan_sol_intersect = np.array([ [0.085786, 0.5, 0.085786, nan, nan, nan, nan, nan, nan, 0.085786, 0.5, 0.085786], @@ -2085,8 +2184,13 @@ def test_line_antialias_duplicate_points(self_intersect): @pytest.mark.parametrize('reduction', [ - ds.mean('value'), ds.std('value'), ds.summary(c=ds.count()), ds.var('value'), - ds.first('value'), ds.last('value')]) + #ds.mean('value'), + ds.std('value'), + ds.var('value'), + #ds.summary(c=ds.count()), ds.sum('value'), + #ds.first('value'), + #ds.last('value'), +]) def test_line_antialias_reduction_not_implemented(reduction): # Issue #1133, detect and report reductions that are not implemented. cvs = ds.Canvas(plot_width=10, plot_height=10) diff --git a/datashader/utils.py b/datashader/utils.py index f13b27036..d5a4c2b3b 100644 --- a/datashader/utils.py +++ b/datashader/utils.py @@ -584,6 +584,30 @@ def isnull(val): return not (val <= 0 or val > 0) +@ngjit_parallel +def nanfirst_in_place(ret, other): + """First of 2 arrays but taking nans into account. + Return the first array. + """ + ret = ret.ravel() + other = other.ravel() + for i in nb.prange(len(ret)): + if isnull(ret[i]) and not isnull(other[i]): + ret[i] = other[i] + + +@ngjit_parallel +def nanlast_in_place(ret, other): + """Last of 2 arrays but taking nans into account. + Return the first array. + """ + ret = ret.ravel() + other = other.ravel() + for i in nb.prange(len(ret)): + if not isnull(other[i]): + ret[i] = other[i] + + @ngjit_parallel def nanmax_in_place(ret, other): """Max of 2 arrays but taking nans into account. Could use np.nanmax but