Skip to content

Commit

Permalink
ARROW-8343: [GLib] Add GArrowRecordBatchIterator
Browse files Browse the repository at this point in the history
I'd like to add `GArrowRecordBatchIterator` as a binding of `arrow::RecordBatchIterator` class.

Closes #6847 from mrkn/ARROW-8343

Lead-authored-by: Kenta Murata <mrkn@mrkn.jp>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
mrkn and kou committed Apr 8, 2020
1 parent 1a1047a commit 0a66565
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 12 deletions.
208 changes: 199 additions & 9 deletions c_glib/arrow-glib/record-batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,34 @@
#include <arrow-glib/record-batch.hpp>
#include <arrow-glib/schema.hpp>

#include <arrow/util/iterator.h>

#include <sstream>

G_BEGIN_DECLS

/**
* SECTION: record-batch
* @short_description: Record batch class
* @section_id: record-batch
* @title: Record batch related classes
* @include: arrow-glib/arrow-glib.h
*
* #GArrowRecordBatch is a class for record batch. Record batch is
* similar to #GArrowTable. Record batch also has zero or more
* columns and zero or more records.
*
* Record batch is used for shared memory IPC.
*
* #GArrowRecordBatchIterator is a class for iterating record
* batches.
*/

/* Instance-private data of #GArrowRecordBatch: the wrapped Arrow C++
 * record batch, held through a shared pointer. */
typedef struct GArrowRecordBatchPrivate_ {
std::shared_ptr<arrow::RecordBatch> record_batch;
} GArrowRecordBatchPrivate;

