From e2150c1901c22041e3323f90a1e57be4e5fc875a Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Oct 2017 21:13:16 -0400 Subject: [PATCH 1/3] Remove page id from Buffer metadata, increment metadata version number Change-Id: I731fa4c54a597fa48eefe88fb47755077b159114 --- cpp/src/arrow/ipc/ipc-read-write-test.cc | 2 +- cpp/src/arrow/ipc/message.cc | 15 +---------- cpp/src/arrow/ipc/message.h | 14 ++++++++++- cpp/src/arrow/ipc/metadata-internal.cc | 25 +++++++++++++++++-- cpp/src/arrow/ipc/metadata-internal.h | 10 ++++---- cpp/src/arrow/ipc/reader.cc | 15 +---------- cpp/src/arrow/ipc/writer.cc | 12 +-------- format/Schema.fbs | 14 +++++++---- .../arrow/vector/schema/ArrowBuffer.java | 16 +++--------- .../arrow/vector/schema/ArrowRecordBatch.java | 2 +- js/src/format/Schema_generated.ts | 20 +++------------ 11 files changed, 62 insertions(+), 83 deletions(-) diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc index adf34a9eb54..6f2f5cf8560 100644 --- a/cpp/src/arrow/ipc/ipc-read-write-test.cc +++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc @@ -243,7 +243,7 @@ TEST_F(TestIpcRoundTrip, MetadataVersion) { std::unique_ptr message; ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message)); - ASSERT_EQ(MetadataVersion::V3, message->metadata_version()); + ASSERT_EQ(MetadataVersion::V4, message->metadata_version()); } TEST_P(TestIpcRoundTrip, SliceRoundTrip) { diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 0dd5c72e519..21d6a69a286 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -67,20 +67,7 @@ class Message::MessageImpl { } MetadataVersion version() const { - switch (message_->version()) { - case flatbuf::MetadataVersion_V1: - // Arrow 0.1 - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - // Arrow 0.2 - return MetadataVersion::V2; - case flatbuf::MetadataVersion_V3: - // Arrow >= 0.3 - return MetadataVersion::V3; - // Add cases as other versions become available - default: - return MetadataVersion::V3; - } + return internal::GetMetadataVersion(message_->version()); } const void* header() const { return message_->header(); } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 67a95c7d247..ba9dece4cff 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -42,7 +42,19 @@ class RandomAccessFile; namespace ipc { -enum class MetadataVersion : char { V1, V2, V3 }; +enum class MetadataVersion : char { + /// 0.1.0 + V1, + + /// 0.2.0 + V2, + + /// 0.3.0 to 0.7.1 + V3, + + /// >= 0.8.0 + V4 +}; // ARROW-109: We set this number arbitrarily to help catch user mistakes. For // deeply nested schemas, it is expected the user will indicate explicitly the diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index ad00cfb6c09..f04e9b05a01 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -33,6 +33,7 @@ #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/Tensor_generated.h" #include "arrow/ipc/dictionary.h" +#include "arrow/ipc/message.h" #include "arrow/ipc/util.h" #include "arrow/status.h" #include "arrow/tensor.h" @@ -57,6 +58,26 @@ using VectorLayoutOffset = flatbuffers::Offset; using Offset = flatbuffers::Offset; using FBString = flatbuffers::Offset; +MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version) { + switch (version) { + case flatbuf::MetadataVersion_V1: + // Arrow 0.1 + return MetadataVersion::V1; + case flatbuf::MetadataVersion_V2: + // Arrow 0.2 + return MetadataVersion::V2; + case flatbuf::MetadataVersion_V3: + // Arrow 0.3 to 0.7.1 + return MetadataVersion::V4; + case flatbuf::MetadataVersion_V4: + // Arrow >= 0.8 + return MetadataVersion::V4; + // Add cases as other versions become available + default: + return MetadataVersion::V4; + } +} + static Status IntFromFlatbuffer(const flatbuf::Int* int_data, std::shared_ptr* out) { if (int_data->bitWidth() > 64) { @@ -700,7 +721,7 @@ static Status WriteBuffers(FBB& fbb, const std::vector& buffers, for (size_t i = 0; i < buffers.size(); ++i) { const BufferMetadata& buffer = buffers[i]; - fb_buffers.emplace_back(buffer.page, buffer.offset, buffer.length); + fb_buffers.emplace_back(buffer.offset, buffer.length); } *out = fbb.CreateVectorOfStructs(fb_buffers); return Status::OK(); @@ -751,7 +772,7 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset, auto fb_shape = fbb.CreateVector(dims); auto fb_strides = fbb.CreateVector(tensor.strides()); int64_t body_length = tensor.data()->size(); - flatbuf::Buffer buffer(-1, buffer_start_offset, body_length); + flatbuf::Buffer buffer(buffer_start_offset, body_length); TensorOffset fb_tensor = flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer); diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 309e7587a75..380f3c9eb10 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -27,6 +27,7 @@ #include "arrow/ipc/Schema_generated.h" #include "arrow/ipc/dictionary.h" +#include "arrow/ipc/message.h" namespace arrow { @@ -48,10 +49,12 @@ namespace ipc { namespace internal { static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = - flatbuf::MetadataVersion_V3; + flatbuf::MetadataVersion_V4; static constexpr flatbuf::MetadataVersion kMinMetadataVersion = - flatbuf::MetadataVersion_V3; + flatbuf::MetadataVersion_V4; + +MetadataVersion GetMetadataVersion(flatbuf::MetadataVersion version); static constexpr const char* kArrowMagicBytes = "ARROW1"; @@ -62,9 +65,6 @@ struct FieldMetadata { }; struct BufferMetadata { - /// The shared memory page id where to find this. Set to -1 if unused - int32_t page; - /// The relative offset into the memory page to the starting byte of the buffer int64_t offset; diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 50eb9039c6a..8e10d7d66f9 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -550,20 +550,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { int num_record_batches() const { return footer_->recordBatches()->size(); } MetadataVersion version() const { - switch (footer_->version()) { - case flatbuf::MetadataVersion_V1: - // Arrow 0.1 - return MetadataVersion::V1; - case flatbuf::MetadataVersion_V2: - // Arrow 0.2 - return MetadataVersion::V2; - case flatbuf::MetadataVersion_V3: - // Arrow 0.3 - return MetadataVersion::V3; - // Add cases as other versions become available - default: - return MetadataVersion::V3; - } + return internal::GetMetadataVersion(footer_->version()); } FileBlock record_batch(int i) const { diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 279a69544fa..5598cc68296 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -149,8 +149,6 @@ class RecordBatchSerializer : public ArrayVisitor { buffer_meta_.reserve(buffers_.size()); - const int32_t kNoPageId = -1; - // Construct the buffer metadata for the record batch header for (size_t i = 0; i < buffers_.size(); ++i) { const Buffer* buffer = buffers_[i].get(); @@ -163,15 +161,7 @@ class RecordBatchSerializer : public ArrayVisitor { padding = BitUtil::RoundUpToMultipleOf8(size) - size; } - // TODO(wesm): We currently have no notion of shared memory page id's, - // but we've included it in the metadata IDL for when we have it in the - // future. Use page = -1 for now - // - // Note that page ids are a bespoke notion for Arrow and not a feature we - // are using from any OS-level shared memory. The thought is that systems - // may (in the future) associate integer page id's with physical memory - // pages (according to whatever is the desired shared memory mechanism) - buffer_meta_.push_back({kNoPageId, offset, size + padding}); + buffer_meta_.push_back({offset, size + padding}); offset += size + padding; } diff --git a/format/Schema.fbs b/format/Schema.fbs index 186f8e362bd..6021e92b847 100644 --- a/format/Schema.fbs +++ b/format/Schema.fbs @@ -20,9 +20,17 @@ namespace org.apache.arrow.flatbuf; enum MetadataVersion:short { + /// 0.1.0 V1, + + /// 0.2.0 V2, - V3 + + /// 0.3.0 -> 0.7.1 + V3, + + /// >= 0.8.0 + V4 } /// These are stored in the flatbuffer in the Type union below @@ -293,10 +301,6 @@ enum Endianness:short { Little, Big } /// ---------------------------------------------------------------------- /// A Buffer represents a single contiguous memory segment struct Buffer { - /// The shared memory page id where this buffer is located. Currently this is - /// not used - page: int; - /// The relative offset into the shared memory page where the bytes for this /// buffer starts offset: long; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java index d8c9e3001d0..4e0187e791b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java @@ -24,21 +24,15 @@ public class ArrowBuffer implements FBSerializable { - private int page; private long offset; private long size; - public ArrowBuffer(int page, long offset, long size) { + public ArrowBuffer(long offset, long size) { super(); - this.page = page; this.offset = offset; this.size = size; } - public int getPage() { - return page; - } - public long getOffset() { return offset; } @@ -52,7 +46,6 @@ public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (offset ^ (offset >>> 32)); - result = prime * result + page; result = prime * result + (int) (size ^ (size >>> 32)); return result; } @@ -72,9 +65,6 @@ public boolean equals(Object obj) { if (offset != other.offset) { return false; } - if (page != other.page) { - return false; - } if (size != other.size) { return false; } @@ -83,12 +73,12 @@ public boolean equals(Object obj) { @Override public int writeTo(FlatBufferBuilder builder) { - return Buffer.createBuffer(builder, page, offset, size); + return Buffer.createBuffer(builder, offset, size); } @Override public String toString() { - return "ArrowBuffer [page=" + page + ", offset=" + offset + ", size=" + size + "]"; + return "ArrowBuffer [offset=" + offset + ", size=" + size + "]"; } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java index c842d4c3f9a..bf0967a2797 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java @@ -72,7 +72,7 @@ public ArrowRecordBatch(int length, List nodes, List b for (ArrowBuf arrowBuf : buffers) { arrowBuf.retain(); long size = arrowBuf.readableBytes(); - arrowBuffers.add(new ArrowBuffer(0, offset, size)); + arrowBuffers.add(new ArrowBuffer(offset, size)); LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", offset, size)); offset += size; if (alignBuffers && offset % 8 != 0) { // align on 8 byte boundaries diff --git a/js/src/format/Schema_generated.ts b/js/src/format/Schema_generated.ts index 65493b7f685..c5b3e5011d7 100644 --- a/js/src/format/Schema_generated.ts +++ b/js/src/format/Schema_generated.ts @@ -2027,16 +2027,6 @@ export namespace org.apache.arrow.flatbuf { return this; } - /** - * The shared memory page id where this buffer is located. Currently this is - * not used - * - * @returns {number} - */ - page(): number { - return this.bb.readInt32(this.bb_pos); - } - /** * The relative offset into the shared memory page where the bytes for this * buffer starts @@ -2044,7 +2034,7 @@ export namespace org.apache.arrow.flatbuf { * @returns {flatbuffers.Long} */ offset(): flatbuffers.Long { - return this.bb.readInt64(this.bb_pos + 8); + return this.bb.readInt64(this.bb_pos); } /** @@ -2054,7 +2044,7 @@ export namespace org.apache.arrow.flatbuf { * @returns {flatbuffers.Long} */ length(): flatbuffers.Long { - return this.bb.readInt64(this.bb_pos + 16); + return this.bb.readInt64(this.bb_pos + 8); } /** @@ -2064,12 +2054,10 @@ export namespace org.apache.arrow.flatbuf { * @param {flatbuffers.Long} length * @returns {flatbuffers.Offset} */ - static createBuffer(builder: flatbuffers.Builder, page: number, offset: flatbuffers.Long, length: flatbuffers.Long): flatbuffers.Offset { - builder.prep(8, 24); + static createBuffer(builder: flatbuffers.Builder, offset: flatbuffers.Long, length: flatbuffers.Long): flatbuffers.Offset { + builder.prep(8, 16); builder.writeInt64(length); builder.writeInt64(offset); - builder.pad(4); - builder.writeInt32(page); return builder.offset(); } From 845f290f1cb87ceb210fb8c69955634d42fcbcc6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 20 Oct 2017 21:20:18 -0400 Subject: [PATCH 2/3] Bump metadata version in Java, add check for V4 Change-Id: Ie41bf47bcc81f5f27dc441988aa19fc94e7bb047 --- .../org/apache/arrow/vector/stream/MessageSerializer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java index f69aa41e7f6..c397cec72f0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java @@ -385,6 +385,10 @@ public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocat throw new IOException("Cannot currently deserialize record batches over 2GB"); } + if (message.version() != MetadataVersion.V4) { + throw new IOException("Received metadata with an incompatible version number"); + } + switch (message.headerType()) { case MessageHeader.RecordBatch: return deserializeRecordBatch(in, message, alloc); @@ -409,7 +413,7 @@ public static ByteBuffer serializeMessage(FlatBufferBuilder builder, byte header Message.startMessage(builder); Message.addHeaderType(builder, headerType); Message.addHeader(builder, headerOffset); - Message.addVersion(builder, MetadataVersion.V3); + Message.addVersion(builder, MetadataVersion.V4); Message.addBodyLength(builder, bodyLength); builder.finish(Message.endMessage(builder)); return builder.dataBuffer(); From 582fad901e7a15e98cf53a182dc446f47920bea4 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Oct 2017 18:00:57 -0400 Subject: [PATCH 3/3] Disable JS in Travis CI for now Change-Id: I4dfb8548bd704fe03db5ed0bbebfa71c7e8341fd --- .travis.yml | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index c682a9d9db8..e752c9acc55 100644 --- a/.travis.yml +++ b/.travis.yml @@ -110,13 +110,14 @@ matrix: - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh - - language: node_js - os: linux - node_js: node - before_script: - - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh - script: - - $TRAVIS_BUILD_DIR/ci/travis_script_js.sh + # TODO(wesm): Re-enable after issues in ARROW-1409 resolved + # - language: node_js + # os: linux + # node_js: node + # before_script: + # - $TRAVIS_BUILD_DIR/ci/travis_before_script_js.sh + # script: + # - $TRAVIS_BUILD_DIR/ci/travis_script_js.sh - compiler: gcc language: cpp os: linux