@@ -31,6 +31,9 @@ namespace facebook::velox::connector::hive::iceberg {
3131
3232namespace {
3333
// Error message raised in clustered (non-fanout) write mode when a row
// arrives for a partition whose writer has already been closed, i.e. the
// input violates the clustered-ordering assumption. Mirrors the wording of
// Apache Iceberg's ClusteredWriter. NOTE(review): the embedded "\n " spacing
// looks like it came from a concatenated multi-line literal — confirm the
// exact text against the upstream message before changing it.
constexpr std::string_view kNotClusteredRowsErrorMsg =
    " Incoming records violate the writer assumption that records are clustered by spec and \n by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n Encountered records that belong to already closed files:\n ";
// Declares a scoped guard that marks the memory of the writer at `index` as
// non-reclaimable for the rest of the enclosing block, so the memory arbiter
// cannot reclaim from the writer while it is being mutated/closed.
#define WRITER_NON_RECLAIMABLE_SECTION_GUARD(index) \
  memory::NonReclaimableSectionGuard nonReclaimableGuard( \
      writerInfo_[(index)]->nonReclaimableSectionHolder.get())
@@ -208,7 +211,10 @@ IcebergDataSink::IcebergDataSink(
208211 insertTableHandle->columnTransforms(),
209212 hiveConfig->isPartitionPathAsLowerCase(
210213 connectorQueryCtx->sessionProperties ()))
211- : nullptr) {
214+ : nullptr),
215+ fanoutEnabled_(
216+ hiveConfig_->fanoutEnabled (connectorQueryCtx_->sessionProperties ())),
217+ currentWriterId_(0 ) {
212218 if (isPartitioned ()) {
213219 partitionData_.resize (maxOpenWriters_);
214220 }
@@ -325,8 +331,6 @@ std::vector<std::string> IcebergDataSink::commitMessage() const {
325331}
326332
327333void IcebergDataSink::splitInputRowsAndEnsureWriters (RowVectorPtr input) {
328- VELOX_CHECK (isPartitioned ());
329-
330334 std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
331335
332336 const auto numRows = partitionIds_.size ();
@@ -339,26 +343,7 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
339343 if (!partitionData_[index].empty ()) {
340344 continue ;
341345 }
342-
343- std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
344- auto icebergPartitionIdGenerator =
345- dynamic_cast <const IcebergPartitionIdGenerator*>(
346- partitionIdGenerator_.get ());
347- VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
348- const RowVectorPtr transformedValues =
349- icebergPartitionIdGenerator->partitionValues ();
350- for (auto i = 0 ; i < partitionChannels_.size (); ++i) {
351- auto block = transformedValues->childAt (i);
352- if (block->isNullAt (index)) {
353- partitionValues[i] = nullptr ;
354- } else {
355- DecodedVector decoded (*block);
356- partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
357- extractPartitionValue, block->typeKind (), &decoded, index);
358- }
359- }
360-
361- partitionData_[index] = partitionValues;
346+ buildPartitionData (index);
362347 }
363348
364349 for (auto i = 0 ; i < partitionSizes_.size (); ++i) {
@@ -369,6 +354,11 @@ void IcebergDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr input) {
369354 }
370355}
371356
// Computes the partition id for every row of 'input' into partitionIds_,
// via the configured partition id generator. Must only be called on a
// partitioned sink.
void IcebergDataSink::computePartition(const RowVectorPtr& input) {
  VELOX_CHECK(isPartitioned());
  partitionIdGenerator_->run(input, partitionIds_);
}
361+
372362void IcebergDataSink::appendData (RowVectorPtr input) {
373363 checkRunning ();
374364 if (!isPartitioned ()) {
@@ -377,22 +367,79 @@ void IcebergDataSink::appendData(RowVectorPtr input) {
377367 return ;
378368 }
379369
380- // Compute partition and bucket numbers.
381- computePartitionAndBucketIds (input);
370+ computePartition (input);
382371
383- splitInputRowsAndEnsureWriters (input);
372+ if (fanoutEnabled_) {
373+ splitInputRowsAndEnsureWriters (input);
384374
385- for (auto index = 0 ; index < writers_.size (); ++index) {
386- const vector_size_t partitionSize = partitionSizes_[index];
387- if (partitionSize == 0 ) {
388- continue ;
375+ for (auto index = 0 ; index < writers_.size (); ++index) {
376+ const vector_size_t partitionSize = partitionSizes_[index];
377+ if (partitionSize == 0 ) {
378+ continue ;
379+ }
380+
381+ const RowVectorPtr writerInput = partitionSize == input->size ()
382+ ? input
383+ : exec::wrap (partitionSize, partitionRows_[index], input);
384+ write (index, writerInput);
385+ }
386+ } else { // Clustered mode.
387+ std::fill (partitionSizes_.begin (), partitionSizes_.end (), 0 );
388+ const auto numRows = input->size ();
389+ uint32_t index = 0 ;
390+ for (auto row = 0 ; row < numRows; ++row) {
391+ auto id = getIcebergWriterId (row);
392+ index = ensureWriter (id);
393+ if (currentWriterId_ != index) {
394+ clusteredWrite (input, currentWriterId_);
395+ closeWriter (currentWriterId_);
396+ completedWriterIds_.insert (currentWriterId_);
397+ VELOX_USER_CHECK_EQ (
398+ completedWriterIds_.count (index),
399+ 0 ,
400+ " {}" ,
401+ kNotClusteredRowsErrorMsg );
402+ currentWriterId_ = index;
403+ }
404+ updatePartitionRows (index, numRows, row);
405+ buildPartitionData (index);
389406 }
407+ clusteredWrite (input, index);
408+ }
409+ }
390410
391- const RowVectorPtr writerInput = partitionSize == input->size ()
392- ? input
393- : exec::wrap (partitionSize, partitionRows_[index], input);
394- write (index, writerInput);
411+ void IcebergDataSink::buildPartitionData (int32_t index) {
412+ std::vector<folly::dynamic> partitionValues (partitionChannels_.size ());
413+ auto icebergPartitionIdGenerator =
414+ dynamic_cast <const IcebergPartitionIdGenerator*>(
415+ partitionIdGenerator_.get ());
416+ VELOX_CHECK_NOT_NULL (icebergPartitionIdGenerator);
417+ const RowVectorPtr transformedValues =
418+ icebergPartitionIdGenerator->partitionValues ();
419+ for (auto i = 0 ; i < partitionChannels_.size (); ++i) {
420+ auto block = transformedValues->childAt (i);
421+ if (block->isNullAt (index)) {
422+ partitionValues[i] = nullptr ;
423+ } else {
424+ DecodedVector decoded (*block);
425+ partitionValues[i] = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH (
426+ extractPartitionValue, block->typeKind (), &decoded, index);
427+ }
395428 }
429+ partitionData_[index] = partitionValues;
430+ }
431+
432+ void IcebergDataSink::clusteredWrite (RowVectorPtr input, int32_t writerIdx) {
433+ if (partitionSizes_[writerIdx] != 0 ) {
434+ VELOX_CHECK_NOT_NULL (partitionRows_[writerIdx]);
435+ partitionRows_[writerIdx]->setSize (
436+ partitionSizes_[writerIdx] * sizeof (vector_size_t ));
437+ }
438+ const vector_size_t partitionSize = partitionSizes_[writerIdx];
439+ const RowVectorPtr writerInput = partitionSize == input->size ()
440+ ? input
441+ : exec::wrap (partitionSize, partitionRows_[writerIdx], input);
442+ write (writerIdx, writerInput);
396443}
397444
398445HiveWriterId IcebergDataSink::getIcebergWriterId (size_t row) const {
@@ -463,9 +510,11 @@ void IcebergDataSink::closeInternal() {
463510
464511 if (state_ == State::kClosed ) {
465512 for (int i = 0 ; i < writers_.size (); ++i) {
466- WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
467- writers_[i]->close ();
468- dataFileStats_.push_back (writers_[i]->dataFileStats ());
513+ if (writers_[i]) {
514+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
515+ writers_[i]->close ();
516+ dataFileStats_.push_back (writers_[i]->dataFileStats ());
517+ }
469518 }
470519 } else {
471520 for (int i = 0 ; i < writers_.size (); ++i) {
@@ -475,6 +524,63 @@ void IcebergDataSink::closeInternal() {
475524 }
476525}
477526
527+ void IcebergDataSink::closeWriter (int32_t index) {
528+ common::testutil::TestValue::adjust (
529+ " facebook::velox::connector::hive::iceberg::IcebergDataSink::closeWriter" ,
530+ this );
531+
532+ if (writers_[index]) {
533+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (index);
534+ if (sortWrite ()) {
535+ finishWriter (index);
536+ }
537+ writers_[index]->close ();
538+ dataFileStats_.push_back (writers_[index]->dataFileStats ());
539+ writers_[index] = nullptr ;
540+ }
541+ }
542+
543+ bool IcebergDataSink::finishWriter (int32_t index) {
544+ if (!sortWrite ()) {
545+ return true ;
546+ }
547+
548+ if (writers_[index]) {
549+ const uint64_t startTimeMs = getCurrentTimeMs ();
550+ if (!writers_[index]->finish ()) {
551+ return false ;
552+ }
553+ if (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
554+ return false ;
555+ }
556+ }
557+ return true ;
558+ }
559+
560+ bool IcebergDataSink::finish () {
561+ // Flush is reentry state.
562+ setState (State::kFinishing );
563+
564+ // As for now, only sorted writer needs flush buffered data. For non-sorted
565+ // writer, data is directly written to the underlying file writer.
566+ if (!sortWrite ()) {
567+ return true ;
568+ }
569+
570+ // TODO: we might refactor to move the data sorting logic into hive data sink.
571+ const uint64_t startTimeMs = getCurrentTimeMs ();
572+ for (auto i = 0 ; i < writers_.size (); ++i) {
573+ WRITER_NON_RECLAIMABLE_SECTION_GUARD (i);
574+ if (writers_[i] && !writers_[i]->finish ()) {
575+ return false ;
576+ }
577+ if (getCurrentTimeMs () - startTimeMs > sortWriterFinishTimeSliceLimitMs_) {
578+ return false ;
579+ }
580+ }
581+ return true ;
582+ }
583+
478584std::unique_ptr<facebook::velox::dwio::common::Writer>
479585IcebergDataSink::maybeCreateBucketSortWriter (
480586 std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
0 commit comments