enum {
PROP_0,
PROP_RECORD_BATCH
PROP_RECORD_BATCH = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GArrowRecordBatch,
Expand All @@ -73,9 +79,9 @@ garrow_record_batch_finalize(GObject *object)

static void
garrow_record_batch_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GARROW_RECORD_BATCH_GET_PRIVATE(object);

Expand All @@ -92,9 +98,9 @@ garrow_record_batch_set_property(GObject *object,

static void
garrow_record_batch_get_property(GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
switch (prop_id) {
default:
Expand Down Expand Up @@ -402,6 +408,174 @@ garrow_record_batch_remove_column(GArrowRecordBatch *record_batch,
}
}


/* Instance-private data of #GArrowRecordBatchIterator: the wrapped
 * Arrow C++ iterator, stored by value inside the GObject instance. */
typedef struct GArrowRecordBatchIteratorPrivate_ {
arrow::RecordBatchIterator iterator;
} GArrowRecordBatchIteratorPrivate;

/* Property IDs of #GArrowRecordBatchIterator. PROP_ITERATOR is the ID
 * under which the "iterator" property is installed in class_init. */
enum {
PROP_ITERATOR = 1,
};

/* Registers GArrowRecordBatchIterator as a GObject subclass with
 * private instance data. */
G_DEFINE_TYPE_WITH_PRIVATE(GArrowRecordBatchIterator,
garrow_record_batch_iterator,
G_TYPE_OBJECT)

/* Returns the typed private data pointer of a
 * #GArrowRecordBatchIterator instance. */
#define GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(obj) \
static_cast<GArrowRecordBatchIteratorPrivate *>( \
garrow_record_batch_iterator_get_instance_private( \
GARROW_RECORD_BATCH_ITERATOR(obj)))

/* GObject finalize handler: destroys the C++ iterator stored in the
 * instance-private data, then chains up to the parent class. */
static void
garrow_record_batch_iterator_finalize(GObject *object)
{
  auto private_data = GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(object);

  /* The iterator was created with placement new in the instance init
   * function, so its destructor has to be invoked explicitly here. */
  private_data->iterator.~Iterator();

  auto parent_class = G_OBJECT_CLASS(garrow_record_batch_iterator_parent_class);
  parent_class->finalize(object);
}

/* GObject set_property handler.
 *
 * PROP_ITERATOR: @value carries a pointer to an
 * arrow::RecordBatchIterator whose contents are moved into the
 * instance-private iterator (the pointed-to iterator is left in a
 * moved-from state).
 *
 * The "iterator" property is construct-only, so GObject invokes this
 * handler during construction even when the caller does not pass the
 * property; in that case the pointer value is NULL. A NULL pointer is
 * ignored and the default-constructed iterator created by the instance
 * init function is kept, instead of dereferencing NULL. */
static void
garrow_record_batch_iterator_set_property(GObject *object,
                                          guint prop_id,
                                          const GValue *value,
                                          GParamSpec *pspec)
{
  auto priv = GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(object);

  switch (prop_id) {
  case PROP_ITERATOR:
    {
      auto arrow_iterator =
        static_cast<arrow::RecordBatchIterator *>(g_value_get_pointer(value));
      if (arrow_iterator) {
        priv->iterator = std::move(*arrow_iterator);
      }
    }
    break;
  default:
    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
    break;
  }
}

/* GObject instance init: default-constructs the C++ iterator in place
 * inside the instance-private data using placement new. */
static void
garrow_record_batch_iterator_init(GArrowRecordBatchIterator *object)
{
  auto iterator_storage =
    &(GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(object)->iterator);
  new(iterator_storage) arrow::RecordBatchIterator;
}

/* GObject class init: hooks up the finalize and set_property handlers
 * and installs the write-only, construct-only "iterator" pointer
 * property under PROP_ITERATOR. */
static void
garrow_record_batch_iterator_class_init(GArrowRecordBatchIteratorClass *klass)
{
  auto object_class = G_OBJECT_CLASS(klass);
  object_class->finalize = garrow_record_batch_iterator_finalize;
  object_class->set_property = garrow_record_batch_iterator_set_property;

  const auto iterator_flags =
    static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY);
  GParamSpec *iterator_spec =
    g_param_spec_pointer("iterator",
                         "Iterator",
                         "The raw arrow::RecordBatchIterator",
                         iterator_flags);
  g_object_class_install_property(object_class, PROP_ITERATOR, iterator_spec);
}

/**
 * garrow_record_batch_iterator_new:
 * @record_batches: (element-type GArrowRecordBatch):
 *   The record batches.
 *
 * Creates an iterator that yields the given record batches in list
 * order.
 *
 * Returns: A newly created #GArrowRecordBatchIterator.
 *
 * Since: 0.17.0
 */
GArrowRecordBatchIterator *
garrow_record_batch_iterator_new(GList *record_batches)
{
  std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches;
  for (auto node = record_batches; node; node = node->next) {
    auto record_batch = GARROW_RECORD_BATCH(node->data);
    arrow_record_batches.push_back(garrow_record_batch_get_raw(record_batch));
  }

  /* The vector is not used again, so hand it over by move instead of
   * copying every shared pointer into the iterator. */
  auto arrow_iterator =
    arrow::MakeVectorIterator(std::move(arrow_record_batches));
  return garrow_record_batch_iterator_new_raw(&arrow_iterator);
}

/**
 * garrow_record_batch_iterator_next:
 * @iterator: A #GArrowRecordBatchIterator.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Advances the iterator by one element.
 *
 * Returns: (nullable) (transfer full):
 *   The next #GArrowRecordBatch, or %NULL when the iterator is completed.
 *   %NULL is also returned on error, in which case @error is set.
 *
 * Since: 0.17.0
 */
GArrowRecordBatch *
garrow_record_batch_iterator_next(GArrowRecordBatchIterator *iterator,
                                  GError **error)
{
  auto priv = GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(iterator);

  auto next_result = priv->iterator.Next();
  if (!garrow::check(error, next_result, "[record-batch-iterator][next]")) {
    return NULL;
  }

  /* A null record batch is treated as "nothing more to return". */
  auto arrow_record_batch = *next_result;
  if (!arrow_record_batch) {
    return NULL;
  }
  return garrow_record_batch_new_raw(&arrow_record_batch);
}

/**
 * garrow_record_batch_iterator_equal:
 * @iterator: A #GArrowRecordBatchIterator.
 * @other_iterator: A #GArrowRecordBatchIterator to be compared.
 *
 * Returns: %TRUE if both iterators are the same, %FALSE otherwise.
 *
 * Since: 0.17.0
 */
gboolean
garrow_record_batch_iterator_equal(GArrowRecordBatchIterator *iterator,
                                   GArrowRecordBatchIterator *other_iterator)
{
  /* Delegate the comparison to the wrapped Arrow C++ iterators. */
  const auto &arrow_iterator =
    GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(iterator)->iterator;
  const auto &arrow_other_iterator =
    GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(other_iterator)->iterator;
  return arrow_iterator.Equals(arrow_other_iterator);
}

/**
 * garrow_record_batch_iterator_to_list:
 * @iterator: A #GArrowRecordBatchIterator.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Returns: (element-type GArrowRecordBatch) (transfer full):
 *   A #GList that contains every element moved out of the iterator.
 *   On error, %NULL is returned and @error is set.
 *
 * Since: 0.17.0
 */
GList*
garrow_record_batch_iterator_to_list(GArrowRecordBatchIterator *iterator,
                                     GError **error)
{
  auto priv = GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(iterator);
  GList *collected = NULL;
  for (auto element_result : priv->iterator) {
    const auto succeeded = garrow::check(error,
                                         element_result,
                                         "[record-batch-iterator][to-list]");
    if (succeeded) {
      auto arrow_record_batch = *std::move(element_result);
      /* Prepend now and reverse once at the end: prepending to a
       * GList does not walk the list, appending does. */
      collected =
        g_list_prepend(collected,
                       garrow_record_batch_new_raw(&arrow_record_batch));
      continue;
    }

    /* Failure: drop the wrappers created so far and report nothing. */
    g_list_free_full(collected, g_object_unref);
    return NULL;
  }
  return g_list_reverse(collected);
}

G_END_DECLS

GArrowRecordBatch *
Expand All @@ -420,3 +594,19 @@ garrow_record_batch_get_raw(GArrowRecordBatch *record_batch)
auto priv = GARROW_RECORD_BATCH_GET_PRIVATE(record_batch);
return priv->record_batch;
}

/* Wraps a raw Arrow C++ iterator in a new #GArrowRecordBatchIterator.
 * The pointer is handed to the construct-only "iterator" property,
 * whose setter moves the pointed-to iterator into the new object. */
GArrowRecordBatchIterator *
garrow_record_batch_iterator_new_raw(arrow::RecordBatchIterator *arrow_iterator)
{
  return GARROW_RECORD_BATCH_ITERATOR(
    g_object_new(GARROW_TYPE_RECORD_BATCH_ITERATOR,
                 "iterator", arrow_iterator,
                 NULL));
}

/* Returns a pointer to the Arrow C++ iterator stored inside @iterator's
 * private data. The pointer refers to storage inside the GObject
 * instance; no copy is made. */
arrow::RecordBatchIterator *
garrow_record_batch_iterator_get_raw(GArrowRecordBatchIterator *iterator)
{
  return &(GARROW_RECORD_BATCH_ITERATOR_GET_PRIVATE(iterator)->iterator);
}
32 changes: 32 additions & 0 deletions c_glib/arrow-glib/record-batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,36 @@ GArrowRecordBatch *garrow_record_batch_remove_column(GArrowRecordBatch *record_b
guint i,
GError **error);


/* GType accessor macro for #GArrowRecordBatchIterator. */
#define GARROW_TYPE_RECORD_BATCH_ITERATOR \
(garrow_record_batch_iterator_get_type())
/* Declares #GArrowRecordBatchIterator as a derivable GObject type. */
G_DECLARE_DERIVABLE_TYPE(GArrowRecordBatchIterator,
garrow_record_batch_iterator,
GARROW,
RECORD_BATCH_ITERATOR,
GObject)
/* Class structure; it adds no virtual methods of its own. */
struct _GArrowRecordBatchIteratorClass
{
GObjectClass parent_class;
};

/* Creates an iterator over the record batches in a GList. */
GARROW_AVAILABLE_IN_0_17
GArrowRecordBatchIterator *
garrow_record_batch_iterator_new(GList *record_batches);

/* Returns the next record batch, or NULL when completed or on error. */
GARROW_AVAILABLE_IN_0_17
GArrowRecordBatch *
garrow_record_batch_iterator_next(GArrowRecordBatchIterator *iterator,
GError **error);

/* Compares the two wrapped iterators. */
GARROW_AVAILABLE_IN_0_17
gboolean
garrow_record_batch_iterator_equal(GArrowRecordBatchIterator *iterator,
GArrowRecordBatchIterator *other_iterator);

/* Collects the iterator's elements into a GList; NULL on error. */
GARROW_AVAILABLE_IN_0_17
GList*
garrow_record_batch_iterator_to_list(GArrowRecordBatchIterator *iterator,
GError **error);

G_END_DECLS
6 changes: 6 additions & 0 deletions c_glib/arrow-glib/record-batch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@

GArrowRecordBatch *garrow_record_batch_new_raw(std::shared_ptr<arrow::RecordBatch> *arrow_record_batch);
std::shared_ptr<arrow::RecordBatch> garrow_record_batch_get_raw(GArrowRecordBatch *record_batch);

/* Wraps a raw Arrow C++ record batch iterator in a new GObject; the
 * pointed-to iterator is moved into the new object. */
GArrowRecordBatchIterator *
garrow_record_batch_iterator_new_raw(arrow::RecordBatchIterator *arrow_iterator);

/* Returns a pointer to the Arrow C++ iterator held by @iterator. */
arrow::RecordBatchIterator *
garrow_record_batch_iterator_get_raw(GArrowRecordBatchIterator *iterator);
51 changes: 51 additions & 0 deletions c_glib/test/test-record-batch-iterator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Tests for Arrow::RecordBatchIterator: element-by-element iteration
# with #next and bulk extraction with #to_list.
class TestRecordBatchIterator < Test::Unit::TestCase
  include Helper::Buildable

