/
element.py
705 lines (539 loc) · 26.1 KB
/
element.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
"""
Collection of either extremely generic or simple ElementOperation
examples.
"""
import numpy as np
import param
from param import _is_number
from ..core import (ElementOperation, NdOverlay, Overlay, GridMatrix,
HoloMap, Dataset, Element, Collator)
from ..core.data import ArrayInterface, DictInterface
from ..core.util import find_minmax, group_sanitizer, label_sanitizer, pd
from ..element.chart import Histogram, Scatter
from ..element.raster import Raster, Image, RGB, QuadMesh
from ..element.path import Contours, Polygons
from ..element.util import categorical_aggregate2d
from ..streams import RangeXY
# Data interfaces that the decimate operation accepts without first
# converting the element to another datatype.
column_interfaces = [ArrayInterface, DictInterface]
if pd:
    # Only register the pandas-backed interface when `pd` is truthy
    # (presumably it is None when pandas is not installed -- confirm
    # against core.util).
    from ..core.data import PandasInterface
    column_interfaces += [PandasInterface]
def identity(x, k):
    """Return ``x`` unchanged; the key ``k`` is accepted but ignored."""
    return x
class operation(ElementOperation):
    """
    The most generic operation: wraps an arbitrary callable so that it
    can be used as an ElementOperation. The callable must accept a
    HoloViews component and a key (which it may ignore) and must
    return a new HoloViews component.

    This class may be useful for turning a HoloViews method into an
    operation to define as compositor operation. For instance, the
    following definition:

    operation.instance(op=lambda x, k: x.collapse(np.subtract))

    could be used to implement a collapse operation that subtracts the
    data between Rasters in an Overlay.
    """

    output_type = param.Parameter(None, doc="""
       The output element type which may be None to disable type
       checking.
       May be used to declare useful information to other code in
       HoloViews e.g required for tab-completion support of operations
       registered with compositors.""")

    group = param.String(default='Operation', doc="""
        The group assigned to the result after having applied the
        operator.""")

    op = param.Callable(default=identity, doc="""
        The operation used to generate a new HoloViews object returned
        by the operation. By default, the identity operation is
        applied.""")

    def _process(self, view, key=None):
        """
        Apply the wrapped callable to ``view`` and ``key`` and relabel
        the result with the configured group.

        Raises AssertionError if ``output_type`` is set and the value
        returned by the callable is not an instance of it.
        """
        retval = self.p.op(view, key)
        expected = self.p.output_type
        # Raise explicitly instead of using an assert statement so the
        # check is still performed when Python runs with -O, while the
        # exception type seen by callers stays the same.
        if expected is not None and not isinstance(retval, expected):
            raise AssertionError(
                "Return value does not match the declared output type.")
        return retval.relabel(group=self.p.group)
class factory(ElementOperation):
    """
    Simple operation that builds an element of the configured
    output_type directly from another element. For instance, RGB and
    HSV elements can be created from overlays of Image elements.
    """

    output_type = param.Parameter(RGB, doc="""
        The output type of the factor operation.
        By default, if three overlaid Images elements are supplied,
        the corresponding RGB element will be returned. """)

    def _process(self, view, key=None):
        """Construct and return ``output_type(view)``; ``key`` is unused."""
        constructor = self.p.output_type
        return constructor(view)
class chain(ElementOperation):
    """
    A chain builds a new ElementOperation out of a sequence of
    existing ones. It is configured with a list of ElementOperations
    (or ElementOperation instances) which are applied one after the
    other to produce the returned element.

    chain(operations=[gradient, threshold.instance(level=2)])

    Given an Image, this example first computes the gradient and then
    thresholds the result at a level of 2.0.

    Instances are only needed when arguments have to be passed to the
    individual operations, so that the resulting object is a function
    over a single argument.
    """

    output_type = param.Parameter(Image, doc="""
        The output type of the chain operation. Must be supplied if
        the chain is to be used as a channel operation.""")

    group = param.String(default='Chain', doc="""
        The group assigned to the result after having applied the chain.""")

    operations = param.List(default=[], class_=ElementOperation, doc="""
        A list of ElementOperations (or ElementOperation instances)
        that are applied on the input from left to right..""")

    def _process(self, view, key=None):
        """
        Feed ``view`` through every configured operation in order and
        return a clone of the final result carrying this chain's group.
        """
        result = view
        for step in self.p.operations:
            result = step.process_element(
                result, key, input_ranges=self.p.input_ranges)
        return result.clone(group=self.p.group)
