Skip to content

Commit

Permalink
GH-37143: [GLib][FlightSQL] Add support for prepared INSERT (#37196)
Browse files Browse the repository at this point in the history
### Rationale for this change

Prepared INSERT is useful for large INSERT.

### What changes are included in this PR?

Sorry. This is too large I thought.

Core bindings are the followings:

* `GAFLightSQLPreparedStatement`
* `gaflightsql_client_prepare()`

Others are for writing test of them. We need to implement a test server for prepared INSERT. Others are for it.

An exception is `ruby/red-arrow-flight-sql/lib/` change. It's for convenient API for `gaflightsql_client_prepare()`.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #37143

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou committed Aug 18, 2023
1 parent ca1a0eb commit 9fea4ee
Show file tree
Hide file tree
Showing 22 changed files with 1,574 additions and 25 deletions.
18 changes: 9 additions & 9 deletions c_glib/arrow-flight-glib/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,16 +689,14 @@ gaflight_client_do_get(GAFlightClient *client,
if (options) {
flight_options = gaflight_call_options_get_raw(options);
}
std::unique_ptr<arrow::flight::FlightStreamReader> flight_reader;
auto result = flight_client->DoGet(*flight_options, *flight_ticket);
auto status = std::move(result).Value(&flight_reader);
if (garrow::check(error,
status,
"[flight-client][do-get]")) {
return gaflight_stream_reader_new_raw(flight_reader.release());
} else {
return NULL;
if (!garrow::check(error,
result,
"[flight-client][do-get]")) {
return nullptr;
}
auto flight_reader = std::move(*result);
return gaflight_stream_reader_new_raw(flight_reader.release(), TRUE);
}


Expand All @@ -707,11 +705,13 @@ G_END_DECLS

GAFlightStreamReader *
gaflight_stream_reader_new_raw(
arrow::flight::FlightStreamReader *flight_reader)
arrow::flight::FlightStreamReader *flight_reader,
gboolean is_owner)
{
return GAFLIGHT_STREAM_READER(
g_object_new(GAFLIGHT_TYPE_STREAM_READER,
"reader", flight_reader,
"is-owner", is_owner,
NULL));
}

Expand Down
4 changes: 3 additions & 1 deletion c_glib/arrow-flight-glib/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@


GAFlightStreamReader *
gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader);
gaflight_stream_reader_new_raw(
arrow::flight::FlightStreamReader *flight_reader,
gboolean is_owner);

arrow::flight::FlightCallOptions *
gaflight_call_options_get_raw(GAFlightCallOptions *options);
Expand Down
23 changes: 18 additions & 5 deletions c_glib/arrow-flight-glib/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,10 +1223,12 @@ gaflight_stream_chunk_get_metadata(GAFlightStreamChunk *chunk)

typedef struct GAFlightRecordBatchReaderPrivate_ {
arrow::flight::MetadataRecordBatchReader *reader;
bool is_owner;
} GAFlightRecordBatchReaderPrivate;

enum {
PROP_READER = 1,
PROP_IS_OWNER,
};

G_DEFINE_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader,
Expand All @@ -1242,9 +1244,9 @@ static void
gaflight_record_batch_reader_finalize(GObject *object)
{
auto priv = GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(object);

delete priv->reader;

if (priv->is_owner) {
delete priv->reader;
}
G_OBJECT_CLASS(gaflight_info_parent_class)->finalize(object);
}