  # Builds two record batches sharing one schema (a boolean "visible"
  # column and an int32 "point" column) and an iterator over them.
  def setup
    schema = Arrow::Schema.new([
      Arrow::Field.new("visible", Arrow::BooleanDataType.new),
      Arrow::Field.new("point", Arrow::Int32DataType.new),
    ])
    first_columns = [
      build_boolean_array([true, false, true]),
      build_int32_array([1, 2, 3]),
    ]
    second_columns = [
      build_boolean_array([false, true, false, true]),
      build_int32_array([-1, -2, -3, -4]),
    ]
    @record_batches = [first_columns, second_columns].map do |columns|
      n_rows = columns.first.length
      Arrow::RecordBatch.new(schema, n_rows, columns)
    end
    @iterator = Arrow::RecordBatchIterator.new(@record_batches)
  end

  # #next yields each batch in order, then nil once exhausted.
  def test_next
    first_batch, second_batch = @record_batches
    assert_equal(first_batch, @iterator.next)
    assert_equal(second_batch, @iterator.next)
    assert_equal(nil, @iterator.next)
  end

  # #to_list returns all batches in their original order.
  def test_to_list
    assert_equal(@record_batches, @iterator.to_list)
  end
end
5 changes: 2 additions & 3 deletions ci/docker/ubuntu-20.04-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ RUN apt-get update -y -q && \
libre2-dev \
libsnappy-dev \
libssl-dev \
libthrift-dev \
libzstd-dev \
ninja-build \
pkg-config \
Expand All @@ -80,7 +81,6 @@ RUN apt-get update -y -q && \
# - flatbuffer is not packaged
# - libgtest-dev only provide sources
# - libprotobuf-dev only provide sources
# - thrift is too old
ENV ARROW_BUILD_TESTS=ON \
ARROW_DEPENDENCY_SOURCE=SYSTEM \
ARROW_DATASET=ON \
Expand All @@ -106,5 +106,4 @@ ENV ARROW_BUILD_TESTS=ON \
ORC_SOURCE=BUNDLED \
PARQUET_BUILD_EXECUTABLES=ON \
PARQUET_BUILD_EXAMPLES=ON \
PATH=/usr/lib/ccache/:$PATH \
Thrift_SOURCE=BUNDLED
PATH=/usr/lib/ccache/:$PATH

0 comments on commit 0a66565

Please sign in to comment.