class transform(ElementOperation):
    """
    Generic ElementOperation that turns an input Image or RGBA element
    into an output Image. The supplied callable receives the data of
    the input element (typically a numpy array) and returns the data
    used for the output Image.

    This operator is extremely versatile; for instance, an alternative
    to the explicit threshold operator could be implemented with:

    operator=lambda x: np.clip(x, 0, 0.5)

    Alternatively, a transform computing the 2D autocorrelation using
    the scipy library could be implemented with:

    operator=lambda x: scipy.signal.correlate2d(x, x)
    """

    output_type = Image

    group = param.String(default='Transform', doc="""
        The group assigned to the result after applying the
        transform.""")

    operator = param.Callable(doc="""
        Function of one argument that transforms the data in the input
        Image to the data in the output Image. By default, acts as
        the identity function such that the output matches the input.""")

    def _process(self, matrix, key=None):
        """
        Return an Image sharing the bounds of ``matrix`` whose data is
        the operator applied to ``matrix.data`` (or the data itself
        when no operator is set).
        """
        if self.p.operator:
            processed = self.p.operator(matrix.data)
        else:
            # No operator configured: pass the data through untouched
            processed = matrix.data
        return Image(processed, matrix.bounds, group=self.p.group)
class image_overlay(ElementOperation):
    """
    Operation that assembles an overlay of images matching a given
    specification, starting from a subset of the required elements.

    This is useful for reordering the elements of an overlay,
    duplicating layers of an overlay or creating blank image elements
    in the appropriate positions.

    For instance, image_overlay may build a three layered input
    suitable for the RGB factory operation even if supplied with one
    or two of the required channels (creating blank channels for the
    missing elements).

    If a match is ambiguous the strongest match wins; when several
    candidates tie in strength the first layer of the input is used.
    At least one successful match is always required.
    """

    output_type = Overlay

    spec = param.String(doc="""
       Specification of the output Overlay structure. For instance:
       Image.R * Image.G * Image.B
       Will ensure an overlay of this structure is created even if
       (for instance) only (Image.R * Image.B) is supplied.
       Elements in the input overlay that match are placed in the
       appropriate positions and unavailable specification elements
       are created with the specified fill group.""")

    fill = param.Number(default=0)

    default_range = param.Tuple(default=(0,1), doc="""
        The default range that will be set on the value_dimension of
        any automatically created blank image elements.""")

    group = param.String(default='Transform', doc="""
        The group assigned to the resulting overlay.""")

    @classmethod
    def _match(cls, el, spec):
        "Return the strength of the match (None if no match)"
        parts = dict(zip(['type', 'group', 'label'], spec.split('.')))
        if not (isinstance(el, Image) and parts['type'] == 'Image'):
            raise NotImplementedError("Only Image currently supported")

        # Every spec component that is present and agrees with the
        # sanitized attribute of the element adds one to the strength.
        strength = 1
        for key, sanitize in (('group', group_sanitizer),
                              ('label', label_sanitizer)):
            sanitized = sanitize(getattr(el, key))
            if key not in parts:
                continue
            if parts[key] != sanitized:
                return None
            strength += 1
        return strength

    def _match_overlay(self, raster, overlay_spec):
        """
        Match a raster (or each layer of an input overlay) against
        every entry of the specification. Returns a list holding the
        best matching element per entry (None where nothing matched)
        together with the list of corresponding match strengths.
        """
        count = len(overlay_spec)
        ordering = [None] * count   # Best element found per spec entry
        strengths = [0] * count     # Strength of that best match
        if isinstance(raster, Overlay):
            candidates = raster.values()
        else:
            candidates = [raster]

        for candidate in candidates:
            for pos, entry in enumerate(overlay_spec):
                strength = self._match(candidate, entry)
                # Only a strictly stronger match replaces an earlier
                # one, so the first layer wins in the case of a tie.
                if strength is not None and strength > strengths[pos]:
                    ordering[pos] = candidate
                    strengths[pos] = strength
        return ordering, strengths

    def _process(self, raster, key=None):
        """
        Build the overlay described by ``spec``, inserting constant
        valued Image elements wherever the input supplied no match.
        """
        specs = tuple(entry.strip() for entry in self.p.spec.split('*'))
        ordering, strengths = self._match_overlay(raster, specs)
        if not any(el is not None for el in ordering):
            raise Exception("The image_overlay operation requires at least one match")

        # The strongest match supplies the array shape for blank layers
        strongest = ordering[np.argmax(strengths)]
        completed = []
        for el, spec in zip(ordering, specs):
            if el is None:
                parts = dict(zip(['type', 'group', 'label'], spec.split('.')))
                blank = np.ones(strongest.data.shape) * self.p.fill
                el = Image(blank,
                           group=parts.get('group','Image'),
                           label=parts.get('label',''))
                el.vdims[0].range = self.p.default_range
            completed.append(el)
        # Multiplying the elements together composes them into the
        # returned overlay.
        return np.prod(completed)
