exec.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/compute/exec.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>
#include "arrow/array/array_base.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/function_internal.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/vector.h"
namespace arrow {
using internal::BitmapAnd;
using internal::checked_cast;
using internal::CopyBitmap;
using internal::CpuInfo;
using internal::GetCpuThreadPool;
namespace compute {
ExecContext* default_exec_context() {
static ExecContext default_ctx;
return &default_ctx;
}
ExecContext* threaded_exec_context() {
static ExecContext threaded_ctx(default_memory_pool(), GetCpuThreadPool());
return &threaded_ctx;
}
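// Illustrative use of these contexts through the public CallFunction API
// declared in arrow/compute/exec.h (`a` and `b` are hypothetical Datum
// arguments, not defined in this file):
//
//   ARROW_ASSIGN_OR_RAISE(Datum sum,
//                         CallFunction("add", {a, b}, threaded_exec_context()));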
ExecBatch::ExecBatch(const RecordBatch& batch)
: values(batch.num_columns()), length(batch.num_rows()) {
auto columns = batch.column_data();
std::move(columns.begin(), columns.end(), values.begin());
}
bool ExecBatch::Equals(const ExecBatch& other) const {
return guarantee == other.guarantee && values == other.values;
}
void PrintTo(const ExecBatch& batch, std::ostream* os) {
*os << "ExecBatch\n";
static const std::string indent = " ";
*os << indent << "# Rows: " << batch.length << "\n";
if (batch.guarantee != literal(true)) {
*os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
}
int i = 0;
for (const Datum& value : batch.values) {
*os << indent << i++ << ": ";
if (value.is_scalar()) {
*os << "Scalar[" << value.scalar()->ToString() << "]\n";
} else if (value.is_array() || value.is_chunked_array()) {
PrettyPrintOptions options;
options.skip_new_lines = true;
if (value.is_array()) {
auto array = value.make_array();
*os << "Array";
ARROW_CHECK_OK(PrettyPrint(*array, options, os));
} else {
auto array = value.chunked_array();
*os << "Chunked Array";
ARROW_CHECK_OK(PrettyPrint(*array, options, os));
}
*os << "\n";
} else {
ARROW_DCHECK(false);
}
}
}
int64_t ExecBatch::TotalBufferSize() const {
int64_t sum = 0;
for (const auto& value : values) {
sum += value.TotalBufferSize();
}
return sum;
}
std::string ExecBatch::ToString() const {
std::stringstream ss;
PrintTo(*this, &ss);
return ss.str();
}
ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
ExecBatch out = *this;
for (auto& value : out.values) {
if (value.is_scalar()) {
// keep value as is
} else if (value.is_array()) {
value = value.array()->Slice(offset, length);
} else if (value.is_chunked_array()) {
value = value.chunked_array()->Slice(offset, length);
} else {
ARROW_DCHECK(false);
}
}
out.length = std::min(length, this->length - offset);
return out;
}
Result<ExecBatch> ExecBatch::SelectValues(const std::vector<int>& ids) const {
std::vector<Datum> selected_values;
selected_values.reserve(ids.size());
for (int id : ids) {
if (id < 0 || static_cast<size_t>(id) >= values.size()) {
return Status::Invalid("ExecBatch invalid value selection: ", id);
}
selected_values.push_back(values[id]);
}
return ExecBatch(std::move(selected_values), length);
}
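// Illustrative usage of the ExecBatch API above (`array_a` and `array_b` are
// hypothetical Arrow arrays of equal length):
//
//   ExecBatch batch({Datum(array_a), Datum(array_b)}, array_a->length());
//   ExecBatch head = batch.Slice(/*offset=*/0, /*length=*/10);
//   ARROW_ASSIGN_OR_RAISE(ExecBatch first_column, batch.SelectValues({0}));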
namespace {
enum LengthInferenceError {
kEmptyInput = -1,
kInvalidValues = -2,
};
/// \brief Infer the ExecBatch length from values.
///
/// \return the inferred length of the batch. If there are no values in the
/// batch then kEmptyInput (-1) is returned. If the values in the batch have
/// different lengths then kInvalidValues (-2) is returned.
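///
/// For example, {scalar, scalar} infers a broadcast length of 1,
/// {array_of_5, scalar} infers 5, and {array_of_5, array_of_3} yields
/// kInvalidValues.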
int64_t DoInferLength(const std::vector<Datum>& values) {
if (values.empty()) {
return kEmptyInput;
}
int64_t length = -1;
for (const auto& value : values) {
if (value.is_scalar()) {
continue;
}
if (length == -1) {
length = value.length();
continue;
}
if (length != value.length()) {
// all the arrays should have the same length
return kInvalidValues;
}
}
return length == -1 ? 1 : length;
}
} // namespace
Result<int64_t> ExecBatch::InferLength(const std::vector<Datum>& values) {
const int64_t length = DoInferLength(values);
switch (length) {
case kInvalidValues:
return Status::Invalid(
"Arrays used to construct an ExecBatch must have equal length");
case kEmptyInput:
return Status::Invalid("Cannot infer ExecBatch length without at least one value");
default:
break;
}
return {length};
}
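// For example, Make({array_of_5}, /*length=*/-1) infers a length of 5, while
// Make({array_of_5}, /*length=*/3) returns Status::Invalid because the given
// length contradicts the inferred one.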
Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values, int64_t length) {
// Infer the length again and/or validate the given length.
const int64_t inferred_length = DoInferLength(values);
switch (inferred_length) {
case kEmptyInput:
if (length < 0) {
return Status::Invalid(
"Cannot infer ExecBatch length without at least one value");
}
break;
case kInvalidValues:
return Status::Invalid(
"Arrays used to construct an ExecBatch must have equal length");
default:
if (length < 0) {
length = inferred_length;
} else if (length != inferred_length) {
return Status::Invalid("Length used to construct an ExecBatch is invalid");
}
break;
}
return ExecBatch(std::move(values), length);
}
Result<std::shared_ptr<RecordBatch>> ExecBatch::ToRecordBatch(
std::shared_ptr<Schema> schema, MemoryPool* pool) const {
if (static_cast<size_t>(schema->num_fields()) > values.size()) {
return Status::Invalid("ExecBatch::ToRecordBatch mismatching schema size");
}
ArrayVector columns(schema->num_fields());
for (size_t i = 0; i < columns.size(); ++i) {
const Datum& value = values[i];
if (value.is_array()) {
columns[i] = value.make_array();
continue;
} else if (value.is_scalar()) {
ARROW_ASSIGN_OR_RAISE(columns[i],
MakeArrayFromScalar(*value.scalar(), length, pool));
} else {
return Status::TypeError("ExecBatch::ToRecordBatch value ", i, " with unsupported ",
"value kind ", ::arrow::ToString(value.kind()));
}
}
return RecordBatch::Make(std::move(schema), length, std::move(columns));
}
namespace {
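// For example, AllocateDataBuffer with length=100 and bit_width=1 allocates a
// 13-byte bitmap, while bit_width=32 allocates
// BytesForBits(100 * 32) = 400 bytes.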
Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
int bit_width) {
if (bit_width == 1) {
return ctx->AllocateBitmap(length);
} else {
int64_t buffer_size = bit_util::BytesForBits(length * bit_width);
return ctx->Allocate(buffer_size);
}
}
struct BufferPreallocation {
explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
: bit_width(bit_width), added_length(added_length) {}
int bit_width;
int added_length;
};
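// Record the data buffers to preallocate for the given output type. For
// example, int32 records a single 32-bit-width entry, while STRING records
// 32-bit offsets with added_length=1 (n values require n + 1 offsets).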
void ComputeDataPreallocate(const DataType& type,
std::vector<BufferPreallocation>* widths) {
if (is_fixed_width(type.id()) && type.id() != Type::NA) {
widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
return;
}
// Preallocate binary and list offsets
switch (type.id()) {
case Type::BINARY:
case Type::STRING:
case Type::LIST:
case Type::MAP:
widths->emplace_back(32, /*added_length=*/1);
return;
case Type::LARGE_BINARY:
case Type::LARGE_STRING:
case Type::LARGE_LIST:
widths->emplace_back(64, /*added_length=*/1);
return;
default:
break;
}
}
} // namespace
namespace detail {
// ----------------------------------------------------------------------
// ExecSpanIterator
namespace {
void PromoteExecSpanScalars(ExecSpan* span) {
// In the "all scalar" case, we "promote" the scalars to ArraySpans of
// length 1, since the kernel implementations do not handle the all
// scalar case
for (int i = 0; i < span->num_values(); ++i) {
ExecValue* value = &span->values[i];
if (value->is_scalar()) {
value->array.FillFromScalar(*value->scalar);
value->scalar = nullptr;
}
}
}
bool CheckIfAllScalar(const ExecBatch& batch) {
for (const Datum& value : batch.values) {
if (!value.is_scalar()) {
DCHECK(value.is_arraylike());
return false;
}
}
return batch.num_values() > 0;
}
} // namespace
Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
bool promote_if_all_scalars) {
if (batch.num_values() > 0) {
// Validate arguments
bool all_args_same_length = false;
int64_t inferred_length = InferBatchLength(batch.values, &all_args_same_length);
if (inferred_length != batch.length) {
return Status::Invalid("Value lengths differed from ExecBatch length");
}
if (!all_args_same_length) {
return Status::Invalid("Array arguments must all be the same length");
}
}
args_ = &batch.values;
initialized_ = have_chunked_arrays_ = false;
have_all_scalars_ = CheckIfAllScalar(batch);
promote_if_all_scalars_ = promote_if_all_scalars;
position_ = 0;
length_ = batch.length;
chunk_indexes_.clear();
chunk_indexes_.resize(args_->size(), 0);
value_positions_.clear();
value_positions_.resize(args_->size(), 0);
value_offsets_.clear();
value_offsets_.resize(args_->size(), 0);
max_chunksize_ = std::min(length_, max_chunksize);
return Status::OK();
}
int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* span) {
for (size_t i = 0; i < args_->size() && iteration_size > 0; ++i) {
// If the argument is not a chunked array, it's either a Scalar or Array,
// in which case it doesn't influence the size of this span
if (!args_->at(i).is_chunked_array()) {
continue;
}
const ChunkedArray* arg = args_->at(i).chunked_array().get();
if (arg->num_chunks() == 0) {
iteration_size = 0;
continue;
}
const Array* current_chunk;
while (true) {
current_chunk = arg->chunk(chunk_indexes_[i]).get();
if (value_positions_[i] == current_chunk->length()) {
// Chunk is zero-length, or was exhausted in the previous
// iteration. Move to the next chunk
++chunk_indexes_[i];
current_chunk = arg->chunk(chunk_indexes_[i]).get();
span->values[i].SetArray(*current_chunk->data());
value_positions_[i] = 0;
value_offsets_[i] = current_chunk->offset();
continue;
}
break;
}
iteration_size =
std::min(current_chunk->length() - value_positions_[i], iteration_size);
}
return iteration_size;
}
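// Emit the next contiguous span across all arguments. For example, iterating
// a single ChunkedArray argument with chunk lengths {3, 5} under
// max_chunksize=4 yields spans of length 3, 4, and 1.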
bool ExecSpanIterator::Next(ExecSpan* span) {
if (!initialized_) {
span->length = 0;
// The first time this is called, we populate the output span with any
// Scalar or Array arguments in the ExecValue struct, and then just
// increment array offsets below. If any arguments are ChunkedArray, then
// the internal ArraySpans will see their members updated during the
// iteration
span->values.resize(args_->size());
for (size_t i = 0; i < args_->size(); ++i) {
const Datum& arg = (*args_)[i];
if (arg.is_scalar()) {
span->values[i].SetScalar(arg.scalar().get());
} else if (arg.is_array()) {
const ArrayData& arr = *arg.array();
span->values[i].SetArray(arr);
value_offsets_[i] = arr.offset;
} else {
// Populate members from the first chunk
const ChunkedArray& carr = *arg.chunked_array();
if (carr.num_chunks() > 0) {
const ArrayData& arr = *carr.chunk(0)->data();
span->values[i].SetArray(arr);
value_offsets_[i] = arr.offset;
} else {
// Fill as zero-length array
::arrow::internal::FillZeroLengthArray(carr.type().get(),
&span->values[i].array);
span->values[i].scalar = nullptr;
}
have_chunked_arrays_ = true;
}
}
if (have_all_scalars_ && promote_if_all_scalars_) {
PromoteExecSpanScalars(span);
}
initialized_ = true;
} else if (position_ == length_) {
// We've emitted at least one span and we're at the end so we are done
return false;
}
// Determine how large the common contiguous "slice" of all the arguments is
int64_t iteration_size = std::min(length_ - position_, max_chunksize_);
if (have_chunked_arrays_) {
iteration_size = GetNextChunkSpan(iteration_size, span);
}
// Now, adjust the span
span->length = iteration_size;
for (size_t i = 0; i < args_->size(); ++i) {
const Datum& arg = args_->at(i);
if (!arg.is_scalar()) {
ArraySpan* arr = &span->values[i].array;
arr->SetSlice(value_positions_[i] + value_offsets_[i], iteration_size);
value_positions_[i] += iteration_size;
}
}
position_ += iteration_size;
DCHECK_LE(position_, length_);
return true;
}
namespace {
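// Classify how a value's validity generalizes. For example, a null scalar or
// a NullArray is ALL_NULL, an all-valid int32 array (no validity bitmap
// allocated) is ALL_VALID, and an int32 array with some nulls is PERHAPS_NULL.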
struct NullGeneralization {
enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
static type Get(const ExecValue& value) {
const auto dtype_id = value.type()->id();
if (dtype_id == Type::NA) {
return ALL_NULL;
}
if (!arrow::internal::HasValidityBitmap(dtype_id)) {
return ALL_VALID;
}
if (value.is_scalar()) {
return value.scalar->is_valid ? ALL_VALID : ALL_NULL;
} else {
const ArraySpan& arr = value.array;
// Do not count the bits if they haven't been counted already
if ((arr.null_count == 0) || (arr.buffers[0].data == nullptr)) {
return ALL_VALID;
}
if (arr.null_count == arr.length) {
return ALL_NULL;
}
}
return PERHAPS_NULL;
}
static type Get(const Datum& datum) {
// Temporary workaround to help with ARROW-16756
ExecValue value;
if (datum.is_array()) {
value.SetArray(*datum.array());
} else if (datum.is_scalar()) {
value.SetScalar(datum.scalar().get());
} else {
// TODO(wesm): ChunkedArray, I think
return PERHAPS_NULL;
}
return Get(value);
}
};
// Null propagation implementation that deals both with preallocated bitmaps
// and maybe-to-be allocated bitmaps
//
// If the bitmap is preallocated, it MUST be populated (since it might be a
// view of a much larger bitmap). If it isn't preallocated, then we have
// more flexibility.
//
// * If the batch has no nulls, then we do nothing
// * If only a single array has nulls, and its offset is a multiple of 8,
// then we can zero-copy the bitmap into the output
// * Otherwise, we allocate the bitmap and populate it
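//
// For example, if exactly one input array has nulls and its offset is 16,
// its bitmap can be zero-copy sliced into the output; if its offset is 3,
// the bitmap must be copied bit by bit.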
class NullPropagator {
public:
NullPropagator(KernelContext* ctx, const ExecSpan& batch, ArrayData* output)
: ctx_(ctx), batch_(batch), output_(output) {
for (const ExecValue& value : batch_.values) {
auto null_generalization = NullGeneralization::Get(value);
if (null_generalization == NullGeneralization::ALL_NULL) {
is_all_null_ = true;
}
if (null_generalization != NullGeneralization::ALL_VALID && value.is_array()) {
arrays_with_nulls_.push_back(&value.array);
}
}
if (output->buffers[0] != nullptr) {
bitmap_preallocated_ = true;
bitmap_ = output_->buffers[0]->mutable_data();
}
}
Status EnsureAllocated() {
if (bitmap_preallocated_) {
return Status::OK();
}
ARROW_ASSIGN_OR_RAISE(output_->buffers[0], ctx_->AllocateBitmap(output_->length));
bitmap_ = output_->buffers[0]->mutable_data();
return Status::OK();
}
Status AllNullShortCircuit() {
// OK, the output should be all null
output_->null_count = output_->length;
if (bitmap_preallocated_) {
bit_util::SetBitsTo(bitmap_, output_->offset, output_->length, false);
return Status::OK();
}
// Walk all the values with nulls instead of breaking on the first in case
// we find a bitmap that can be reused in the non-preallocated case
for (const ArraySpan* arr : arrays_with_nulls_) {
if (arr->null_count == arr->length && arr->buffers[0].owner != nullptr) {
// Reuse this all null bitmap
output_->buffers[0] = arr->GetBuffer(0);
return Status::OK();
}
}
RETURN_NOT_OK(EnsureAllocated());
bit_util::SetBitsTo(bitmap_, output_->offset, output_->length, false);
return Status::OK();
}
Status PropagateSingle() {
// One array
const ArraySpan& arr = *arrays_with_nulls_[0];
const uint8_t* arr_bitmap = arr.buffers[0].data;
// Reuse the null count if it's known
output_->null_count = arr.null_count;
if (bitmap_preallocated_) {
CopyBitmap(arr_bitmap, arr.offset, arr.length, bitmap_, output_->offset);
return Status::OK();
}
// Three cases when memory was not pre-allocated:
//
// * Offset is zero: we reuse the bitmap as is
// * Offset is nonzero but a multiple of 8: we can slice the bitmap
// * Offset is not a multiple of 8: we must allocate and use CopyBitmap
//
// Keep in mind that output_->offset is not permitted to be nonzero when
// the bitmap is not preallocated, and that precondition is asserted
// higher in the call stack.
if (arr.offset == 0) {
output_->buffers[0] = arr.GetBuffer(0);
} else if (arr.offset % 8 == 0) {
output_->buffers[0] = SliceBuffer(arr.GetBuffer(0), arr.offset / 8,
bit_util::BytesForBits(arr.length));
} else {
RETURN_NOT_OK(EnsureAllocated());
CopyBitmap(arr_bitmap, arr.offset, arr.length, bitmap_, /*dst_offset=*/0);
}
return Status::OK();
}
Status PropagateMultiple() {
// More than one array. We use BitmapAnd to intersect their bitmaps
// Do not compute the intersection null count until it's needed
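// e.g. with three input bitmaps b0, b1, b2 the output becomes (b0 & b1) & b2,
// accumulated in place in the output bitmap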
RETURN_NOT_OK(EnsureAllocated());
auto Accumulate = [&](const uint8_t* left_data, int64_t left_offset,
const uint8_t* right_data, int64_t right_offset) {
BitmapAnd(left_data, left_offset, right_data, right_offset, output_->length,
output_->offset, bitmap_);
};
DCHECK_GT(arrays_with_nulls_.size(), 1);
// Seed the output bitmap with the & of the first two bitmaps
Accumulate(arrays_with_nulls_[0]->buffers[0].data, arrays_with_nulls_[0]->offset,
arrays_with_nulls_[1]->buffers[0].data, arrays_with_nulls_[1]->offset);
// Accumulate the rest
for (size_t i = 2; i < arrays_with_nulls_.size(); ++i) {
Accumulate(bitmap_, output_->offset, arrays_with_nulls_[i]->buffers[0].data,
arrays_with_nulls_[i]->offset);
}
return Status::OK();
}
Status Execute() {
if (is_all_null_) {
// An all-null value (scalar null or all-null array) gives us a short
// circuit opportunity
return AllNullShortCircuit();
}
// At this point, by construction we know that all of the values in
// arrays_with_nulls_ are arrays that are not all null. So there are a
// few cases:
//
// * No arrays. This is a no-op w/o preallocation but when the bitmap is
// pre-allocated we have to fill it with 1's
// * One array, whose bitmap can be zero-copied (w/o preallocation, and
// when no byte is split) or copied (split byte or w/ preallocation)
// * More than one array, we must compute the intersection of all the
// bitmaps
//
// BUT, if the output offset is nonzero for some reason, we copy into the
// output unconditionally
output_->null_count = kUnknownNullCount;
if (arrays_with_nulls_.empty()) {
// No arrays with nulls case
output_->null_count = 0;
if (bitmap_preallocated_) {
bit_util::SetBitsTo(bitmap_, output_->offset, output_->length, true);
}
return Status::OK();
}
if (arrays_with_nulls_.size() == 1) {
return PropagateSingle();
}
return PropagateMultiple();
}
private:
KernelContext* ctx_;
const ExecSpan& batch_;
std::vector<const ArraySpan*> arrays_with_nulls_;
bool is_all_null_ = false;
ArrayData* output_;
uint8_t* bitmap_;
bool bitmap_preallocated_ = false;
};
std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
const TypeHolder& type) {
std::vector<std::shared_ptr<Array>> arrays;
arrays.reserve(values.size());
for (const Datum& val : values) {
if (val.length() == 0) {
// Skip empty chunks
continue;
}
arrays.emplace_back(val.make_array());
}
return std::make_shared<ChunkedArray>(std::move(arrays), type.GetSharedPtr());
}
bool HaveChunkedArray(const std::vector<Datum>& values) {
for (const auto& value : values) {
if (value.kind() == Datum::CHUNKED_ARRAY) {
return true;
}
}
return false;
}
template <typename KernelType>
class KernelExecutorImpl : public KernelExecutor {
public:
Status Init(KernelContext* kernel_ctx, KernelInitArgs args) override {
kernel_ctx_ = kernel_ctx;
kernel_ = static_cast<const KernelType*>(args.kernel);
// Resolve the output type for this kernel
ARROW_ASSIGN_OR_RAISE(
output_type_, kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs));
return Status::OK();
}
protected:
// Prepare an output ArrayData to be written to. If
// Kernel::mem_allocation is not MemAllocation::PREALLOCATE, then no
// data buffers will be set
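// For example, a length-n int32 output with full preallocation gets a
// validity bitmap of BytesForBits(n) bytes plus a 4 * n byte data buffer.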
Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
auto out = std::make_shared<ArrayData>(output_type_.GetSharedPtr(), length);
out->buffers.resize(output_num_buffers_);
if (validity_preallocated_) {
ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length));
}
if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
out->null_count = 0;
}
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
ARROW_ASSIGN_OR_RAISE(
out->buffers[i + 1],
AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
prealloc.bit_width));
}
}
return out;
}
Status CheckResultType(const Datum& out, const char* function_name) override {
const auto& type = out.type();
if (type != nullptr && !type->Equals(*output_type_.type)) {
return Status::TypeError(
"kernel type result mismatch for function '", function_name, "': declared as ",
output_type_.type->ToString(), ", actual is ", type->ToString());
}
return Status::OK();
}
ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
KernelState* state() { return kernel_ctx_->state(); }
// Not all of these members are used for every executor type
KernelContext* kernel_ctx_;
const KernelType* kernel_;
TypeHolder output_type_;
int output_num_buffers_;
// If true, then memory is preallocated for the validity bitmap with the same
// strategy as the data buffer(s).
bool validity_preallocated_ = false;
// The kernel writes into data buffers preallocated for these bit widths
// (a negative bit width indicates no preallocation).
std::vector<BufferPreallocation> data_preallocated_;
};
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
public:
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
if (batch.length == 0) {
// For zero-length batches, we do nothing except return a zero-length
// array of the correct output type
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> result,
MakeArrayOfNull(output_type_.GetSharedPtr(), /*length=*/0,
exec_context()->memory_pool()));
return EmitResult(result->data(), listener);
}
// If the executor is configured to produce a single large Array output for
// kernels supporting preallocation, then we do so up front and then
// iterate over slices of that large array. Otherwise, we preallocate prior
// to processing each span emitted from the ExecSpanIterator
RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
// ARROW-16756: Here we have to accommodate the distinct cases
//
// * Fully-preallocated contiguous output
// * Fully-preallocated, non-contiguous kernel output
// * Not-fully-preallocated kernel output: we pass an empty or
// partially-filled ArrayData to the kernel
if (preallocating_all_buffers_) {
return ExecuteSpans(listener);
} else {
return ExecuteNonSpans(listener);
}
}
Datum WrapResults(const std::vector<Datum>& inputs,
const std::vector<Datum>& outputs) override {
// If execution yielded multiple chunks (because large arrays were split
// based on the ExecContext parameters), then the result is a ChunkedArray
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
return ToChunkedArray(outputs, output_type_);
} else {
// Outputs have just one element
return outputs[0];
}
}
protected:
Status EmitResult(std::shared_ptr<ArrayData> out, ExecListener* listener) {
if (span_iterator_.have_all_scalars()) {
// ARROW-16757 We boxed scalar inputs as ArraySpan, so now we have to
// unbox the output as a scalar
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> scalar, MakeArray(out)->GetScalar(0));
return listener->OnResult(std::move(scalar));
} else {
return listener->OnResult(std::move(out));
}
}
Status ExecuteSpans(ExecListener* listener) {
// We put the preallocation in an ArraySpan to be passed to the
// kernel, which expects to receive one. More performance-critical
// code (e.g. expression evaluation) should eventually skip the
// creation of ArrayData altogether
std::shared_ptr<ArrayData> preallocation;
ExecSpan input;
ExecResult output;
ArraySpan* output_span = output.array_span_mutable();
if (preallocate_contiguous_) {
// Make one big output allocation
ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(span_iterator_.length()));
// Populate and then reuse the ArraySpan inside
output_span->SetMembers(*preallocation);
output_span->offset = 0;
int64_t result_offset = 0;
while (span_iterator_.Next(&input)) {
// Set absolute output span position and length
output_span->SetSlice(result_offset, input.length);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
result_offset = span_iterator_.position();
}
// Kernel execution is complete; emit result
return EmitResult(std::move(preallocation), listener);
} else {
// Fully preallocating, but not contiguously
// We preallocate (maybe) only for the output of processing the current
// chunk
while (span_iterator_.Next(&input)) {
ARROW_ASSIGN_OR_RAISE(preallocation, PrepareOutput(input.length));
output_span->SetMembers(*preallocation);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
// Emit the result for this chunk
RETURN_NOT_OK(EmitResult(std::move(preallocation), listener));
}
return Status::OK();
}
}
Status ExecuteSingleSpan(const ExecSpan& input, ExecResult* out) {
ArraySpan* result_span = out->array_span_mutable();
if (output_type_.type->id() == Type::NA) {
result_span->null_count = result_span->length;
} else if (kernel_->null_handling == NullHandling::INTERSECTION) {
if (!elide_validity_bitmap_) {
PropagateNullsSpans(input, result_span);
}
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
result_span->null_count = 0;
}
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, out));
// Output type didn't change
DCHECK(out->is_array_span());
return Status::OK();
}
Status ExecuteNonSpans(ExecListener* listener) {
// ARROW-16756: Kernel is going to allocate some memory and so
// for the time being we pass in an empty or partially-filled
// shared_ptr<ArrayData> or shared_ptr<Scalar> to be populated
// by the kernel.
//
// We will eventually delete the Scalar output path per
// ARROW-16757.
ExecSpan input;
ExecResult output;
while (span_iterator_.Next(&input)) {
ARROW_ASSIGN_OR_RAISE(output.value, PrepareOutput(input.length));
DCHECK(output.is_array_data());
ArrayData* out_arr = output.array_data().get();
if (output_type_.type->id() == Type::NA) {
out_arr->null_count = out_arr->length;
} else if (kernel_->null_handling == NullHandling::INTERSECTION) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, input, out_arr));
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
out_arr->null_count = 0;
}
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, input, &output));
// Output type didn't change
DCHECK(output.is_array_data());
// Emit a result for each chunk
RETURN_NOT_OK(EmitResult(std::move(output.array_data()), listener));
}
return Status::OK();
}
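// Decide the preallocation strategy for this kernel invocation. For example,
// an INTERSECTION kernel over all-valid int32 inputs elides the validity
// bitmap, preallocates the 32-bit data buffer, and, when the context and
// kernel allow writing into slices, can execute into a single contiguous
// allocation.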
Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
auto out_type_id = output_type_.type->id();
// Default to no validity pre-allocation for following cases:
// - Output Array is NullArray
// - kernel_->null_handling is COMPUTED_NO_PREALLOCATE or OUTPUT_NOT_NULL
validity_preallocated_ = false;
if (out_type_id != Type::NA) {
if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
// Override the flag if kernel asks for pre-allocation
validity_preallocated_ = true;
} else if (kernel_->null_handling == NullHandling::INTERSECTION) {
elide_validity_bitmap_ = true;
for (const auto& arg : args) {
auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
// If not all valid, this becomes false
elide_validity_bitmap_ = elide_validity_bitmap_ && null_gen;
}
validity_preallocated_ = !elide_validity_bitmap_;
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
elide_validity_bitmap_ = true;
}
}
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
data_preallocated_.clear();
ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
}
// Validity bitmap either preallocated or elided, and all data
// buffers allocated. This is basically only true for primitive
// types that are not dictionary-encoded
preallocating_all_buffers_ =
((validity_preallocated_ || elide_validity_bitmap_) &&
data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
!is_nested(out_type_id) && !is_dictionary(out_type_id));
// TODO(wesm): why was this check ever here? Fixed width binary
// can be 0-width but anything else?
DCHECK(std::all_of(
data_preallocated_.begin(), data_preallocated_.end(),
[](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));
// Contiguous preallocation only possible on non-nested types if all
// buffers are preallocated. Otherwise, we must go chunk-by-chunk.
//
// Some kernels are also unable to write into sliced outputs, so we respect the
// kernel's attributes.
preallocate_contiguous_ =
(exec_context()->preallocate_contiguous() && kernel_->can_write_into_slices &&
preallocating_all_buffers_);
return Status::OK();
}
// Used to account for the case where we do not preallocate a
// validity bitmap because the inputs are all non-null and we're
// using NullHandling::INTERSECTION to compute the validity bitmap
bool elide_validity_bitmap_ = false;
// All memory is preallocated for output, contiguous and
// non-contiguous
bool preallocating_all_buffers_ = false;
// If true, and the kernel and output type supports preallocation (for both
// the validity and data buffers), then we allocate one big array and then
// iterate through it while executing the kernel in chunks
bool preallocate_contiguous_ = false;
ExecSpanIterator span_iterator_;