Skip to content

Commit

Permalink
ARROW-2255: [C++][Developer][Integration] Serialize custom field/sche…
Browse files Browse the repository at this point in the history
…ma metadata

Note (re: C++ test): if the generated JSON and a converted file differed only in the metadata of a field of a nested type, the `VALIDATE` step failed to detect that they were not equivalent. This was due to a bug in `Field::Equals`.

Closes #6556 from bkietz/2255-Serialize-schema-and-fiel

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
bkietz committed Mar 13, 2020
1 parent bfd5155 commit 963ac4f
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 252 deletions.
22 changes: 13 additions & 9 deletions cpp/src/arrow/ipc/json_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ using internal::TemporaryDir;

namespace ipc {

// Reports whether `path` can be opened for reading: an input file stream is
// opened on it and its good() state is returned.
bool file_exists(const char* path) {
  return std::ifstream(path).good();
}

// Convert JSON file to IPC binary format
static Status ConvertJsonToArrow(const std::string& json_path,
const std::string& arrow_path) {
Expand All @@ -72,7 +67,8 @@ static Status ConvertJsonToArrow(const std::string& json_path,
RETURN_NOT_OK(internal::json::JsonReader::Open(json_buffer, &reader));

if (FLAGS_verbose) {
std::cout << "Found schema:\n" << reader->schema()->ToString() << std::endl;
std::cout << "Found schema:\n"
<< reader->schema()->ToString(/* show_metadata = */ true) << std::endl;
}

std::shared_ptr<RecordBatchWriter> writer;
Expand Down Expand Up @@ -136,9 +132,9 @@ static Status ValidateArrowVsJson(const std::string& arrow_path,
if (!json_schema->Equals(*arrow_schema)) {
std::stringstream ss;
ss << "JSON schema: \n"
<< json_schema->ToString() << "\n"
<< json_schema->ToString(/* show_metadata = */ true) << "\n\n"
<< "Arrow schema: \n"
<< arrow_schema->ToString();
<< arrow_schema->ToString(/* show_metadata = */ true) << "\n";

if (FLAGS_verbose) {
std::cout << ss.str() << std::endl;
Expand Down Expand Up @@ -196,6 +192,8 @@ Status RunCommand(const std::string& json_path, const std::string& arrow_path,
return Status::Invalid("Must specify arrow file name");
}

auto file_exists = [](const char* path) { return std::ifstream(path).good(); };

if (command == "ARROW_TO_JSON") {
if (!file_exists(arrow_path.c_str())) {
return Status::Invalid("Input file does not exist");
Expand Down Expand Up @@ -328,8 +326,14 @@ static const char* JSON_EXAMPLE2 = R"example(
{"type": "VALIDITY", "typeBitWidth": 1},
{"type": "DATA", "typeBitWidth": 32}
]
}
},
"metadata": [
{"key": "converted_from_time32", "value": "true"}
]
}
],
"metadata": [
{"key": "schema_custom_0", "value": "eh"}
]
},
"batches": [
Expand Down
69 changes: 66 additions & 3 deletions cpp/src/arrow/ipc/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/decimal.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
#include "arrow/visitor_inline.h"
Expand Down Expand Up @@ -105,10 +106,32 @@ class SchemaWriter {
RETURN_NOT_OK(VisitField(field));
}
writer_->EndArray();
WriteKeyValueMetadata(schema_.metadata());
writer_->EndObject();
return Status::OK();
}

// Emits a "metadata" member through writer_: an array holding one
// {"key": ..., "value": ...} object per entry, in index order.
// Nothing at all is written when `metadata` is null or has no entries.
//
// NOTE(review): keys and values are handed to the writer via c_str(), so any
// content after an embedded NUL byte would presumably be dropped -- confirm
// whether such metadata needs to be supported here.
void WriteKeyValueMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata) {
  const bool nothing_to_write = (metadata == nullptr) || (metadata->size() == 0);
  if (nothing_to_write) {
    return;
  }

  writer_->Key("metadata");
  writer_->StartArray();

  const int64_t num_entries = metadata->size();
  for (int64_t entry = 0; entry < num_entries; ++entry) {
    const auto& entry_key = metadata->key(entry);
    const auto& entry_value = metadata->value(entry);

    writer_->StartObject();
    writer_->Key("key");
    writer_->String(entry_key.c_str());
    writer_->Key("value");
    writer_->String(entry_value.c_str());
    writer_->EndObject();
  }

  writer_->EndArray();
}

Status WriteDictionaryMetadata(int64_t id, const DictionaryType& type) {
writer_->Key("dictionary");

Expand Down Expand Up @@ -156,6 +179,7 @@ class SchemaWriter {
RETURN_NOT_OK(WriteChildren(type.children()));
}

WriteKeyValueMetadata(field->metadata());
writer_->EndObject();

return Status::OK();
Expand Down Expand Up @@ -987,6 +1011,39 @@ static Status ParseDictionary(const RjObject& obj, int64_t* id, bool* is_ordered
return GetInteger(json_index_type, index_type);
}

// Reads the optional "metadata" member of a JSON field or schema object.
//
// `out` is always reset to a fresh, empty KeyValueMetadata first, so a missing
// or null "metadata" member yields empty (not null) metadata. Otherwise the
// member must be a JSON array of objects; the "key" and "value" of each object
// are fetched with GetObjectString and appended to `out` in array order.
//
// Returns Status::Invalid when the member is not an array or one of its
// elements is not an object, and forwards any failure from GetObjectString.
template <typename FieldOrStruct>
static Status GetKeyValueMetadata(const FieldOrStruct& field_or_struct,
                                  std::shared_ptr<KeyValueMetadata>* out) {
  *out = std::make_shared<KeyValueMetadata>();

  const auto member = field_or_struct.FindMember("metadata");
  const bool is_absent = (member == field_or_struct.MemberEnd());
  if (is_absent || member->value.IsNull()) {
    return Status::OK();
  }

  if (!member->value.IsArray()) {
    return Status::Invalid("Metadata was not a JSON array");
  }

  const auto& entries = member->value.GetArray();
  for (auto entry = entries.Begin(); entry != entries.End(); ++entry) {
    if (!entry->IsObject()) {
      return Status::Invalid("Metadata KeyValue was not a JSON object");
    }
    const auto& kv_object = entry->GetObject();

    std::string key;
    std::string value;
    RETURN_NOT_OK(GetObjectString(kv_object, "key", &key));
    RETURN_NOT_OK(GetObjectString(kv_object, "value", &value));

    (*out)->Append(std::move(key), std::move(value));
  }
  return Status::OK();
}

static Status GetField(const rj::Value& obj, DictionaryMemo* dictionary_memo,
std::shared_ptr<Field>* field) {
if (!obj.IsObject()) {
Expand All @@ -1010,6 +1067,9 @@ static Status GetField(const rj::Value& obj, DictionaryMemo* dictionary_memo,
RETURN_NOT_OK(GetFieldsFromArray(it_children->value, dictionary_memo, &children));
RETURN_NOT_OK(GetType(it_type->value.GetObject(), children, &type));

std::shared_ptr<KeyValueMetadata> metadata;
RETURN_NOT_OK(GetKeyValueMetadata(json_field, &metadata));

const auto& it_dictionary = json_field.FindMember("dictionary");
if (dictionary_memo != nullptr && it_dictionary != json_field.MemberEnd()) {
// Parse dictionary id in JSON and add dictionary field to the
Expand All @@ -1022,10 +1082,10 @@ static Status GetField(const rj::Value& obj, DictionaryMemo* dictionary_memo,
&is_ordered, &index_type));

type = ::arrow::dictionary(index_type, type, is_ordered);
*field = ::arrow::field(name, type, nullable);
*field = ::arrow::field(name, type, nullable, metadata);
RETURN_NOT_OK(dictionary_memo->AddField(dictionary_id, *field));
} else {
*field = ::arrow::field(name, type, nullable);
*field = ::arrow::field(name, type, nullable, metadata);
}

return Status::OK();
Expand Down Expand Up @@ -1532,13 +1592,16 @@ Status ReadSchema(const rj::Value& json_schema, MemoryPool* pool,
const auto& it_fields = obj_schema.FindMember("fields");
RETURN_NOT_ARRAY("fields", it_fields, obj_schema);

std::shared_ptr<KeyValueMetadata> metadata;
RETURN_NOT_OK(GetKeyValueMetadata(obj_schema, &metadata));

std::vector<std::shared_ptr<Field>> fields;
RETURN_NOT_OK(GetFieldsFromArray(it_fields->value, dictionary_memo, &fields));

// Read the dictionaries (if any) and cache in the memo
RETURN_NOT_OK(ReadDictionaries(json_schema, pool, dictionary_memo));

*schema = ::arrow::schema(fields);
*schema = ::arrow::schema(fields, metadata);
return Status::OK();
}

Expand Down
12 changes: 5 additions & 7 deletions cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1098,8 +1098,6 @@ static void AppendMetadataFingerprint(const KeyValueMetadata& metadata,
}
}

// Deliberate no-op: the stream is left untouched, so nothing is contributed to
// a fingerprint on this path.
static void AppendEmptyMetadataFingerprint(std::stringstream* /*ss*/) {}

std::string Field::ComputeFingerprint() const {
const auto& type_fingerprint = type_->fingerprint();
if (type_fingerprint.empty()) {
Expand All @@ -1122,8 +1120,10 @@ std::string Field::ComputeMetadataFingerprint() const {
std::stringstream ss;
if (metadata_) {
AppendMetadataFingerprint(*metadata_, &ss);
} else {
AppendEmptyMetadataFingerprint(&ss);
}
const auto& type_fingerprint = type_->metadata_fingerprint();
if (!type_fingerprint.empty()) {
ss << "+{" << type_->metadata_fingerprint() << "}";
}
return ss.str();
}
Expand All @@ -1146,8 +1146,6 @@ std::string Schema::ComputeMetadataFingerprint() const {
std::stringstream ss;
if (HasMetadata()) {
AppendMetadataFingerprint(*metadata(), &ss);
} else {
AppendEmptyMetadataFingerprint(&ss);
}
ss << "S{";
for (const auto& field : fields()) {
Expand All @@ -1170,7 +1168,7 @@ std::string DataType::ComputeMetadataFingerprint() const {
// Whatever the data type, metadata can only be found on child fields
std::string s;
for (const auto& child : children_) {
s += child->metadata_fingerprint();
s += child->metadata_fingerprint() + ";";
}
return s;
}
Expand Down
56 changes: 56 additions & 0 deletions cpp/src/arrow/type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,48 @@ TEST_F(TestSchema, TestMetadataConstruction) {
AssertSchemaEqual(schema2, schema3, false);
}

// Two schemas whose only difference is key/value metadata on the item field of
// a list column: the plain fingerprints must match, the metadata fingerprints
// must differ, and the equality helpers must agree with that.
TEST_F(TestSchema, TestNestedMetadataComparison) {
  auto plain_item = field("item", int32(), true);
  auto tagged_item = field("item", int32(), true, key_value_metadata({{"foo", "baz"}}));

  Schema plain({field("f", list(plain_item))});
  Schema tagged({field("f", list(tagged_item))});

  // Fingerprints: identical when metadata is ignored, distinct otherwise.
  ASSERT_EQ(plain.fingerprint(), tagged.fingerprint());
  ASSERT_NE(plain.metadata_fingerprint(), tagged.metadata_fingerprint());

  // Equal when metadata is explicitly ignored; unequal with the helper's
  // default arguments.
  AssertSchemaEqual(plain, tagged, /* check_metadata = */ false);
  AssertSchemaNotEqual(plain, tagged);
}

// Same check as the single-level nested case, but with the differing item
// field buried under list<list<union<struct<item>>>>.
TEST_F(TestSchema, TestDeeplyNestedMetadataComparison) {
  auto plain_item = field("item", int32(), true);
  auto tagged_item = field("item", int32(), true, key_value_metadata({{"foo", "baz"}}));

  // Wraps `item` as field "f" of type list<list<union<struct<item>>>>.
  auto wrap_deeply = [](const std::shared_ptr<Field>& item) {
    auto as_struct = field("struct", struct_({item}));
    return field("f", list(list(union_({as_struct}))));
  };

  Schema plain({wrap_deeply(plain_item)});
  Schema tagged({wrap_deeply(tagged_item)});

  // Fingerprints: identical when metadata is ignored, distinct otherwise.
  ASSERT_EQ(plain.fingerprint(), tagged.fingerprint());
  ASSERT_NE(plain.metadata_fingerprint(), tagged.metadata_fingerprint());

  // Equal when metadata is explicitly ignored; unequal with the helper's
  // default arguments.
  AssertSchemaEqual(plain, tagged, /* check_metadata = */ false);
  AssertSchemaNotEqual(plain, tagged);
}

// Two same-named, same-typed fields that differ only in metadata are placed in
// opposite orders in two schemas. The schemas must compare equal (and share a
// fingerprint) when metadata is ignored, and differ when it is considered.
TEST_F(TestSchema, TestFieldsDifferOnlyInMetadata) {
  auto bare = field("f", utf8(), true, nullptr);
  auto tagged = field("f", utf8(), true, key_value_metadata({{"foo", "baz"}}));

  Schema bare_first({bare, tagged});
  Schema tagged_first({tagged, bare});

  // Equal when metadata is explicitly ignored; unequal with the helper's
  // default arguments.
  AssertSchemaEqual(bare_first, tagged_first, /* check_metadata = */ false);
  AssertSchemaNotEqual(bare_first, tagged_first);

  // The fingerprints must tell the same story as the equality helpers.
  ASSERT_EQ(bare_first.fingerprint(), tagged_first.fingerprint());
  ASSERT_NE(bare_first.metadata_fingerprint(), tagged_first.metadata_fingerprint());
}

TEST_F(TestSchema, TestEmptyMetadata) {
// Empty metadata should be equivalent to no metadata at all
auto f1 = field("f1", int32());
Expand Down Expand Up @@ -1320,6 +1362,20 @@ TEST(TestStructType, GetFieldDuplicates) {
ASSERT_EQ(results.size(), 0);
}

// Struct-type counterpart of the schema test: two child fields that differ
// only in metadata, listed in opposite orders, must give types that are equal
// without metadata and unequal with it -- for equality and fingerprints alike.
TEST(TestStructType, TestFieldsDifferOnlyInMetadata) {
  auto bare = field("f", utf8(), true, nullptr);
  auto tagged = field("f", utf8(), true, key_value_metadata({{"foo", "baz"}}));

  StructType bare_first({bare, tagged});
  StructType tagged_first({tagged, bare});

  // Equal when metadata is explicitly ignored; unequal with the helper's
  // default arguments.
  AssertTypeEqual(bare_first, tagged_first, /* check_metadata = */ false);
  AssertTypeNotEqual(bare_first, tagged_first);

  // The fingerprints must tell the same story as the equality helpers.
  ASSERT_EQ(bare_first.fingerprint(), tagged_first.fingerprint());
  ASSERT_NE(bare_first.metadata_fingerprint(), tagged_first.metadata_fingerprint());
}

TEST(TestUnionType, Basics) {
auto f0_type = int32();
auto f0 = field("f0", f0_type);
Expand Down
5 changes: 4 additions & 1 deletion dev/archery/archery/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def _set_default(opt, default):
@archery.command(short_help="Execute protocol and Flight integration tests")
@click.option('--with-all', is_flag=True, default=False,
help=('Include all known languages by default '
' in integration tests'))
'in integration tests'))
@click.option('--random-seed', type=int, default=12345,
help="Seed for PRNG when generating test data")
@click.option('--with-cpp', type=bool, default=False,
Expand Down Expand Up @@ -560,6 +560,9 @@ def integration(with_all=False, random_seed=12345, **args):
from .integration.runner import write_js_test_json, run_all_tests
import numpy as np

# FIXME(bkietz) Include help strings for individual testers.
# For example, CPPTester's ARROW_CPP_EXE_PATH environment variable.

# Make runs involving data generation deterministic
np.random.seed(random_seed)

Expand Down
Loading

0 comments on commit 963ac4f

Please sign in to comment.