From e6b707872889e55c111bc601c9967054e9842108 Mon Sep 17 00:00:00 2001 From: Kouhei Sutou Date: Sat, 10 Nov 2018 14:28:32 +0900 Subject: [PATCH] [GLib] Add GArrowCSVReader --- c_glib/arrow-glib/reader.cpp | 566 +++++++++++++++++++++- c_glib/arrow-glib/reader.h | 31 ++ c_glib/arrow-glib/reader.hpp | 6 + c_glib/doc/arrow-glib/arrow-glib-docs.xml | 4 + c_glib/test/test-csv-reader.rb | 59 +++ 5 files changed, 657 insertions(+), 9 deletions(-) create mode 100644 c_glib/test/test-csv-reader.rb diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp index 296b911a6ef5..5253a45dbbd3 100644 --- a/c_glib/arrow-glib/reader.cpp +++ b/c_glib/arrow-glib/reader.cpp @@ -51,6 +51,9 @@ G_BEGIN_DECLS * * #GArrowFeatherFileReader is a class for reading columns in Feather * file format from input. + * + * #GArrowCSVReader is a class for reading table in CSV format from + * input. */ typedef struct GArrowRecordBatchReaderPrivate_ { @@ -888,6 +891,541 @@ garrow_feather_file_reader_read_names(GArrowFeatherFileReader *reader, } } + +typedef struct GArrowCSVReadOptionsPrivate_ { + arrow::MemoryPool *pool; + arrow::csv::ReadOptions read_options; + arrow::csv::ParseOptions parse_options; + arrow::csv::ConvertOptions convert_options; +} GArrowCSVReadOptionsPrivate; + +enum { + PROP_POOL = 1, + PROP_USE_THREADS, + PROP_BLOCK_SIZE, + PROP_DELIMITER, + PROP_IS_QUOTED, + PROP_QUOTE_CHARACTER, + PROP_IS_DOUBLE_QUOTED, + PROP_IS_ESCAPED, + PROP_ESCAPE_CHARACTER, + PROP_ALLOW_NEWLINES_IN_VALUES, + PROP_IGNORE_EMPTY_LINES, + PROP_N_HEADER_ROWS, + PROP_CHECK_UTF8 +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GArrowCSVReadOptions, + garrow_csv_read_options, + G_TYPE_OBJECT) + +#define GARROW_CSV_READ_OPTIONS_GET_PRIVATE(object) \ + static_cast( \ + garrow_csv_read_options_get_instance_private( \ + GARROW_CSV_READ_OPTIONS(object))) + +static void +garrow_csv_read_options_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_CSV_READ_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_POOL: + priv->pool = static_cast(g_value_get_pointer(value)); + break; + case PROP_USE_THREADS: + priv->read_options.use_threads = g_value_get_boolean(value); + break; + case PROP_BLOCK_SIZE: + priv->read_options.block_size = g_value_get_int(value); + break; + case PROP_DELIMITER: + priv->parse_options.delimiter = g_value_get_schar(value); + break; + case PROP_IS_QUOTED: + priv->parse_options.quoting = g_value_get_boolean(value); + break; + case PROP_QUOTE_CHARACTER: + priv->parse_options.quote_char = g_value_get_schar(value); + break; + case PROP_IS_DOUBLE_QUOTED: + priv->parse_options.double_quote = g_value_get_boolean(value); + break; + case PROP_IS_ESCAPED: + priv->parse_options.escaping = g_value_get_boolean(value); + break; + case PROP_ESCAPE_CHARACTER: + priv->parse_options.escape_char = g_value_get_schar(value); + break; + case PROP_ALLOW_NEWLINES_IN_VALUES: + priv->parse_options.newlines_in_values = g_value_get_boolean(value); + break; + case PROP_IGNORE_EMPTY_LINES: + priv->parse_options.ignore_empty_lines = g_value_get_boolean(value); + break; + case PROP_N_HEADER_ROWS: + priv->parse_options.header_rows = g_value_get_uint(value); + break; + case PROP_CHECK_UTF8: + priv->convert_options.check_utf8 = g_value_get_boolean(value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_csv_read_options_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_CSV_READ_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_USE_THREADS: + g_value_set_boolean(value, priv->read_options.use_threads); + break; + case PROP_BLOCK_SIZE: + g_value_set_int(value, priv->read_options.block_size); + break; + case PROP_DELIMITER: + g_value_set_schar(value, priv->parse_options.delimiter); + break; + case PROP_IS_QUOTED: + g_value_set_boolean(value, priv->parse_options.quoting); + break; + case PROP_QUOTE_CHARACTER: + g_value_set_schar(value, priv->parse_options.quote_char); + break; + case PROP_IS_DOUBLE_QUOTED: + g_value_set_boolean(value, priv->parse_options.double_quote); + break; + case PROP_IS_ESCAPED: + g_value_set_boolean(value, priv->parse_options.escaping); + break; + case PROP_ESCAPE_CHARACTER: + g_value_set_schar(value, priv->parse_options.escape_char); + break; + case PROP_ALLOW_NEWLINES_IN_VALUES: + g_value_set_boolean(value, priv->parse_options.newlines_in_values); + break; + case PROP_IGNORE_EMPTY_LINES: + g_value_set_boolean(value, priv->parse_options.ignore_empty_lines); + break; + case PROP_N_HEADER_ROWS: + g_value_set_uint(value, priv->parse_options.header_rows); + break; + case PROP_CHECK_UTF8: + g_value_set_boolean(value, priv->convert_options.check_utf8); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_csv_read_options_init(GArrowCSVReadOptions *object) +{ + auto priv = GARROW_CSV_READ_OPTIONS_GET_PRIVATE(object); + priv->pool = arrow::default_memory_pool(); + priv->read_options = arrow::csv::ReadOptions::Defaults(); + priv->parse_options = arrow::csv::ParseOptions::Defaults(); + priv->convert_options = arrow::csv::ConvertOptions::Defaults(); +} + +static void +garrow_csv_read_options_class_init(GArrowCSVReadOptionsClass *klass) +{ + GParamSpec *spec; + + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->set_property = garrow_csv_read_options_set_property; + gobject_class->get_property = garrow_csv_read_options_get_property; + + spec = g_param_spec_pointer("pool", + "Pool", + "The raw arrow::MemoryPool *", + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_POOL, spec); + + auto read_options = arrow::csv::ReadOptions::Defaults(); + + /** + * GArrowCSVReadOptions:use-threads: + * + * Whether to use the global CPU thread pool. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("use-threads", + "Use threads", + "Whether to use the global CPU thread pool", + read_options.use_threads, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_USE_THREADS, spec); + + /** + * GArrowCSVReadOptions:block-size: + * + * Block size we request from the IO layer; also determines the size + * of chunks when #GArrowCSVReadOptions:use-threads is %TRUE. + * + * Since: 0.12.0 + */ + spec = g_param_spec_int("block-size", + "Block size", + "Block size we request from the IO layer; " + "also determines the size of chunks " + "when ::use-threads is %TRUE", + 0, + G_MAXINT, + read_options.block_size, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_BLOCK_SIZE, spec); + + + auto parse_options = arrow::csv::ParseOptions::Defaults(); + + /** + * GArrowCSVReadOptions:delimiter: + * + * Field delimiter character. + * + * Since: 0.12.0 + */ + spec = g_param_spec_char("delimiter", + "Delimiter", + "Field delimiter character", + 0, + G_MAXINT8, + parse_options.delimiter, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_DELIMITER, spec); + + /** + * GArrowCSVReadOptions:is-quoted: + * + * Whether quoting is used. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("is-quoted", + "Is quoted", + "Whether quoting is used", + parse_options.quoting, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_IS_QUOTED, spec); + + /** + * GArrowCSVReadOptions:quote-character: + * + * Quoting character. This is used only when + * #GArrowCSVReadOptions:is-quoted is %TRUE. + * + * Since: 0.12.0 + */ + spec = g_param_spec_char("quote-character", + "Quote character", + "Quoting character", + 0, + G_MAXINT8, + parse_options.quote_char, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_QUOTE_CHARACTER, spec); + + /** + * GArrowCSVReadOptions:is-double-quoted: + * + * Whether a quote inside a value is double quoted. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("is-double-quoted", + "Is double quoted", + "Whether a quote inside a value is double quoted", + parse_options.double_quote, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_IS_DOUBLE_QUOTED, spec); + + /** + * GArrowCSVReadOptions:is-escaped: + * + * Whether escaping is used. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("is-escaped", + "Is escaped", + "Whether escaping is used", + parse_options.escaping, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_IS_ESCAPED, spec); + + /** + * GArrowCSVReadOptions:escape-character: + * + * Escaping character. This is used only when + * #GArrowCSVReadOptions:is-escaped is %TRUE. + * + * Since: 0.12.0 + */ + spec = g_param_spec_char("escape-character", + "Escape character", + "Escaping character", + 0, + G_MAXINT8, + parse_options.escape_char, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_ESCAPE_CHARACTER, spec); + + /** + * GArrowCSVReadOptions:allow-newlines-in-values: + * + * Whether values are allowed to contain CR (0x0d) and LF (0x0a) characters. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("allow-newlines-in-values", + "Allow newlines in values", + "Whether values are allowed to contain " + "CR (0x0d) and LF (0x0a) characters.", + parse_options.newlines_in_values, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, + PROP_ALLOW_NEWLINES_IN_VALUES, + spec); + + /** + * GArrowCSVReadOptions:ignore-empty-lines: + * + * Whether empty lines are ignored. If %FALSE, an empty line + * represents a simple empty value (assuming a one-column CSV file). + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("ignore-empty-lines", + "Ignore empty lines", + "Whether empty lines are ignored. " + "If %FALSE, an empty line represents " + "a simple empty value " + "(assuming a one-column CSV file).", + parse_options.ignore_empty_lines, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, + PROP_IGNORE_EMPTY_LINES, + spec); + + /** + * GArrowCSVReadOptions:n-header-rows: + * + * The number of header rows to skip (including the first row + * containing column names) + * + * Since: 0.12.0 + */ + spec = g_param_spec_uint("n-header-rows", + "N header rows", + "The number of header rows to skip " + "(including the first row containing column names", + 0, + G_MAXUINT, + parse_options.header_rows, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, + PROP_N_HEADER_ROWS, + spec); + + auto convert_options = arrow::csv::ConvertOptions::Defaults(); + + /** + * GArrowCSVReadOptions:check-utf8: + * + * Whether to check UTF8 validity of string columns. + * + * Since: 0.12.0 + */ + spec = g_param_spec_boolean("check-utf8", + "Check UTF8", + "Whether to check UTF8 validity of string columns", + convert_options.check_utf8, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_CHECK_UTF8, spec); +} + +/** + * garrow_csv_read_options_new: + * + * Returns: A newly created #GArrowCSVReadOptions. + * + * Since: 0.12.0 + */ +GArrowCSVReadOptions * +garrow_csv_read_options_new(void) +{ + auto csv_read_options = g_object_new(GARROW_TYPE_CSV_READ_OPTIONS, + "pool", arrow::default_memory_pool(), + NULL); + return GARROW_CSV_READ_OPTIONS(csv_read_options); +} + + +typedef struct GArrowCSVReaderPrivate_ { + std::shared_ptr reader; +} GArrowCSVReaderPrivate; + +enum { + PROP_CSV_TABLE_READER = 1 +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GArrowCSVReader, + garrow_csv_reader, + G_TYPE_OBJECT) + +#define GARROW_CSV_READER_GET_PRIVATE(object) \ + static_cast( \ + garrow_csv_reader_get_instance_private( \ + GARROW_CSV_READER(object))) + +static void +garrow_csv_reader_dispose(GObject *object) +{ + auto priv = GARROW_CSV_READER_GET_PRIVATE(object); + + priv->reader = nullptr; + + G_OBJECT_CLASS(garrow_csv_reader_parent_class)->dispose(object); +} + +static void +garrow_csv_reader_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GARROW_CSV_READER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_CSV_TABLE_READER: + priv->reader = + *static_cast *>(g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_csv_reader_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + switch (prop_id) { + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +garrow_csv_reader_init(GArrowCSVReader *object) +{ +} + +static void +garrow_csv_reader_class_init(GArrowCSVReaderClass *klass) +{ + GParamSpec *spec; + + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = garrow_csv_reader_dispose; + gobject_class->set_property = garrow_csv_reader_set_property; + gobject_class->get_property = garrow_csv_reader_get_property; + + spec = g_param_spec_pointer("csv-table-reader", + "CSV table reader", + "The raw std::shared *", + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_CSV_TABLE_READER, spec); +} + +/** + * garrow_csv_reader_new: + * @input: The input to be read. + * @options: (nullable): A #GArrowCSVReadOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable): A newly created #GArrowCSVReader or %NULL on error. + * + * Since: 0.12.0 + */ +GArrowCSVReader * +garrow_csv_reader_new(GArrowInputStream *input, + GArrowCSVReadOptions *options, + GError **error) +{ + auto arrow_input = garrow_input_stream_get_raw(input); + arrow::Status status; + std::shared_ptr arrow_reader; + if (options) { + auto options_priv = GARROW_CSV_READ_OPTIONS_GET_PRIVATE(options); + status = arrow::csv::TableReader::Make(options_priv->pool, + arrow_input, + options_priv->read_options, + options_priv->parse_options, + options_priv->convert_options, + &arrow_reader); + } else { + status = + arrow::csv::TableReader::Make(arrow::default_memory_pool(), + arrow_input, + arrow::csv::ReadOptions::Defaults(), + arrow::csv::ParseOptions::Defaults(), + arrow::csv::ConvertOptions::Defaults(), + &arrow_reader); + } + + if (garrow_error_check(error, status, "[csv-reader][new]")) { + return garrow_csv_reader_new_raw(&arrow_reader); + } else { + return NULL; + } +} + +/** + * garrow_csv_reader_read: + * @reader: A #GArrowCSVReader. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable) (transfer full): A read #GArrowTable or %NULL on error. + * + * Since: 0.12.0 + */ +GArrowTable * +garrow_csv_reader_read(GArrowCSVReader *reader, + GError **error) +{ + auto arrow_reader = garrow_csv_reader_get_raw(reader); + std::shared_ptr arrow_table; + auto status = arrow_reader->Read(&arrow_table); + if (garrow_error_check(error, status, "[csv-reader][read]")) { + return garrow_table_new_raw(&arrow_table); + } else { + return NULL; + } +} + G_END_DECLS GArrowRecordBatchReader * @@ -903,9 +1441,7 @@ garrow_record_batch_reader_new_raw(std::shared_ptr garrow_record_batch_reader_get_raw(GArrowRecordBatchReader *reader) { - GArrowRecordBatchReaderPrivate *priv; - - priv = GARROW_RECORD_BATCH_READER_GET_PRIVATE(reader); + auto priv = GARROW_RECORD_BATCH_READER_GET_PRIVATE(reader); return priv->record_batch_reader; } @@ -944,9 +1480,7 @@ garrow_record_batch_file_reader_new_raw(std::shared_ptr garrow_record_batch_file_reader_get_raw(GArrowRecordBatchFileReader *reader) { - GArrowRecordBatchFileReaderPrivate *priv; - - priv = GARROW_RECORD_BATCH_FILE_READER_GET_PRIVATE(reader); + auto priv = GARROW_RECORD_BATCH_FILE_READER_GET_PRIVATE(reader); return priv->record_batch_file_reader; } @@ -964,8 +1498,22 @@ garrow_feather_file_reader_new_raw(arrow::ipc::feather::TableReader *arrow_reade arrow::ipc::feather::TableReader * garrow_feather_file_reader_get_raw(GArrowFeatherFileReader *reader) { - GArrowFeatherFileReaderPrivate *priv; - - priv = GARROW_FEATHER_FILE_READER_GET_PRIVATE(reader); + auto priv = GARROW_FEATHER_FILE_READER_GET_PRIVATE(reader); return priv->feather_table_reader; } + +GArrowCSVReader * +garrow_csv_reader_new_raw(std::shared_ptr *arrow_reader) +{ + auto reader = GARROW_CSV_READER(g_object_new(GARROW_TYPE_CSV_READER, + "csv-table-reader", arrow_reader, + NULL)); + return reader; +} + +std::shared_ptr +garrow_csv_reader_get_raw(GArrowCSVReader *reader) +{ + auto priv = GARROW_CSV_READER_GET_PRIVATE(reader); + return priv->reader; +} diff --git a/c_glib/arrow-glib/reader.h b/c_glib/arrow-glib/reader.h index b043ec1c40fb..d1a3947a4c98 100644 --- a/c_glib/arrow-glib/reader.h +++ b/c_glib/arrow-glib/reader.h @@ -243,4 +243,35 @@ garrow_feather_file_reader_read_names(GArrowFeatherFileReader *reader, guint n_names, GError **error); +#define GARROW_TYPE_CSV_READ_OPTIONS (garrow_csv_read_options_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowCSVReadOptions, + garrow_csv_read_options, + GARROW, + CSV_READ_OPTIONS, + GObject) +struct _GArrowCSVReadOptionsClass +{ + GObjectClass parent_class; +}; + +GArrowCSVReadOptions *garrow_csv_read_options_new(void); + +#define GARROW_TYPE_CSV_READER (garrow_csv_reader_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowCSVReader, + garrow_csv_reader, + GARROW, + CSV_READER, + GObject) +struct _GArrowCSVReaderClass +{ + GObjectClass parent_class; +}; + +GArrowCSVReader *garrow_csv_reader_new(GArrowInputStream *input, + GArrowCSVReadOptions *options, + GError **error); +GArrowTable *garrow_csv_reader_read(GArrowCSVReader *reader, + GError **error); + + G_END_DECLS diff --git a/c_glib/arrow-glib/reader.hpp b/c_glib/arrow-glib/reader.hpp index 3e1135e13771..4f85e490735b 100644 --- a/c_glib/arrow-glib/reader.hpp +++ b/c_glib/arrow-glib/reader.hpp @@ -20,6 +20,7 @@ #pragma once #include +#include #include #include @@ -37,3 +38,8 @@ std::shared_ptr garrow_record_batch_file_read GArrowFeatherFileReader *garrow_feather_file_reader_new_raw(arrow::ipc::feather::TableReader *arrow_reader); arrow::ipc::feather::TableReader *garrow_feather_file_reader_get_raw(GArrowFeatherFileReader *reader); + +GArrowCSVReader * +garrow_csv_reader_new_raw(std::shared_ptr *arrow_reader); +std::shared_ptr +garrow_csv_reader_get_raw(GArrowCSVReader *reader); diff --git a/c_glib/doc/arrow-glib/arrow-glib-docs.xml b/c_glib/doc/arrow-glib/arrow-glib-docs.xml index 462f42e0ace4..948a321a178d 100644 --- a/c_glib/doc/arrow-glib/arrow-glib-docs.xml +++ b/c_glib/doc/arrow-glib/arrow-glib-docs.xml @@ -159,6 +159,10 @@ Index of deprecated API + + Index of new symbols in 0.12.0 + + Index of new symbols in 0.11.0 diff --git a/c_glib/test/test-csv-reader.rb b/c_glib/test/test-csv-reader.rb new file mode 100644 index 000000000000..12897a86973e --- /dev/null +++ b/c_glib/test/test-csv-reader.rb @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestCSVReader < Test::Unit::TestCase + include Helper::Buildable + include Helper::Omittable + + sub_test_case("#read") do + def open_input(csv) + buffer = Arrow::Buffer.new(csv) + Arrow::BufferInputStream.new(buffer) + end + + def test_default + require_gi(1, 42, 0) + table = Arrow::CSVReader.new(open_input(<<-CSV)) +message,count +"Start",2 +"Shutdown",9 + CSV + columns = { + "message" => build_string_array(["Start", "Shutdown"]), + "count" => build_int64_array([2, 9]), + } + assert_equal(build_table(columns), + table.read) + end + + def test_options + options = Arrow::CSVReadOptions.new + options.quoted = false + table = Arrow::CSVReader.new(open_input(<<-CSV), options) +message,count +"Start",2 +"Shutdown",9 + CSV + columns = { + "message" => build_string_array(["\"Start\"", "\"Shutdown\""]), + "count" => build_int64_array([2, 9]), + } + assert_equal(build_table(columns), + table.read) + end + end +end