Skip to content

Commit

Permalink
PARQUET-1799: [C++] Stream API: Relax schema checking when reading
Browse files Browse the repository at this point in the history
The converted type expected by the StreamReader may not match with
that found in the Parquet file schema in the following cases:

C++ type | StreamReader expected converted type | Parquet file converted type
-------------|-------------------------------------------------------|------------------------------------
int32_t     | INT_32         | NONE or DECIMAL
int64_t     | INT_64         | NONE or DECIMAL
std::string | UTF8           | NONE

A set of these exceptions is created to allow for these cases when the
StreamReader and Parquet file schema converted types do not match
exactly.

Closes #6434 from gawain-bolton/PARQUET-1799_stream_api_relax_schema_checking and squashes the following commits:

9fb8935 <gawain.bolton> Updates after review by Antoine Pitrou on 20200217
5756b08 <gawain.bolton> PARQUET-1799:  Stream API: Relax schema checking when reading

Authored-by: gawain.bolton <gawain.bolton@cfm.fr>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
gawain.bolton authored and pitrou committed Feb 19, 2020
1 parent 7c33680 commit 4d82549
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 5 deletions.
36 changes: 31 additions & 5 deletions cpp/src/parquet/stream_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,31 @@

#include "parquet/stream_reader.h"

#include <set>
#include <utility>

namespace parquet {

constexpr int64_t StreamReader::kBatchSizeOne;

// The converted type expected by the stream reader does not always
// exactly match with the schema in the Parquet file. The following
// is a list of converted types which are allowed instead of the
// expected converted type.
// Each pair given is:
// {<StreamReader expected type>, <Parquet file converted type>}
// So for example {ConvertedType::INT_32, ConvertedType::NONE} means
// that if the StreamReader was expecting the converted type INT_32,
// then it will allow the Parquet file to use the converted type
// NONE.
//
static const std::set<std::pair<ConvertedType::type, ConvertedType::type> >
converted_type_exceptions = {{ConvertedType::INT_32, ConvertedType::NONE},
{ConvertedType::INT_64, ConvertedType::NONE},
{ConvertedType::INT_32, ConvertedType::DECIMAL},
{ConvertedType::INT_64, ConvertedType::DECIMAL},
{ConvertedType::UTF8, ConvertedType::NONE}};

StreamReader::StreamReader(std::unique_ptr<ParquetFileReader> reader)
: file_reader_{std::move(reader)}, eof_{false} {
file_metadata_ = file_reader_->metadata();
Expand Down Expand Up @@ -439,6 +458,8 @@ void StreamReader::SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_sk
num_skipped = static_cast<DoubleReader*>(reader)->Skip(num_rows_to_skip);
break;
case Type::INT96:
num_skipped = static_cast<Int96Reader*>(reader)->Skip(num_rows_to_skip);
break;
case Type::UNDEFINED:
throw ParquetException("Unexpected type: " + TypeToString(reader->type()));
break;
Expand Down Expand Up @@ -468,18 +489,23 @@ void StreamReader::CheckColumn(Type::type physical_type,
"' not '" + TypeToString(physical_type) + "'");
}
if (converted_type != node->converted_type()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type '" +
ConvertedTypeToString(node->converted_type()) + "' not '" +
ConvertedTypeToString(converted_type) + "'");
// The converted type does not always match with the value
// provided so check the set of exceptions.
if (converted_type_exceptions.find({converted_type, node->converted_type()}) ==
converted_type_exceptions.end()) {
throw ParquetException("Column converted type mismatch. Column '" + node->name() +
"' has converted type '" +
ConvertedTypeToString(node->converted_type()) + "' not '" +
ConvertedTypeToString(converted_type) + "'");
}
}
// Length must be exact.
if (length != node->type_length()) {
throw ParquetException("Column length mismatch. Column '" + node->name() +
"' has length " + std::to_string(node->type_length()) +
"] not " + std::to_string(length));
}
}
} // namespace parquet

void StreamReader::ThrowReadFailedException(
const std::shared_ptr<schema::PrimitiveNode>& node) {
Expand Down
85 changes: 85 additions & 0 deletions cpp/src/parquet/stream_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,5 +827,90 @@ TEST_F(TestOptionalFields, ReadOptionalFieldAsRequiredField) {
}
}

class TestReadingDataFiles : public ::testing::Test {
protected:
std::string GetDataFile(const std::string& filename) const {
return std::string(get_data_dir()) + "/" + filename;
}
};

TEST_F(TestReadingDataFiles, AllTypesPlain) {
PARQUET_ASSIGN_OR_THROW(
auto infile, arrow::io::ReadableFile::Open(GetDataFile("alltypes_plain.parquet")));

auto file_reader = ParquetFileReader::Open(infile);
auto reader = StreamReader{std::move(file_reader)};

int32_t c0;
bool c1;
int32_t c2;
int32_t c3;
int32_t c4;
int64_t c5;
float c6;
double c7;
std::string c8;
std::string c9;

const char* expected_date_str[] = {"03/01/09", "03/01/09", "04/01/09", "04/01/09",
"02/01/09", "02/01/09", "01/01/09", "01/01/09"};
int i;

for (i = 0; !reader.eof(); ++i) {
reader >> c0 >> c1 >> c2 >> c3 >> c4 >> c5;
reader >> c6 >> c7;
reader >> c8 >> c9;
reader.SkipColumns(1); // Skip column with unsupported 96-bit type
reader >> EndRow;

EXPECT_EQ(c1, (i & 1) == 0);
EXPECT_EQ(c2, i & 1);
EXPECT_EQ(c3, i & 1);
EXPECT_EQ(c4, i & 1);
EXPECT_EQ(c5, i & 1 ? 10 : 0);
EXPECT_FLOAT_EQ(c6, i & 1 ? 1.1f : 0.f);
EXPECT_DOUBLE_EQ(c7, i & 1 ? 10.1 : 0.);
ASSERT_LT(static_cast<std::size_t>(i),
sizeof(expected_date_str) / sizeof(expected_date_str[0]));
EXPECT_EQ(c8, expected_date_str[i]);
EXPECT_EQ(c9, i & 1 ? "1" : "0");
}
EXPECT_EQ(i, sizeof(expected_date_str) / sizeof(expected_date_str[0]));
}

TEST_F(TestReadingDataFiles, Int32Decimal) {
PARQUET_ASSIGN_OR_THROW(
auto infile, arrow::io::ReadableFile::Open(GetDataFile("int32_decimal.parquet")));

auto file_reader = ParquetFileReader::Open(infile);
auto reader = StreamReader{std::move(file_reader)};

int32_t x;
int i;

for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
EXPECT_EQ(x, i * 100);
}
EXPECT_EQ(i, 25);
}

TEST_F(TestReadingDataFiles, Int64Decimal) {
PARQUET_ASSIGN_OR_THROW(
auto infile, arrow::io::ReadableFile::Open(GetDataFile("int64_decimal.parquet")));

auto file_reader = ParquetFileReader::Open(infile);
auto reader = StreamReader{std::move(file_reader)};

int64_t x;
int i;

for (i = 1; !reader.eof(); ++i) {
reader >> x >> EndRow;
EXPECT_EQ(x, i * 100);
}
EXPECT_EQ(i, 25);
}

} // namespace test
} // namespace parquet

0 comments on commit 4d82549

Please sign in to comment.