Skip to content

Commit

Permalink
Add support to read varbinary column from Parquet fixed length byte a…
Browse files Browse the repository at this point in the history
…rray (facebookincubator#9887)

Summary:
Resolves: facebookincubator#9757

Pull Request resolved: facebookincubator#9887

Reviewed By: Yuhta, kgpai

Differential Revision: D57776408

Pulled By: mbasmanova

fbshipit-source-id: 9a282b68be810b1b99391105157b0777db7e568f
  • Loading branch information
majetideepak authored and Joe-Abraham committed Jun 7, 2024
1 parent c4ae3fd commit 660eef2
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 11 deletions.
18 changes: 12 additions & 6 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,12 +619,18 @@ void PageReader::makeDecoder() {
pageData_, pageData_ + encodedDataSize_);
break;
case thrift::Type::FIXED_LEN_BYTE_ARRAY:
directDecoder_ = std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
type_->typeLength_,
true);
if (type_->type()->isVarbinary()) {
stringDecoder_ = std::make_unique<StringDecoder>(
pageData_, pageData_ + encodedDataSize_, type_->typeLength_);
} else {
directDecoder_ =
std::make_unique<dwio::common::DirectDecoder<true>>(
std::make_unique<dwio::common::SeekableArrayInputStream>(
pageData_, encodedDataSize_),
false,
type_->typeLength_,
true);
}
break;
default: {
directDecoder_ = std::make_unique<dwio::common::DirectDecoder<true>>(
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT64:
return BIGINT();
case thrift::Type::type::INT96:
return DOUBLE(); // TODO: Lose precision
return TIMESTAMP(); // INT96 only maps to a timestamp
case thrift::Type::type::FLOAT:
return REAL();
case thrift::Type::type::DOUBLE:
Expand Down
16 changes: 12 additions & 4 deletions velox/dwio/parquet/reader/StringDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ namespace facebook::velox::parquet {

class StringDecoder {
public:
StringDecoder(const char* start, const char* end)
StringDecoder(const char* start, const char* end, int fixedLength = -1)
: bufferStart_(start),
bufferEnd_(end),

lastSafeWord_(end - simd::kPadding) {}
lastSafeWord_(end - simd::kPadding),
fixedLength_(fixedLength) {}

void skip(uint64_t numValues) {
skip<false>(numValues, 0, nullptr);
Expand Down Expand Up @@ -62,7 +62,8 @@ class StringDecoder {
}

// We are at a non-null value on a row to visit.
toSkip = visitor.process(readString(), atEnd);
toSkip = visitor.process(
fixedLength_ > 0 ? readFixedString() : readString(), atEnd);
}
++current;
if (toSkip) {
Expand All @@ -85,9 +86,16 @@ class StringDecoder {
bufferStart_ += length + sizeof(int32_t);
return folly::StringPiece(bufferStart_ - length, length);
}

folly::StringPiece readFixedString() {
bufferStart_ += fixedLength_;
return folly::StringPiece(bufferStart_ - fixedLength_, fixedLength_);
}

const char* bufferStart_;
const char* bufferEnd_;
const char* const lastSafeWord_;
const int fixedLength_;
};

} // namespace facebook::velox::parquet
Binary file not shown.
28 changes: 28 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,3 +1128,31 @@ TEST_F(ParquetReaderTest, testEnumType) {

assertReadWithReaderAndExpected(fileSchema, *rowReader, expected, *leafPool_);
}

TEST_F(ParquetReaderTest, readVarbinaryFromFLBA) {
const std::string filename("varbinary_flba.parquet");
const std::string sample(getExampleFilePath(filename));

facebook::velox::dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReader(sample, readerOptions);

auto type = reader->typeWithId();
EXPECT_EQ(type->size(), 8ULL);
auto flbaCol =
std::static_pointer_cast<const ParquetTypeWithId>(type->childAt(6));
EXPECT_EQ(flbaCol->name_, "flba_field");
EXPECT_EQ(flbaCol->parquetType_, thrift::Type::FIXED_LEN_BYTE_ARRAY);

auto selectedType = ROW({"flba_field"}, {VARBINARY()});
auto rowReaderOpts = getReaderOpts(selectedType);
rowReaderOpts.setScanSpec(makeScanSpec(selectedType));
auto rowReader = reader->createRowReader(rowReaderOpts);

auto expected = std::string(1024, '*');
VectorPtr result = BaseVector::create(selectedType, 0, &(*leafPool_));
rowReader->next(1, result);
EXPECT_EQ(
expected,
result->as<RowVector>()->childAt(0)->asFlatVector<StringView>()->valueAt(
0));
}

0 comments on commit 660eef2

Please sign in to comment.