Expand All @@ -1262,6 +1264,9 @@ gaflight_record_batch_reader_set_property(GObject *object,
static_cast<arrow::flight::MetadataRecordBatchReader *>(
g_value_get_pointer(value));
break;
case PROP_IS_OWNER:
priv->is_owner = g_value_get_boolean(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
Expand All @@ -1283,11 +1288,19 @@ gaflight_record_batch_reader_class_init(GAFlightRecordBatchReaderClass *klass)

GParamSpec *spec;
spec = g_param_spec_pointer("reader",
"Reader",
"The raw arrow::flight::MetadataRecordBatchReader *",
nullptr,
nullptr,
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_READER, spec);

spec = g_param_spec_boolean("is-owner",
nullptr,
nullptr,
TRUE,
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_IS_OWNER, spec);
}

/**
Expand Down
57 changes: 57 additions & 0 deletions c_glib/arrow-flight-glib/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ G_BEGIN_DECLS
* IPC payloads to be sent in `FlightData` protobuf messages by
* #GArrowRecordBatchReader`.
*
* #GAFlightMessageReader is a class for IPC payloads uploaded by a
* client. Also allows reading application-defined metadata via the
* Flight protocol.
*
* #GAFlightServerAuthSender is a class for sending messages to the
* client during an authentication handshake.
*
Expand Down Expand Up @@ -258,6 +262,37 @@ gaflight_record_batch_stream_new(GArrowRecordBatchReader *reader,
}


G_DEFINE_TYPE(GAFlightMessageReader,
gaflight_message_reader,
GAFLIGHT_TYPE_RECORD_BATCH_READER)

static void
gaflight_message_reader_init(GAFlightMessageReader *object)
{
}

static void
gaflight_message_reader_class_init(GAFlightMessageReaderClass *klass)
{
}

/**
* gaflight_message_reader_get_descriptor:
* @reader: A #GAFlightMessageReader.
*
* Returns: (transfer full): The descriptor for this upload.
*
* Since: 14.0.0
*/
GAFlightDescriptor *
gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader)
{
auto flight_reader = gaflight_message_reader_get_raw(reader);
const auto &flight_descriptor = flight_reader->descriptor();
return gaflight_descriptor_new_raw(&flight_descriptor);
}


typedef struct GAFlightServerCallContextPrivate_ {
arrow::flight::ServerCallContext *call_context;
} GAFlightServerCallContextPrivate;
Expand Down Expand Up @@ -1215,6 +1250,28 @@ gaflight_data_stream_get_raw(GAFlightDataStream *stream)
return priv->stream;
}


GAFlightMessageReader *
gaflight_message_reader_new_raw(
arrow::flight::FlightMessageReader *flight_reader,
gboolean is_owner)
{
return GAFLIGHT_MESSAGE_READER(
g_object_new(GAFLIGHT_TYPE_MESSAGE_READER,
"reader", flight_reader,
"is-owner", is_owner,
NULL));
}

arrow::flight::FlightMessageReader *
gaflight_message_reader_get_raw(GAFlightMessageReader *reader)
{
auto flight_reader =
gaflight_record_batch_reader_get_raw(GAFLIGHT_RECORD_BATCH_READER(reader));
return static_cast<arrow::flight::FlightMessageReader *>(flight_reader);
}


GAFlightServerCallContext *
gaflight_server_call_context_new_raw(
const arrow::flight::ServerCallContext *flight_call_context)
Expand Down
17 changes: 17 additions & 0 deletions c_glib/arrow-flight-glib/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,23 @@ gaflight_record_batch_stream_new(GArrowRecordBatchReader *reader,
GArrowWriteOptions *options);


#define GAFLIGHT_TYPE_MESSAGE_READER \
(gaflight_message_reader_get_type())
G_DECLARE_DERIVABLE_TYPE(GAFlightMessageReader,
gaflight_message_reader,
GAFLIGHT,
MESSAGE_READER,
GAFlightRecordBatchReader)
struct _GAFlightMessageReaderClass
{
GAFlightRecordBatchReaderClass parent_class;
};

GARROW_AVAILABLE_IN_14_0
GAFlightDescriptor *
gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader);


#define GAFLIGHT_TYPE_SERVER_CALL_CONTEXT \
(gaflight_server_call_context_get_type())
G_DECLARE_DERIVABLE_TYPE(GAFlightServerCallContext,
Expand Down
8 changes: 8 additions & 0 deletions c_glib/arrow-flight-glib/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
arrow::flight::FlightDataStream *
gaflight_data_stream_get_raw(GAFlightDataStream *stream);

GAFlightMessageReader *
gaflight_message_reader_new_raw(
arrow::flight::FlightMessageReader *flight_reader,
gboolean is_owner);
arrow::flight::FlightMessageReader *
gaflight_message_reader_get_raw(GAFlightMessageReader *reader);


GAFlightServerCallContext *
gaflight_server_call_context_new_raw(
const arrow::flight::ServerCallContext *flight_call_context);
Expand Down

0 comments on commit 9fea4ee

Please sign in to comment.