class threshold(ElementOperation):
    """
    Threshold a given Image whereby all values strictly higher than a
    given level map to the specified high value and all other values
    map to the specified low value.
    """

    output_type = Image

    level = param.Number(default=0.5, doc="""
       The value at which the threshold is applied. Values above the
       threshold map to the 'high' value and all other values map to
       the 'low' value.""")

    high = param.Number(default=1.0, doc="""
      The value given to elements greater than the threshold.""")

    low = param.Number(default=0.0, doc="""
      The value given to elements less than or equal to the
      threshold.""")

    group = param.String(default='Threshold', doc="""
       The group assigned to the thresholded output.""")

    def _process(self, matrix, key=None):
        """
        Return a clone of ``matrix`` whose data is ``high`` wherever
        the input exceeds ``level`` and ``low`` everywhere else.

        Raises TypeError if ``matrix`` is not an Image.
        """
        if not isinstance(matrix, Image):
            raise TypeError("The threshold operation requires a Image as input.")

        arr = matrix.data
        # Float scalars give the same float result as the previous
        # np.ones(...) * value arrays without allocating two
        # throwaway arrays of the full image size.
        thresholded = np.where(arr > self.p.level,
                               float(self.p.high), float(self.p.low))
        return matrix.clone(thresholded, group=self.p.group)
class gradient(ElementOperation):
    """
    Compute the gradient magnitude of the supplied Image.

    If the value dimension of the Image is cyclic, each step is taken
    as the smallest jump within the cyclic range.
    """

    output_type = Image

    group = param.String(default='Gradient', doc="""
        The group assigned to the output gradient matrix.""")

    def _process(self, matrix, key=None):
        """
        Return an Image (sharing the bounds of ``matrix``) holding the
        magnitude of the finite differences along both array axes.
        The result is one row and one column smaller than the input.
        """
        if len(matrix.vdims) != 1:
            raise ValueError("Input matrix to gradient operation must "
                             "have single value dimension.")

        vdim = matrix.vdims[0]
        data = matrix.data
        rows, cols = data.shape

        if vdim.cyclic and (None in vdim.range):
            raise Exception("Cyclic range must be specified to compute "
                            "the gradient of cyclic quantities")
        span = np.diff(vdim.range) if vdim.cyclic else None
        is_cyclic = span is not None

        if is_cyclic:
            # Shift the values so that they start at zero, which lets
            # the modulo below wrap correctly.
            data = data - vdim.range[0]

        # Differences along the columns and rows, cropped to a common shape
        dx = np.diff(data, 1, axis=1)[:rows-1, :cols-1]
        dy = np.diff(data, 1, axis=0)[:rows-1, :cols-1]

        if is_cyclic:
            # Map negative differences onto their positive equivalent
            dx, dy = dx % span, dy % span
            # Going the other way around the cycle is preferred
            # whenever that jump is the smaller of the two.
            dx_wrapped, dy_wrapped = dx - span, dy - span
            dx = np.where(np.abs(dx_wrapped)<dx, dx_wrapped, dx)
            dy = np.where(np.abs(dy_wrapped)<dy, dy_wrapped, dy)

        magnitude = np.sqrt(dx * dx + dy * dy)
        return Image(magnitude, matrix.bounds, group=self.p.group)
class convolve(ElementOperation):
    """
    Apply a convolution to an overlay using the top layer as the
    kernel for convolving the bottom layer. Both Image elements in
    the input overlay should have a single value dimension.
    """

    output_type = Image

    group = param.String(default='Convolution', doc="""
        The group assigned to the convolved output.""")

    kernel_roi = param.NumericTuple(default=(0,0,0,0), length=4, doc="""
        A 2-dimensional slice of the kernel layer to use in the
        convolution in lbrt (left, bottom, right, top) format. By
        default, no slicing is applied.""")

    def _process(self, overlay, key=None):
        """
        Convolve the first layer of ``overlay`` with the second one
        (optionally restricted to ``kernel_roi``) via FFT and return
        the result as an Image with the bounds of the first layer,
        normalized by the sum of the kernel.

        Raises Exception if the overlay does not hold exactly two
        layers or if either layer has more than one value dimension.
        """
        if len(overlay) != 2:
            # The check requires exactly two layers; the message used
            # to claim "at least to items".
            raise Exception("Overlay must contain exactly two items.")

        target, kernel = overlay.get(0), overlay.get(1)

        # The 2D FFTs below only make sense for single-valued layers,
        # so validate the kernel as well as the target.
        if len(target.vdims) != 1 or len(kernel.vdims) != 1:
            raise Exception("Convolution requires inputs with single value dimensions.")

        if self.p.kernel_roi == (0,0,0,0):
            k = kernel.data
        else:
            xslice = slice(self.p.kernel_roi[0], self.p.kernel_roi[2])
            yslice = slice(self.p.kernel_roi[1], self.p.kernel_roi[3])
            k = kernel[xslice, yslice].data

        # Multiply in the frequency domain; the kernel transform is
        # computed at the shape of the target.
        fft1 = np.fft.fft2(target.data)
        fft2 = np.fft.fft2(k, s=target.data.shape)
        convolved_raw = np.fft.ifft2(fft1 * fft2).real

        # Shift the result back by half the kernel size along each axis
        k_rows, k_cols = k.shape
        rolled = np.roll(np.roll(convolved_raw, -(k_cols//2), axis=-1),
                         -(k_rows//2), axis=-2)
        # NOTE(review): a kernel summing to zero yields inf/nan here,
        # exactly as before.
        convolved = rolled / float(k.sum())
        return Image(convolved, bounds=target.bounds, group=self.p.group)
class contours(ElementOperation):
    """
    Given a Image with a single channel, annotate it with contour
    lines for a given set of contour levels.

    The return is an NdOverlay with a Contours layer for each given
    level, overlaid on top of the input Image.
    """

    output_type = Overlay

    levels = param.NumericTuple(default=(0.5,), doc="""
        A list of scalar values used to specify the contour levels.""")

    group = param.String(default='Level', doc="""
        The group assigned to the output contours.""")

    filled = param.Boolean(default=False, doc="""
        Whether to generate filled contours""")

    overlaid = param.Boolean(default=True, doc="""
        Whether to overlay the contour on the supplied Element.""")

    def _process(self, element, key=None):
        """
        Compute contours of ``element`` with matplotlib and return
        them as an NdOverlay keyed by level (of Polygons when
        ``filled`` is set, otherwise of Contours). When ``overlaid``
        is set the NdOverlay is overlaid on top of ``element``.

        Raises ImportError if matplotlib is unavailable and TypeError
        if ``element`` is neither a Raster nor a QuadMesh.
        """
        try:
            from matplotlib import pyplot as plt
        except ImportError:
            raise ImportError("contours operation requires matplotlib.")

        if self.p.filled:
            contour_fn, contour_type = plt.contourf, Polygons
        else:
            contour_fn, contour_type = plt.contour, Contours

        extent = element.range(0) + element.range(1)[::-1]
        if type(element) is Raster:
            # Exactly Raster (not a subclass): the array is flipped
            # vertically before contouring.
            data = [np.flipud(element.data)]
        elif isinstance(element, Raster):
            data = [element.data]
        elif isinstance(element, QuadMesh):
            data = (element.dimension_values(0, False),
                    element.dimension_values(1, False),
                    element.data[2])
        else:
            # Previously this fell through and failed with an
            # UnboundLocalError on `data`, leaving a figure open.
            raise TypeError("contours operation requires a Raster or "
                            "QuadMesh element as input.")

        # The pyplot contour functions draw on the current figure, so
        # a temporary one is created and always closed again, even
        # when contouring fails.
        figure_handle = plt.figure()
        try:
            contour_set = contour_fn(*data, extent=extent,
                                     levels=self.p.levels)
            overlay = NdOverlay(None, kdims=['Levels'])
            for level, cset in zip(self.p.levels, contour_set.collections):
                paths = []
                for path in cset.get_paths():
                    # Split the vertices wherever a new sub-path starts
                    # (path code 1, matplotlib's MOVETO).
                    starts = np.where(path.codes==1)[0][1:]
                    paths.extend(np.split(path.vertices, starts))
                overlay[level] = contour_type(paths, level=level,
                                              group=self.p.group,
                                              label=element.label,
                                              kdims=element.kdims,
                                              vdims=element.vdims)
        finally:
            plt.close(figure_handle)

        if self.p.overlaid:
            overlay = element * overlay
        return overlay
class histogram(ElementOperation):
    """
    Returns a Histogram of the input element data, binned into
    num_bins over the bin_range (if specified) along the specified
    dimension.
    """

    bin_range = param.NumericTuple(default=None, length=2, doc="""
      Specifies the range within which to compute the bins.""")

    dimension = param.String(default=None, doc="""
      Along which dimension of the Element to compute the histogram.""")

    individually = param.Boolean(default=True, doc="""
      Specifies whether the histogram will be rescaled for each Element in a UniformNdMapping.""")

    log = param.Boolean(default=False, doc="""
      Whether to use base 10 logarithmic samples for the bin edges.""")

    mean_weighted = param.Boolean(default=False, doc="""
      Whether the weighted frequencies are averaged.""")

    normed = param.Boolean(default=True, doc="""
      Whether the histogram frequencies are normalized.""")

    nonzero = param.Boolean(default=False, doc="""
      Whether to use only nonzero values when computing the histogram""")

    num_bins = param.Integer(default=20, doc="""
      Number of bins in the histogram .""")

    weight_dimension = param.String(default=None, doc="""
       Name of the dimension the weighting should be drawn from""")

    style_prefix = param.String(default=None, allow_None=True, doc="""
      Used for setting a common style for histograms in a HoloMap or AdjointLayout.""")

    def _process(self, view, key=None):
        """
        Bin the values of the selected dimension of ``view`` (the
        first value dimension, else the first key dimension, when
        ``dimension`` is not set) and return them as a Histogram.

        If the histogram computation itself fails, an all-zero
        histogram over the computed edges is returned instead.
        """
        if self.p.dimension:
            selected_dim = self.p.dimension
        else:
            selected_dim = [d.name for d in view.vdims + view.kdims][0]
        data = np.array(view.dimension_values(selected_dim))

        if self.p.weight_dimension:
            weights = np.array(view.dimension_values(self.p.weight_dimension))
        else:
            weights = None

        if self.p.nonzero:
            mask = data > 0
            data = data[mask]
            if weights is not None:
                weights = weights[mask]

        if self.p.bin_range is None:
            hist_range = find_minmax((np.nanmin(data), np.nanmax(data)),
                                     (0, -float('inf')))
        else:
            hist_range = self.p.bin_range

        # Avoids range issues including zero bin range and empty bins
        if hist_range == (0, 0):
            hist_range = (0, 1)

        # Drop NaN/inf samples once and apply the same mask to the
        # weights; previously only the data was filtered, so the
        # shapes no longer matched whenever a sample was removed.
        finite = np.isfinite(data)
        data = data[finite]
        if weights is not None:
            weights = weights[finite]

        if self.p.log:
            bin_min = max([abs(hist_range[0]), data[data>0].min()])
            edges = np.logspace(np.log10(bin_min), np.log10(hist_range[1]),
                                self.p.num_bins+1)
        else:
            edges = np.linspace(hist_range[0], hist_range[1], self.p.num_bins + 1)

        mean_weighted = bool(self.p.mean_weighted and self.p.weight_dimension)
        density = False if mean_weighted else self.p.normed
        try:
            # `density` replaces the `normed` keyword, which recent
            # NumPy releases no longer accept; the resulting TypeError
            # used to be swallowed, giving an all-zero histogram.
            hist, edges = np.histogram(data, density=density,
                                       weights=weights, bins=edges)
            if mean_weighted:
                # Divide the weighted sums by the per-bin counts over
                # the very same edges (also for log bins). Not done in
                # place so that integer histograms can become floats.
                counts, _ = np.histogram(data, bins=edges)
                hist = hist / counts
        except Exception:
            # Best effort: fall back to an empty histogram
            hist = np.zeros(self.p.num_bins)
        # Empty bins produce NaN when dividing by a zero count
        hist[np.isnan(hist)] = 0

        params = {}
        if self.p.weight_dimension:
            params['vdims'] = [view.get_dimension(self.p.weight_dimension)]
        if view.group != view.__class__.__name__:
            params['group'] = view.group

        return Histogram(hist, edges, kdims=[view.get_dimension(selected_dim)],
                         label=view.label, **params)
class decimate(ElementOperation):
    """
    Decimates any column based Element to a specified number of random
    rows if the current view defined by the x_range and y_range
    contains more than max_samples. By default the operation returns a
    DynamicMap with a RangeXY stream allowing dynamic downsampling.
    """

    dynamic = param.Boolean(default=True, doc="""
       Enables dynamic processing by default.""")

    max_samples = param.Integer(default=5000, doc="""
        Maximum number of samples to display at the same time.""")

    random_seed = param.Integer(default=42, doc="""
        Seed used to initialize randomization.""")

    streams = param.List(default=[RangeXY], doc="""
        List of streams that are applied if dynamic=True, allowing
        for dynamic interaction with the plot.""")

    x_range = param.NumericTuple(default=None, length=2, doc="""
       The x_range as a tuple of min and max x-value. Auto-ranges
       if set to None.""")

    y_range = param.NumericTuple(default=None, length=2, doc="""
       The y_range as a tuple of min and max y-value. Auto-ranges
       if set to None.""")

    def _process(self, element, key=None):
        """
        Slice ``element`` to the x/y ranges and, if more than
        ``max_samples`` rows remain, return a clone holding a random
        sample of ``max_samples`` rows drawn without replacement using
        ``random_seed``. Otherwise the sliced element is returned.

        Raises ValueError if ``element`` is not a Dataset.
        """
        if not isinstance(element, Dataset):
            raise ValueError("Cannot downsample non-Dataset types.")
        if element.interface not in column_interfaces:
            # Convert the element itself; this used to reference an
            # undefined `plot` name and failed with a NameError.
            element = element.clone(datatype=['dataframe', 'dictionary'])

        xstart, xend = self.p.x_range if self.p.x_range else element.range(0)
        ystart, yend = self.p.y_range if self.p.y_range else element.range(1)

        # Slice element to current ranges
        xdim, ydim = element.dimensions(label=True)[0:2]
        sliced = element.select(**{xdim: (xstart, xend),
                                   ydim: (ystart, yend)})
        if len(sliced) <= self.p.max_samples:
            return sliced

        prng = np.random.RandomState(self.p.random_seed)
        interface = element.interface
        # PandasInterface is only defined when pandas was imported,
        # so `pd` has to be checked before the name is touched.
        if pd and interface is PandasInterface:
            data = sliced.data.sample(self.p.max_samples,
                                      random_state=prng)
        else:
            inds = prng.choice(len(sliced), self.p.max_samples, False)
            # `interface` is a class rather than an instance, so
            # isinstance() could never select the dictionary branch.
            if issubclass(interface, DictInterface):
                data = {k: v[inds] for k, v in sliced.data.items()}
            else:
                data = sliced.data[inds, :]
        return element.clone(data)
#==================#
# Other operations #
#==================#
class collapse(ElementOperation):
    """
    Collapse an overlay of Elements into a single Element using the
    supplied function. Collapsing aggregates over the key dimensions
    of each object, applying the supplied fn to each group.

    This is an example of an ElementOperation that does not involve
    any Raster types.
    """

    fn = param.Callable(default=np.mean, doc="""
        The function that is used to collapse the curve y-values for
        each x-value.""")

    def _process(self, overlay, key=None):
        """
        Wrap the layers of ``overlay`` in a HoloMap (keyed by layer
        position unless it is an NdOverlay) and collapse it with fn.
        """
        if isinstance(overlay, NdOverlay):
            layers = overlay
        else:
            # Plain overlays carry no keys, so enumerate the layers
            layers = dict(enumerate(overlay))
        return HoloMap(layers).collapse(function=self.p.fn)
class gridmatrix(param.ParameterizedFunction):
    """
    The gridmatrix operation takes an Element or HoloMap
    of Elements as input and creates a GridMatrix object,
    which plots each dimension in the Element against
    each other dimension. This provides a very useful
    overview of high-dimensional data and is inspired
    by pandas and seaborn scatter_matrix implementations.
    """

    chart_type = param.Parameter(default=Scatter, doc="""
        The Element type used to display bivariate distributions
        of the data.""")

    diagonal_type = param.Parameter(default=Histogram, doc="""
       The Element type along the diagonal, may be a Histogram or any
       other plot type which can visualize a univariate distribution.""")

    overlay_dims = param.List(default=[], doc="""
       If a HoloMap is supplied this will allow overlaying one or
       more of it's key dimensions.""")

    def __call__(self, data, **params):
        """
        Build the grid for ``data``. A HoloMap or NdOverlay is
        processed item by item (sharing the ranges computed over the
        whole container) and collated, optionally overlaying
        ``overlay_dims``; a single Element yields a GridMatrix.
        Any other input falls through and returns None.
        """
        p = param.ParamOverrides(self, params)

        if isinstance(data, (HoloMap, NdOverlay)):
            ranges = {d.name: data.range(d) for d in data.dimensions()}
            data = data.clone({k: GridMatrix(self._process(p, v, ranges))
                               for k, v in data.items()})
            data = Collator(data, merge_type=type(data))()
            if p.overlay_dims:
                data = data.map(lambda x: x.overlay(p.overlay_dims), (HoloMap,))
            return data
        elif isinstance(data, Element):
            data = self._process(p, data)
            return GridMatrix(data)

    def _process(self, p, element, ranges=None):
        """
        Return a dictionary mapping ``(d1.name, d2.name)`` to the
        element plotting dimension d1 against d2, for every pair of
        dimensions of ``element`` whose range starts with a number.

        ``ranges`` optionally maps dimension names to the bin range
        used for the histograms on the diagonal.
        """
        # A None default avoids sharing one dict between all calls
        if ranges is None:
            ranges = {}

        # Creates a unified Dataset.data attribute
        # to draw the data from
        if isinstance(element.data, np.ndarray):
            if 'dataframe' in Dataset.datatype:
                el_data = element.table('dataframe')
            else:
                el_data = element.table('dictionary')
        else:
            el_data = element.data

        # Get dimensions to plot against each other
        dims = [d for d in element.dimensions()
                if _is_number(element.range(d)[0])]
        permuted_dims = [(d1, d2) for d1 in dims
                         for d2 in dims[::-1]]

        data = {}
        for d1, d2 in permuted_dims:
            if d1 == d2:
                # Diagonal entry: univariate view of a single dimension
                if p.diagonal_type is Histogram:
                    bin_range = ranges.get(d1.name, element.range(d1))
                    el = histogram(element, dimension=d1.name, bin_range=bin_range)
                    el = el(norm=dict(axiswise=True, framewise=True))
                else:
                    values = element.dimension_values(d1)
                    el = p.diagonal_type(values, vdims=[d1])
            else:
                el = p.chart_type(el_data, kdims=[d1],
                                  vdims=[d2], datatype=['dataframe', 'dictionary'])
            data[(d1.name, d2.name)] = el
        return data