diff --git a/c_glib/arrow-flight-glib/client.cpp b/c_glib/arrow-flight-glib/client.cpp index 7097195f5f92b..65bc2d56a51f6 100644 --- a/c_glib/arrow-flight-glib/client.cpp +++ b/c_glib/arrow-flight-glib/client.cpp @@ -689,16 +689,14 @@ gaflight_client_do_get(GAFlightClient *client, if (options) { flight_options = gaflight_call_options_get_raw(options); } - std::unique_ptr flight_reader; auto result = flight_client->DoGet(*flight_options, *flight_ticket); - auto status = std::move(result).Value(&flight_reader); - if (garrow::check(error, - status, - "[flight-client][do-get]")) { - return gaflight_stream_reader_new_raw(flight_reader.release()); - } else { - return NULL; + if (!garrow::check(error, + result, + "[flight-client][do-get]")) { + return nullptr; } + auto flight_reader = std::move(*result); + return gaflight_stream_reader_new_raw(flight_reader.release(), TRUE); } @@ -707,11 +705,13 @@ G_END_DECLS GAFlightStreamReader * gaflight_stream_reader_new_raw( - arrow::flight::FlightStreamReader *flight_reader) + arrow::flight::FlightStreamReader *flight_reader, + gboolean is_owner) { return GAFLIGHT_STREAM_READER( g_object_new(GAFLIGHT_TYPE_STREAM_READER, "reader", flight_reader, + "is-owner", is_owner, NULL)); } diff --git a/c_glib/arrow-flight-glib/client.hpp b/c_glib/arrow-flight-glib/client.hpp index 28d15ef2c404f..b6b768b8d3197 100644 --- a/c_glib/arrow-flight-glib/client.hpp +++ b/c_glib/arrow-flight-glib/client.hpp @@ -25,7 +25,9 @@ GAFlightStreamReader * -gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader); +gaflight_stream_reader_new_raw( + arrow::flight::FlightStreamReader *flight_reader, + gboolean is_owner); arrow::flight::FlightCallOptions * gaflight_call_options_get_raw(GAFlightCallOptions *options); diff --git a/c_glib/arrow-flight-glib/common.cpp b/c_glib/arrow-flight-glib/common.cpp index 952f7ec6bc1fc..5aee3483032ae 100644 --- a/c_glib/arrow-flight-glib/common.cpp +++ b/c_glib/arrow-flight-glib/common.cpp @@ -1223,10 +1223,12 @@ gaflight_stream_chunk_get_metadata(GAFlightStreamChunk *chunk) typedef struct GAFlightRecordBatchReaderPrivate_ { arrow::flight::MetadataRecordBatchReader *reader; + bool is_owner; } GAFlightRecordBatchReaderPrivate; enum { PROP_READER = 1, + PROP_IS_OWNER, }; G_DEFINE_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader, @@ -1242,9 +1244,9 @@ static void gaflight_record_batch_reader_finalize(GObject *object) { auto priv = GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(object); - - delete priv->reader; - + if (priv->is_owner) { + delete priv->reader; + } G_OBJECT_CLASS(gaflight_info_parent_class)->finalize(object); } @@ -1262,6 +1264,9 @@ gaflight_record_batch_reader_set_property(GObject *object, static_cast( g_value_get_pointer(value)); break; + case PROP_IS_OWNER: + priv->is_owner = g_value_get_boolean(value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -1283,11 +1288,19 @@ gaflight_record_batch_reader_class_init(GAFlightRecordBatchReaderClass *klass) GParamSpec *spec; spec = g_param_spec_pointer("reader", - "Reader", - "The raw arrow::flight::MetadataRecordBatchReader *", + nullptr, + nullptr, static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_READER, spec); + + spec = g_param_spec_boolean("is-owner", + nullptr, + nullptr, + TRUE, + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_IS_OWNER, spec); } /** diff --git a/c_glib/arrow-flight-glib/server.cpp b/c_glib/arrow-flight-glib/server.cpp index 0df710df6f319..53ecbf6e08db5 100644 --- a/c_glib/arrow-flight-glib/server.cpp +++ b/c_glib/arrow-flight-glib/server.cpp @@ -41,6 +41,10 @@ G_BEGIN_DECLS * IPC payloads to be sent in `FlightData` protobuf messages by * #GArrowRecordBatchReader`. * + * #GAFlightMessageReader is a class for IPC payloads uploaded by a + * client. Also allows reading application-defined metadata via the + * Flight protocol. + * * #GAFlightServerAuthSender is a class for sending messages to the * client during an authentication handshake. * @@ -258,6 +262,37 @@ gaflight_record_batch_stream_new(GArrowRecordBatchReader *reader, } +G_DEFINE_TYPE(GAFlightMessageReader, + gaflight_message_reader, + GAFLIGHT_TYPE_RECORD_BATCH_READER) + +static void +gaflight_message_reader_init(GAFlightMessageReader *object) +{ +} + +static void +gaflight_message_reader_class_init(GAFlightMessageReaderClass *klass) +{ +} + +/** + * gaflight_message_reader_get_descriptor: + * @reader: A #GAFlightMessageReader. + * + * Returns: (transfer full): The descriptor for this upload. + * + * Since: 14.0.0 + */ +GAFlightDescriptor * +gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader) +{ + auto flight_reader = gaflight_message_reader_get_raw(reader); + const auto &flight_descriptor = flight_reader->descriptor(); + return gaflight_descriptor_new_raw(&flight_descriptor); +} + + typedef struct GAFlightServerCallContextPrivate_ { arrow::flight::ServerCallContext *call_context; } GAFlightServerCallContextPrivate; @@ -1215,6 +1250,28 @@ gaflight_data_stream_get_raw(GAFlightDataStream *stream) return priv->stream; } + +GAFlightMessageReader * +gaflight_message_reader_new_raw( + arrow::flight::FlightMessageReader *flight_reader, + gboolean is_owner) +{ + return GAFLIGHT_MESSAGE_READER( + g_object_new(GAFLIGHT_TYPE_MESSAGE_READER, + "reader", flight_reader, + "is-owner", is_owner, + NULL)); +} + +arrow::flight::FlightMessageReader * +gaflight_message_reader_get_raw(GAFlightMessageReader *reader) +{ + auto flight_reader = + gaflight_record_batch_reader_get_raw(GAFLIGHT_RECORD_BATCH_READER(reader)); + return static_cast(flight_reader); +} + + GAFlightServerCallContext * gaflight_server_call_context_new_raw( const arrow::flight::ServerCallContext *flight_call_context) diff --git a/c_glib/arrow-flight-glib/server.h b/c_glib/arrow-flight-glib/server.h index 3ad93d05d28c3..7fa0dcf878000 100644 --- a/c_glib/arrow-flight-glib/server.h +++ b/c_glib/arrow-flight-glib/server.h @@ -55,6 +55,23 @@ gaflight_record_batch_stream_new(GArrowRecordBatchReader *reader, GArrowWriteOptions *options); +#define GAFLIGHT_TYPE_MESSAGE_READER \ + (gaflight_message_reader_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightMessageReader, + gaflight_message_reader, + GAFLIGHT, + MESSAGE_READER, + GAFlightRecordBatchReader) +struct _GAFlightMessageReaderClass +{ + GAFlightRecordBatchReaderClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +GAFlightDescriptor * +gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader); + + #define GAFLIGHT_TYPE_SERVER_CALL_CONTEXT \ (gaflight_server_call_context_get_type()) G_DECLARE_DERIVABLE_TYPE(GAFlightServerCallContext, diff --git a/c_glib/arrow-flight-glib/server.hpp b/c_glib/arrow-flight-glib/server.hpp index 4cd9b4cf3444a..6b273dc9e5a8f 100644 --- a/c_glib/arrow-flight-glib/server.hpp +++ b/c_glib/arrow-flight-glib/server.hpp @@ -27,6 +27,14 @@ arrow::flight::FlightDataStream * gaflight_data_stream_get_raw(GAFlightDataStream *stream); +GAFlightMessageReader * +gaflight_message_reader_new_raw( + arrow::flight::FlightMessageReader *flight_reader, + gboolean is_owner); +arrow::flight::FlightMessageReader * +gaflight_message_reader_get_raw(GAFlightMessageReader *reader); + + GAFlightServerCallContext * gaflight_server_call_context_new_raw( const arrow::flight::ServerCallContext *flight_call_context); diff --git a/c_glib/arrow-flight-sql-glib/client.cpp b/c_glib/arrow-flight-sql-glib/client.cpp index f05319532c215..8e1cf11549710 100644 --- a/c_glib/arrow-flight-sql-glib/client.cpp +++ b/c_glib/arrow-flight-sql-glib/client.cpp @@ -32,11 +32,327 @@ G_BEGIN_DECLS * * #GAFlightSQLClient is a class for Apache Arrow Flight SQL client. * + * #GAFlightSQLPreparedStatement is a class for prepared statement. + * * Since: 9.0.0 */ +struct GAFlightSQLPreparedStatementPrivate { + std::shared_ptr statement; + GAFlightSQLClient *client; +}; + +enum { + PROP_STATEMENT = 1, + PROP_PREPARED_STATEMENT_CLIENT, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightSQLPreparedStatement, + gaflightsql_prepared_statement, + G_TYPE_OBJECT) + +#define GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object) \ + static_cast( \ + gaflightsql_prepared_statement_get_instance_private( \ + GAFLIGHTSQL_PREPARED_STATEMENT(object))) + +static void +gaflightsql_prepared_statement_dispose(GObject *object) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object); + + if (priv->client) { + g_object_unref(priv->client); + priv->client = nullptr; + } + + G_OBJECT_CLASS(gaflightsql_prepared_statement_parent_class)->dispose(object); +} + +static void +gaflightsql_prepared_statement_finalize(GObject *object) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object); + priv->statement.~shared_ptr(); + G_OBJECT_CLASS(gaflightsql_prepared_statement_parent_class)->finalize(object); +} + +static void +gaflightsql_prepared_statement_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_STATEMENT: + priv->statement = + *static_cast *>( + g_value_get_pointer(value)); + break; + case PROP_PREPARED_STATEMENT_CLIENT: + priv->client = GAFLIGHTSQL_CLIENT(g_value_dup_object(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflightsql_prepared_statement_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_PREPARED_STATEMENT_CLIENT: + g_value_set_object(value, priv->client); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflightsql_prepared_statement_init(GAFlightSQLPreparedStatement *object) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(object); + new(&priv->statement) std::shared_ptr; +} + +static void +gaflightsql_prepared_statement_class_init( + GAFlightSQLPreparedStatementClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = gaflightsql_prepared_statement_dispose; + gobject_class->finalize = gaflightsql_prepared_statement_finalize; + gobject_class->set_property = gaflightsql_prepared_statement_set_property; + gobject_class->get_property = gaflightsql_prepared_statement_get_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("statement", + nullptr, + nullptr, + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_STATEMENT, spec); + + /** + * GAFlightSQLPreparedStatement:client: + * + * The underlying Flight SQL client. + * + * Since: 14.0.0 + */ + spec = g_param_spec_object("client", + nullptr, + nullptr, + GAFLIGHTSQL_TYPE_CLIENT, + static_cast(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, + PROP_PREPARED_STATEMENT_CLIENT, + spec); +} + +/** + * gaflightsql_prepared_statement_execute: + * @statement: A #GAFlightSQLPreparedStatement. + * @options: (nullable): A #GAFlightCallOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable) (transfer full): The #GAFlightInfo describing + * where to access the dataset on success, %NULL on error. + * + * Since: 14.0.0 + */ +GAFlightInfo * +gaflightsql_prepared_statement_execute(GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + arrow::flight::FlightCallOptions flight_default_options; + auto flight_options = &flight_default_options; + if (options) { + flight_options = gaflight_call_options_get_raw(options); + } + auto result = flight_sql_statement->Execute(*flight_options); + if (!garrow::check(error, + result, + "[flight-sql-prepared-statement][execute]")) { + return nullptr; + } + auto flight_info = std::move(*result); + return gaflight_info_new_raw(flight_info.release()); +} + +/** + * gaflightsql_prepared_statement_execute_update: + * @statement: A #GAFlightSQLPreparedStatement. + * @options: (nullable): A #GAFlightCallOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: The number of changed records. + * + * Since: 14.0.0 + */ +gint64 +gaflightsql_prepared_statement_execute_update( + GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + arrow::flight::FlightCallOptions flight_default_options; + auto flight_options = &flight_default_options; + if (options) { + flight_options = gaflight_call_options_get_raw(options); + } + auto result = flight_sql_statement->ExecuteUpdate(*flight_options); + if (!garrow::check(error, + result, + "[flight-sql-prepared-statement][execute-update]")) { + return 0; + } + return *result; +} + +/** + * gaflightsql_prepared_statement_get_parameter_schema: + * @statement: A #GAFlightSQLPreparedStatement. + * + * Returns: (nullable) (transfer full): The #GArrowSchema for parameter. + * + * Since: 14.0.0 + */ +GArrowSchema * +gaflightsql_prepared_statement_get_parameter_schema( + GAFlightSQLPreparedStatement *statement) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + auto arrow_schema = flight_sql_statement->parameter_schema(); + return garrow_schema_new_raw(&arrow_schema); +} + +/** + * gaflightsql_prepared_statement_get_dataset_schema: + * @statement: A #GAFlightSQLPreparedStatement. + * + * Returns: (nullable) (transfer full): The #GArrowSchema for dataset. + * + * Since: 14.0.0 + */ +GArrowSchema * +gaflightsql_prepared_statement_get_dataset_schema( + GAFlightSQLPreparedStatement *statement) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + auto arrow_schema = flight_sql_statement->dataset_schema(); + return garrow_schema_new_raw(&arrow_schema); +} + +/** + * gaflightsql_prepared_statement_set_record_batch: + * @statement: A #GAFlightSQLPreparedStatement. + * @record_batch: A #GArrowRecordBatch that contains the parameters that + * will be bound. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE otherwise. + * + * Since: 14.0.0 + */ +gboolean +gaflightsql_prepared_statement_set_record_batch( + GAFlightSQLPreparedStatement *statement, + GArrowRecordBatch *record_batch, + GError **error) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + auto arrow_record_batch = garrow_record_batch_get_raw(record_batch); + return garrow::check(error, + flight_sql_statement->SetParameters(arrow_record_batch), + "[flight-sql-prepared-statement][set-record-batch]"); +} + +/** + * gaflightsql_prepared_statement_set_record_batch_reader: + * @statement: A #GAFlightSQLPreparedStatement. + * @reader: A #GArrowRecordBatchReader that contains the parameters that + * will be bound. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE otherwise. + * + * Since: 14.0.0 + */ +gboolean +gaflightsql_prepared_statement_set_record_batch_reader( + GAFlightSQLPreparedStatement *statement, + GArrowRecordBatchReader *reader, + GError **error) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + auto arrow_reader = garrow_record_batch_reader_get_raw(reader); + return garrow::check(error, + flight_sql_statement->SetParameters(arrow_reader), + "[flight-sql-prepared-statement][set-record-batch-reader]"); +} + +/** + * gaflightsql_prepared_statement_close: + * @statement: A #GAFlightSQLPreparedStatement. + * @options: (nullable): A #GAFlightCallOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE otherwise. + * + * After this, the prepared statement may not be used anymore. + * + * Since: 14.0.0 + */ +gboolean +gaflightsql_prepared_statement_close(GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + arrow::flight::FlightCallOptions flight_default_options; + auto flight_options = &flight_default_options; + if (options) { + flight_options = gaflight_call_options_get_raw(options); + } + return garrow::check(error, + flight_sql_statement->Close(*flight_options), + "[flight-sql-prepared-statement][close]"); +} + +/** + * gaflightsql_prepared_statement_is_closed: + * @statement: A #GAFlightSQLPreparedStatement. + * + * Returns: Whether the prepared statement is closed or not. + * + * Since: 14.0.0 + */ +gboolean +gaflightsql_prepared_statement_is_closed(GAFlightSQLPreparedStatement *statement) +{ + auto flight_sql_statement = gaflightsql_prepared_statement_get_raw(statement); + return flight_sql_statement->IsClosed(); +} + + struct GAFlightSQLClientPrivate { - arrow::flight::sql::FlightSqlClient* client; + arrow::flight::sql::FlightSqlClient *client; GAFlightClient *flight_client; }; @@ -273,20 +589,68 @@ gaflightsql_client_do_get(GAFlightSQLClient *client, return nullptr; } auto flight_reader = std::move(*result); - return gaflight_stream_reader_new_raw(flight_reader.release()); + return gaflight_stream_reader_new_raw(flight_reader.release(), TRUE); +} + +/** + * gaflightsql_client_prepare: + * @client: A #GAFlightSQLClient. + * @query: A query to be prepared in the UTF-8 format. + * @options: (nullable): A #GAFlightCallOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable) (transfer full): The #GAFlightSQLPreparedStatement + * on success, %NULL on error. + * + * Since: 14.0.0 + */ +GAFlightSQLPreparedStatement * +gaflightsql_client_prepare(GAFlightSQLClient *client, + const gchar *query, + GAFlightCallOptions *options, + GError **error) +{ + auto flight_sql_client = gaflightsql_client_get_raw(client); + arrow::flight::FlightCallOptions flight_default_options; + auto flight_options = &flight_default_options; + if (options) { + flight_options = gaflight_call_options_get_raw(options); + } + auto result = flight_sql_client->Prepare(*flight_options, query); + if (!garrow::check(error, + result, + "[flight-sql-client][prepare]")) { + return nullptr; + } + auto flight_sql_statement = std::move(*result); + return gaflightsql_prepared_statement_new_raw(&flight_sql_statement, + client); } G_END_DECLS -arrow::flight::sql::FlightSqlClient * -gaflightsql_client_get_raw(GAFlightSQLClient *client) +GAFlightSQLPreparedStatement * +gaflightsql_prepared_statement_new_raw( + std::shared_ptr *flight_sql_statement, + GAFlightSQLClient *client) { - auto priv = GAFLIGHTSQL_CLIENT_GET_PRIVATE(client); - return priv->client; + return GAFLIGHTSQL_PREPARED_STATEMENT( + g_object_new(GAFLIGHTSQL_TYPE_PREPARED_STATEMENT, + "statement", flight_sql_statement, + "client", client, + nullptr)); +} + +std::shared_ptr +gaflightsql_prepared_statement_get_raw(GAFlightSQLPreparedStatement *statement) +{ + auto priv = GAFLIGHTSQL_PREPARED_STATEMENT_GET_PRIVATE(statement); + return priv->statement; } + GAFlightSQLClient * gaflightsql_client_new_raw( arrow::flight::sql::FlightSqlClient *flight_sql_client, @@ -298,3 +662,10 @@ gaflightsql_client_new_raw( "flight_client", client, nullptr)); } + +arrow::flight::sql::FlightSqlClient * +gaflightsql_client_get_raw(GAFlightSQLClient *client) +{ + auto priv = GAFLIGHTSQL_CLIENT_GET_PRIVATE(client); + return priv->client; +} diff --git a/c_glib/arrow-flight-sql-glib/client.h b/c_glib/arrow-flight-sql-glib/client.h index 6374fece2209a..f2a025fef099b 100644 --- a/c_glib/arrow-flight-sql-glib/client.h +++ b/c_glib/arrow-flight-sql-glib/client.h @@ -24,6 +24,69 @@ G_BEGIN_DECLS +#define GAFLIGHTSQL_TYPE_PREPARED_STATEMENT \ + (gaflightsql_prepared_statement_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightSQLPreparedStatement, + gaflightsql_prepared_statement, + GAFLIGHTSQL, + PREPARED_STATEMENT, + GObject) +struct _GAFlightSQLPreparedStatementClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +GAFlightInfo * +gaflightsql_prepared_statement_execute( + GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error); + +GARROW_AVAILABLE_IN_14_0 +gint64 +gaflightsql_prepared_statement_execute_update( + GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error); + +GARROW_AVAILABLE_IN_14_0 +GArrowSchema * +gaflightsql_prepared_statement_get_parameter_schema( + GAFlightSQLPreparedStatement *statement); + +GARROW_AVAILABLE_IN_14_0 +GArrowSchema * +gaflightsql_prepared_statement_get_dataset_schema( + GAFlightSQLPreparedStatement *statement); + +GARROW_AVAILABLE_IN_14_0 +gboolean +gaflightsql_prepared_statement_set_record_batch( + GAFlightSQLPreparedStatement *statement, + GArrowRecordBatch *record_batch, + GError **error); + +GARROW_AVAILABLE_IN_14_0 +gboolean +gaflightsql_prepared_statement_set_record_batch_reader( + GAFlightSQLPreparedStatement *statement, + GArrowRecordBatchReader *reader, + GError **error); + +GARROW_AVAILABLE_IN_14_0 +gboolean +gaflightsql_prepared_statement_close( + GAFlightSQLPreparedStatement *statement, + GAFlightCallOptions *options, + GError **error); + +GARROW_AVAILABLE_IN_14_0 +gboolean +gaflightsql_prepared_statement_is_closed( + GAFlightSQLPreparedStatement *statement); + + #define GAFLIGHTSQL_TYPE_CLIENT (gaflightsql_client_get_type()) G_DECLARE_DERIVABLE_TYPE(GAFlightSQLClient, gaflightsql_client, @@ -60,5 +123,12 @@ gaflightsql_client_do_get(GAFlightSQLClient *client, GAFlightCallOptions *options, GError **error); +GARROW_AVAILABLE_IN_14_0 +GAFlightSQLPreparedStatement * +gaflightsql_client_prepare(GAFlightSQLClient *client, + const gchar *query, + GAFlightCallOptions *options, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-flight-sql-glib/client.hpp b/c_glib/arrow-flight-sql-glib/client.hpp index d9c04e51088eb..09136f8819d52 100644 --- a/c_glib/arrow-flight-sql-glib/client.hpp +++ b/c_glib/arrow-flight-sql-glib/client.hpp @@ -24,9 +24,16 @@ #include -arrow::flight::sql::FlightSqlClient * -gaflightsql_client_get_raw(GAFlightSQLClient *client); +GAFlightSQLPreparedStatement * +gaflightsql_prepared_statement_new_raw( + std::shared_ptr *flight_sql_statement, + GAFlightSQLClient *client); +std::shared_ptr +gaflightsql_prepared_statement_get_raw(GAFlightSQLPreparedStatement *statement); + GAFlightSQLClient * gaflightsql_client_new_raw( arrow::flight::sql::FlightSqlClient *flight_sql_client, GAFlightClient *client); +arrow::flight::sql::FlightSqlClient * +gaflightsql_client_get_raw(GAFlightSQLClient *client); diff --git a/c_glib/arrow-flight-sql-glib/server.cpp b/c_glib/arrow-flight-sql-glib/server.cpp index 750dff2232c20..518b771732a0f 100644 --- a/c_glib/arrow-flight-sql-glib/server.cpp +++ b/c_glib/arrow-flight-sql-glib/server.cpp @@ -33,6 +33,18 @@ G_BEGIN_DECLS * @title: Server related classes * @include: arrow-flight-sql-glib/arrow-flight-sql-glib.h * + * #GAFlightSQLPreparedStatementUpdate is a class for a request + * that executes an update SQL prepared statement. + * + * #GAFlightSQLCreatePreparedStatementRequest is a class for a request + * that creates a SQL prepared statement. + * + * #GAFlightSQLCreatePreparedStatementResult is a class for a result + * of the request that creates a SQL prepared statement. + * + * #GAFlightSQLClosePreparedStatementRequest is a class for a request + * that closes a SQL prepared statement. + * * #GAFlightSQLServer is a class to develop an Apache Arrow Flight SQL * server. * @@ -156,6 +168,41 @@ gaflightsql_statement_update_get_query(GAFlightSQLStatementUpdate *command) } +G_DEFINE_TYPE(GAFlightSQLPreparedStatementUpdate, + gaflightsql_prepared_statement_update, + GAFLIGHTSQL_TYPE_COMMAND) + +static void +gaflightsql_prepared_statement_update_init( + GAFlightSQLPreparedStatementUpdate *object) +{ +} + +static void +gaflightsql_prepared_statement_update_class_init( + GAFlightSQLPreparedStatementUpdateClass *klass) +{ +} + +/** + * gaflightsql_prepared_statement_update_get_handle: + * @command: A #GAFlightSQLPreparedStatementUpdate. + * + * Returns: (transfer full): The server-generated opaque identifier + * for the statement. + * + * Since: 14.0.0 + */ +GBytes * +gaflightsql_prepared_statement_update_get_handle( + GAFlightSQLPreparedStatementUpdate *command) +{ + auto update = gaflightsql_prepared_statement_update_get_raw(command); + return g_bytes_new_static(update->prepared_statement_handle.data(), + update->prepared_statement_handle.size()); +} + + G_DEFINE_TYPE(GAFlightSQLStatementQueryTicket, gaflightsql_statement_query_ticket, GAFLIGHTSQL_TYPE_COMMAND) @@ -221,6 +268,341 @@ gaflightsql_statement_query_ticket_get_handle( } +struct GAFlightSQLCreatePreparedStatementRequestPrivate { + arrow::flight::sql::ActionCreatePreparedStatementRequest *request; +}; + +enum { + PROP_REQUEST = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightSQLCreatePreparedStatementRequest, + gaflightsql_create_prepared_statement_request, + G_TYPE_OBJECT) + +#define GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(object) \ + static_cast( \ + gaflightsql_create_prepared_statement_request_get_instance_private( \ + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST(object))) + +static void +gaflightsql_create_prepared_statement_request_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_REQUEST: + priv->request = + static_cast( + g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflightsql_create_prepared_statement_request_init( + GAFlightSQLCreatePreparedStatementRequest *object) +{ +} + +static void +gaflightsql_create_prepared_statement_request_class_init( + GAFlightSQLCreatePreparedStatementRequestClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->set_property = + gaflightsql_create_prepared_statement_request_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("request", + nullptr, + nullptr, + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_REQUEST, spec); +} + +/** + * gaflightsql_create_prepared_statement_request_get_query: + * @request: A #GAFlightSQLCreatePreparedStatementRequest. + * + * Returns: The SQL query to be prepared. + * + * Since: 14.0.0 + */ +const gchar * +gaflightsql_create_prepared_statement_request_get_query( + GAFlightSQLCreatePreparedStatementRequest *request) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(request); + return priv->request->query.c_str(); +} + +/** + * gaflightsql_create_prepared_statement_request_get_transaction_id: + * @request: A #GAFlightSQLCreatePreparedStatementRequest. + * + * Returns: The transaction ID, if specified (else a blank string). + * + * Since: 14.0.0 + */ +const gchar * +gaflightsql_create_prepared_statement_request_get_transaction_id( + GAFlightSQLCreatePreparedStatementRequest *request) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(request); + return priv->request->transaction_id.c_str(); +} + + +struct GAFlightSQLCreatePreparedStatementResultPrivate { + arrow::flight::sql::ActionCreatePreparedStatementResult result; +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightSQLCreatePreparedStatementResult, + gaflightsql_create_prepared_statement_result, + G_TYPE_OBJECT) + +#define GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(object) \ + static_cast( \ + gaflightsql_create_prepared_statement_result_get_instance_private( \ + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT(object))) + +static void +gaflightsql_create_prepared_statement_result_finalize(GObject *object) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(object); + priv->result.~ActionCreatePreparedStatementResult(); + G_OBJECT_CLASS(gaflightsql_create_prepared_statement_result_parent_class)->finalize(object); +} + +static void +gaflightsql_create_prepared_statement_result_init( + GAFlightSQLCreatePreparedStatementResult *object) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(object); + new(&(priv->result)) arrow::flight::sql::ActionCreatePreparedStatementResult(); +} + +static void +gaflightsql_create_prepared_statement_result_class_init( + GAFlightSQLCreatePreparedStatementResultClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->finalize = + gaflightsql_create_prepared_statement_result_finalize; +} + +/** + * gaflightsql_create_prepared_statement_result_new: + * + * Returns:: The newly created #GAFlightSQLCreatePreparedStatementResult. + * + * Since: 14.0.0 + */ +GAFlightSQLCreatePreparedStatementResult * +gaflightsql_create_prepared_statement_result_new(void) +{ + return GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT( + g_object_new(GAFLIGHTSQL_TYPE_CREATE_PREPARED_STATEMENT_RESULT, + nullptr)); +} + +/** + * gaflightsql_create_prepared_statement_result_set_dataset_schema: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * @schema: A #GArrowSchema of dataset. + * + * Since: 14.0.0 + */ +void +gaflightsql_create_prepared_statement_result_set_dataset_schema( + GAFlightSQLCreatePreparedStatementResult *result, + GArrowSchema *schema) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + priv->result.dataset_schema = garrow_schema_get_raw(schema); +} + +/** + * gaflightsql_create_prepared_statement_result_get_dataset_schema: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * + * Returns: (nullable) (transfer full): The current dataset schema. + * + * Since: 14.0.0 + */ +GArrowSchema * +gaflightsql_create_prepared_statement_result_get_dataset_schema( + GAFlightSQLCreatePreparedStatementResult *result) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + if (!priv->result.dataset_schema) { + return nullptr; + } + return garrow_schema_new_raw(&(priv->result.dataset_schema)); +} + +/** + * gaflightsql_create_prepared_statement_result_set_parameter_schema: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * @schema: A #GArrowSchema of parameter. + * + * Since: 14.0.0 + */ +void +gaflightsql_create_prepared_statement_result_set_parameter_schema( + GAFlightSQLCreatePreparedStatementResult *result, + GArrowSchema *schema) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + priv->result.parameter_schema = garrow_schema_get_raw(schema); +} + +/** + * gaflightsql_create_prepared_statement_result_get_parameter_schema: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * + * Returns: (nullable) (transfer full): The current parameter schema. + * + * Since: 14.0.0 + */ +GArrowSchema * +gaflightsql_create_prepared_statement_result_get_parameter_schema( + GAFlightSQLCreatePreparedStatementResult *result) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + if (!priv->result.parameter_schema) { + return nullptr; + } + return garrow_schema_new_raw(&(priv->result.parameter_schema)); +} + +/** + * gaflightsql_create_prepared_statement_result_set_handle: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * @handle: A #GBytes for server-generated opaque identifier. + * + * Since: 14.0.0 + */ +void +gaflightsql_create_prepared_statement_result_set_handle( + GAFlightSQLCreatePreparedStatementResult *result, + GBytes *handle) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + size_t handle_size; + auto handle_data = g_bytes_get_data(handle, &handle_size); + priv->result.prepared_statement_handle = + std::string(static_cast(handle_data), handle_size); +} + +/** + * gaflightsql_create_prepared_statement_result_get_handle: + * @result: A #GAFlightSQLCreatePreparedStatementResult. + * + * Returns: (transfer full): The current server-generated opaque + * identifier. + * + * Since: 14.0.0 + */ +GBytes * +gaflightsql_create_prepared_statement_result_get_handle( + GAFlightSQLCreatePreparedStatementResult *result) +{ + auto priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(result); + return g_bytes_new_static(priv->result.prepared_statement_handle.data(), + priv->result.prepared_statement_handle.length()); +} + + +struct GAFlightSQLClosePreparedStatementRequestPrivate { + arrow::flight::sql::ActionClosePreparedStatementRequest *request; +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightSQLClosePreparedStatementRequest, + gaflightsql_close_prepared_statement_request, + G_TYPE_OBJECT) + +#define GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(object) \ + static_cast( \ + gaflightsql_close_prepared_statement_request_get_instance_private( \ + GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST(object))) + +static void +gaflightsql_close_prepared_statement_request_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_REQUEST: + priv->request = + static_cast( + g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflightsql_close_prepared_statement_request_init( + GAFlightSQLClosePreparedStatementRequest *object) +{ +} + +static void +gaflightsql_close_prepared_statement_request_class_init( + GAFlightSQLClosePreparedStatementRequestClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->set_property = + gaflightsql_close_prepared_statement_request_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("request", + nullptr, + nullptr, + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_REQUEST, spec); +} + +/** + * gaflightsql_close_prepared_statement_request_get_handle: + * @request: A #GAFlightSQLClosePreparedStatementRequest. + * + * Returns: (transfer full): The server-generated opaque identifier + * for the statement. + * + * Since: 14.0.0 + */ +GBytes * +gaflightsql_close_prepared_statement_request_get_handle( + GAFlightSQLClosePreparedStatementRequest *request) +{ + auto priv = GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(request); + return g_bytes_new_static(priv->request->prepared_statement_handle.data(), + priv->request->prepared_statement_handle.length()); +} + + G_END_DECLS namespace gaflightsql { class Server : public arrow::flight::sql::FlightSqlServerBase { @@ -283,7 +665,7 @@ namespace gaflightsql { arrow::Result DoPutCommandStatementUpdate( const arrow::flight::ServerCallContext &context, - const arrow::flight::sql::StatementUpdate& command) override { + const arrow::flight::sql::StatementUpdate &command) override { auto gacontext = gaflight_server_call_context_new_raw(&context); auto gacommand = gaflightsql_statement_update_new_raw(&command); GError *gerror = nullptr; @@ -303,6 +685,83 @@ namespace gaflightsql { return n_changed_records; } + arrow::Result + DoPutPreparedStatementUpdate( + const arrow::flight::ServerCallContext &context, + const arrow::flight::sql::PreparedStatementUpdate &command, + arrow::flight::FlightMessageReader *reader) override { + auto gacontext = gaflight_server_call_context_new_raw(&context); + auto gacommand = gaflightsql_prepared_statement_update_new_raw(&command); + auto gareader = gaflight_message_reader_new_raw(reader, FALSE); + GError *gerror = nullptr; + auto n_changed_records = + gaflightsql_server_do_put_prepared_statement_update(gaserver_, + gacontext, + gacommand, + gareader, + &gerror); + g_object_unref(gareader); + g_object_unref(gacommand); + g_object_unref(gacontext); + if (gerror) { + return garrow_error_to_status( + gerror, + arrow::StatusCode::UnknownError, + "[flight-sql-server][do-put-prepared-statement-update]"); + } + return n_changed_records; + } + + arrow::Result + CreatePreparedStatement( + const arrow::flight::ServerCallContext &context, + const arrow::flight::sql::ActionCreatePreparedStatementRequest &request) override { + auto gacontext = gaflight_server_call_context_new_raw(&context); + auto garequest = gaflightsql_create_prepared_statement_request_new_raw(&request); + GError *gerror = nullptr; + auto garesult = + gaflightsql_server_create_prepared_statement(gaserver_, + gacontext, + garequest, + &gerror); + g_object_unref(garequest); + g_object_unref(gacontext); + if (gerror) { + return garrow_error_to_status( + gerror, + arrow::StatusCode::UnknownError, + "[flight-sql-server][create-prepared-statement]"); + } + auto garesult_priv = + GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_RESULT_GET_PRIVATE(garesult); + auto flightsql_result = garesult_priv->result; + g_object_unref(garesult); + return flightsql_result; + } + + arrow::Status + ClosePreparedStatement( + const arrow::flight::ServerCallContext &context, + const arrow::flight::sql::ActionClosePreparedStatementRequest &request) override { + auto gacontext = gaflight_server_call_context_new_raw(&context); + auto garequest = gaflightsql_close_prepared_statement_request_new_raw(&request); + GError *gerror = nullptr; + gaflightsql_server_close_prepared_statement(gaserver_, + gacontext, + garequest, + &gerror); + g_object_unref(garequest); + g_object_unref(gacontext); + if (gerror) { + return garrow_error_to_status( + gerror, + arrow::StatusCode::UnknownError, + "[flight-sql-server][close-prepared-statement]"); + } else { + return arrow::Status::OK(); + } + } + private: GAFlightSQLServer *gaserver_; }; @@ -351,7 +810,6 @@ gaflightsql_server_finalize(GObject *object) { auto priv = GAFLIGHTSQL_SERVER_GET_PRIVATE(object); priv->server.~Server(); - G_OBJECT_CLASS(gaflightsql_server_parent_class)->finalize(object); } @@ -463,6 +921,97 @@ gaflightsql_server_do_put_command_statement_update( return klass->do_put_command_statement_update(server, context, command, error); } +/** + * gaflightsql_server_do_put_prepared_statement_update: + * @server: A #GAFlightServer. + * @context: A #GAFlightServerCallContext. + * @command: A #GAFlightSQLPreparedStatementUpdate. + * @reader: A #GAFlightMessageReader that reads uploaded record batches. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: The number of changed records. + * + * Since: 14.0.0 + */ +gint64 +gaflightsql_server_do_put_prepared_statement_update( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLPreparedStatementUpdate *command, + GAFlightMessageReader *reader, + GError **error) +{ + auto klass = GAFLIGHTSQL_SERVER_GET_CLASS(server); + if (!(klass && klass->do_put_prepared_statement_update)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "not implemented"); + return 0; + } + return klass->do_put_prepared_statement_update( + server, context, command, reader, error); +} + +/** + * gaflightsql_server_create_prepared_statement: + * @server: A #GAFlightServer. + * @context: A #GAFlightServerCallContext. + * @request: A #GAFlightSQLCreatePreparedStatementRequest. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable) (transfer full): A + * #GAFlightSQLCreatePreparedStatementResult containing the dataset + * and parameter schemas and a handle for created statement on + * success, %NULL on error. + * + * Since: 14.0.0 + */ +GAFlightSQLCreatePreparedStatementResult * +gaflightsql_server_create_prepared_statement( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLCreatePreparedStatementRequest *request, + GError **error) +{ + auto klass = GAFLIGHTSQL_SERVER_GET_CLASS(server); + if (!(klass && klass->create_prepared_statement)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "not implemented"); + return nullptr; + } + return klass->create_prepared_statement(server, context, request, error); +} + +/** + * gaflightsql_server_close_prepared_statement: + * @server: A #GAFlightServer. + * @context: A #GAFlightServerCallContext. + * @request: A #GAFlightSQLClosePreparedStatementRequest. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Since: 14.0.0 + */ +void +gaflightsql_server_close_prepared_statement( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLClosePreparedStatementRequest *request, + GError **error) +{ + auto klass = GAFLIGHTSQL_SERVER_GET_CLASS(server); + if (!(klass && klass->close_prepared_statement)) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_NOT_IMPLEMENTED, + "not implemented"); + return; + } + return klass->close_prepared_statement(server, context, request, error); +} + G_END_DECLS @@ -503,6 +1052,26 @@ gaflightsql_statement_update_get_raw(GAFlightSQLStatementUpdate *command) } +GAFlightSQLPreparedStatementUpdate * +gaflightsql_prepared_statement_update_new_raw( + const arrow::flight::sql::PreparedStatementUpdate *flight_command) +{ + return GAFLIGHTSQL_PREPARED_STATEMENT_UPDATE( + g_object_new(GAFLIGHTSQL_TYPE_PREPARED_STATEMENT_UPDATE, + "command", flight_command, + nullptr)); +} + +const arrow::flight::sql::PreparedStatementUpdate * +gaflightsql_prepared_statement_update_get_raw( + GAFlightSQLPreparedStatementUpdate *command) +{ + auto priv = GAFLIGHTSQL_COMMAND_GET_PRIVATE(command); + return static_cast( + priv->command); +} + + GAFlightSQLStatementQueryTicket * gaflightsql_statement_query_ticket_new_raw( const arrow::flight::sql::StatementQueryTicket *flight_command) @@ -521,3 +1090,41 @@ gaflightsql_statement_query_ticket_get_raw( return static_cast( priv->command); } + + +GAFlightSQLCreatePreparedStatementRequest * +gaflightsql_create_prepared_statement_request_new_raw( + const arrow::flight::sql::ActionCreatePreparedStatementRequest *flight_request) +{ + return GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST( + g_object_new(GAFLIGHTSQL_TYPE_CREATE_PREPARED_STATEMENT_REQUEST, + "request", flight_request, + nullptr)); +} + +const arrow::flight::sql::ActionCreatePreparedStatementRequest * +gaflightsql_create_prepared_statement_request_get_raw( + GAFlightSQLCreatePreparedStatementRequest *request) +{ + auto priv = GAFLIGHTSQL_CREATE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(request); + return priv->request; +} + + +GAFlightSQLClosePreparedStatementRequest * +gaflightsql_close_prepared_statement_request_new_raw( + const arrow::flight::sql::ActionClosePreparedStatementRequest *flight_request) +{ + return GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST( + g_object_new(GAFLIGHTSQL_TYPE_CLOSE_PREPARED_STATEMENT_REQUEST, + "request", flight_request, + nullptr)); +} + +const arrow::flight::sql::ActionClosePreparedStatementRequest * +gaflightsql_close_prepared_statement_request_get_raw( + GAFlightSQLClosePreparedStatementRequest *request) +{ + auto priv = GAFLIGHTSQL_CLOSE_PREPARED_STATEMENT_REQUEST_GET_PRIVATE(request); + return priv->request; +} diff --git a/c_glib/arrow-flight-sql-glib/server.h b/c_glib/arrow-flight-sql-glib/server.h index 60e5b300d4e84..106b6e40db38f 100644 --- a/c_glib/arrow-flight-sql-glib/server.h +++ b/c_glib/arrow-flight-sql-glib/server.h @@ -68,6 +68,24 @@ const gchar * gaflightsql_statement_update_get_query(GAFlightSQLStatementUpdate *command); +#define GAFLIGHTSQL_TYPE_PREPARED_STATEMENT_UPDATE \ + (gaflightsql_prepared_statement_update_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightSQLPreparedStatementUpdate, + gaflightsql_prepared_statement_update, + GAFLIGHTSQL, + PREPARED_STATEMENT_UPDATE, + GAFlightSQLCommand) +struct _GAFlightSQLPreparedStatementUpdateClass +{ + GAFlightSQLCommandClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +GBytes * +gaflightsql_prepared_statement_update_get_handle( + GAFlightSQLPreparedStatementUpdate *command); + + #define GAFLIGHTSQL_TYPE_STATEMENT_QUERY_TICKET \ (gaflightsql_statement_query_ticket_get_type()) G_DECLARE_DERIVABLE_TYPE(GAFlightSQLStatementQueryTicket, @@ -90,6 +108,91 @@ gaflightsql_statement_query_ticket_get_handle( GAFlightSQLStatementQueryTicket *command); +#define GAFLIGHTSQL_TYPE_CREATE_PREPARED_STATEMENT_REQUEST \ + (gaflightsql_create_prepared_statement_request_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightSQLCreatePreparedStatementRequest, + gaflightsql_create_prepared_statement_request, + GAFLIGHTSQL, + CREATE_PREPARED_STATEMENT_REQUEST, + GObject) +struct _GAFlightSQLCreatePreparedStatementRequestClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +const gchar * +gaflightsql_create_prepared_statement_request_get_query( + GAFlightSQLCreatePreparedStatementRequest *request); + +GARROW_AVAILABLE_IN_14_0 +const gchar * +gaflightsql_create_prepared_statement_request_get_transaction_id( + GAFlightSQLCreatePreparedStatementRequest *request); + + +#define GAFLIGHTSQL_TYPE_CREATE_PREPARED_STATEMENT_RESULT \ + (gaflightsql_create_prepared_statement_result_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightSQLCreatePreparedStatementResult, + gaflightsql_create_prepared_statement_result, + GAFLIGHTSQL, + CREATE_PREPARED_STATEMENT_RESULT, + GObject) +struct _GAFlightSQLCreatePreparedStatementResultClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +GAFlightSQLCreatePreparedStatementResult * +gaflightsql_create_prepared_statement_result_new(void); +GARROW_AVAILABLE_IN_14_0 +void +gaflightsql_create_prepared_statement_result_set_dataset_schema( + GAFlightSQLCreatePreparedStatementResult *result, + GArrowSchema *schema); +GARROW_AVAILABLE_IN_14_0 +GArrowSchema * +gaflightsql_create_prepared_statement_result_get_dataset_schema( + GAFlightSQLCreatePreparedStatementResult *result); +GARROW_AVAILABLE_IN_14_0 +void +gaflightsql_create_prepared_statement_result_set_parameter_schema( + GAFlightSQLCreatePreparedStatementResult *result, + GArrowSchema *schema); +GARROW_AVAILABLE_IN_14_0 +GArrowSchema * +gaflightsql_create_prepared_statement_result_get_parameter_schema( + GAFlightSQLCreatePreparedStatementResult *result); +GARROW_AVAILABLE_IN_14_0 +void +gaflightsql_create_prepared_statement_result_set_handle( + GAFlightSQLCreatePreparedStatementResult *result, + GBytes *handle); +GARROW_AVAILABLE_IN_14_0 +GBytes * +gaflightsql_create_prepared_statement_result_get_handle( + GAFlightSQLCreatePreparedStatementResult *result); + + +#define GAFLIGHTSQL_TYPE_CLOSE_PREPARED_STATEMENT_REQUEST \ + (gaflightsql_close_prepared_statement_request_get_type()) +G_DECLARE_DERIVABLE_TYPE(GAFlightSQLClosePreparedStatementRequest, + gaflightsql_close_prepared_statement_request, + GAFLIGHTSQL, + CLOSE_PREPARED_STATEMENT_REQUEST, + GObject) +struct _GAFlightSQLClosePreparedStatementRequestClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_14_0 +GBytes * +gaflightsql_close_prepared_statement_request_get_handle( + GAFlightSQLClosePreparedStatementRequest *request); + + #define GAFLIGHTSQL_TYPE_SERVER (gaflightsql_server_get_type()) G_DECLARE_DERIVABLE_TYPE(GAFlightSQLServer, gaflightsql_server, @@ -105,6 +208,13 @@ G_DECLARE_DERIVABLE_TYPE(GAFlightSQLServer, * that gets a #GAFlightDataStream containing the query results. * @do_put_command_statement_update: A virtual function to implement * `DoPutCommandStatementUpdate` API that executes an update SQL statement. + * @do_put_prepared_statement_update: A virtual function to implement + * `DoPutPreparedStatementUpdate` API that executes an update prepared + * statement. + * @create_prepared_statement: A virtual function to implement + * `CreatePreparedStatement` API that creates a prepared statement + * @close_prepared_statement: A virtual function to implement + * `ClosePreparedStatement` API that closes a prepared statement. * * Since: 9.0.0 */ @@ -128,6 +238,22 @@ struct _GAFlightSQLServerClass GAFlightServerCallContext *context, GAFlightSQLStatementUpdate *command, GError **error); + gint64 (*do_put_prepared_statement_update)( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLPreparedStatementUpdate *command, + GAFlightMessageReader *reader, + GError **error); + GAFlightSQLCreatePreparedStatementResult *(*create_prepared_statement)( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLCreatePreparedStatementRequest *request, + GError **error); + void (*close_prepared_statement)( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLClosePreparedStatementRequest *request, + GError **error); }; GARROW_AVAILABLE_IN_9_0 @@ -152,5 +278,30 @@ gaflightsql_server_do_put_command_statement_update( GAFlightServerCallContext *context, GAFlightSQLStatementUpdate *command, GError **error); +/* We can restore this after we bump version to 14.0.0-SNAPSHOT. */ +/* GARROW_AVAILABLE_IN_14_0 */ +gint64 +gaflightsql_server_do_put_prepared_statement_update( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLPreparedStatementUpdate *command, + GAFlightMessageReader *reader, + GError **error); +/* We can restore this after we bump version to 14.0.0-SNAPSHOT. */ +/* GARROW_AVAILABLE_IN_14_0 */ +GAFlightSQLCreatePreparedStatementResult * +gaflightsql_server_create_prepared_statement( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLCreatePreparedStatementRequest *request, + GError **error); +/* We can restore this after we bump version to 14.0.0-SNAPSHOT. */ +/* GARROW_AVAILABLE_IN_14_0 */ +void +gaflightsql_server_close_prepared_statement( + GAFlightSQLServer *server, + GAFlightServerCallContext *context, + GAFlightSQLClosePreparedStatementRequest *request, + GError **error); G_END_DECLS diff --git a/c_glib/arrow-flight-sql-glib/server.hpp b/c_glib/arrow-flight-sql-glib/server.hpp index 9159a6648934c..bdecf054a4d61 100644 --- a/c_glib/arrow-flight-sql-glib/server.hpp +++ b/c_glib/arrow-flight-sql-glib/server.hpp @@ -38,9 +38,30 @@ const arrow::flight::sql::StatementUpdate * gaflightsql_statement_update_get_raw( GAFlightSQLStatementUpdate *command); +GAFlightSQLPreparedStatementUpdate * +gaflightsql_prepared_statement_update_new_raw( + const arrow::flight::sql::PreparedStatementUpdate *flight_command); +const arrow::flight::sql::PreparedStatementUpdate * +gaflightsql_prepared_statement_update_get_raw( + GAFlightSQLPreparedStatementUpdate *command); + GAFlightSQLStatementQueryTicket * gaflightsql_statement_query_ticket_new_raw( const arrow::flight::sql::StatementQueryTicket *flight_command); const arrow::flight::sql::StatementQueryTicket * gaflightsql_statement_query_ticket_get_raw( GAFlightSQLStatementQueryTicket *command); + +GAFlightSQLCreatePreparedStatementRequest * +gaflightsql_create_prepared_statement_request_new_raw( + const arrow::flight::sql::ActionCreatePreparedStatementRequest *flight_request); +const arrow::flight::sql::ActionCreatePreparedStatementRequest * +gaflightsql_create_prepared_statement_request_get_raw( + GAFlightSQLCreatePreparedStatementRequest *request); + +GAFlightSQLClosePreparedStatementRequest * +gaflightsql_close_prepared_statement_request_new_raw( + const arrow::flight::sql::ActionClosePreparedStatementRequest *flight_request); +const arrow::flight::sql::ActionClosePreparedStatementRequest * +gaflightsql_close_prepared_statement_request_get_raw( + GAFlightSQLClosePreparedStatementRequest *request); diff --git a/c_glib/arrow-glib/version.h.in b/c_glib/arrow-glib/version.h.in index 1d43271a8b607..60c02936193bc 100644 --- a/c_glib/arrow-glib/version.h.in +++ b/c_glib/arrow-glib/version.h.in @@ -110,6 +110,15 @@ # define GARROW_UNAVAILABLE(major, minor) G_UNAVAILABLE(major, minor) #endif +/** + * GARROW_VERSION_14_0: + * + * You can use this macro value for compile time API version check. + * + * Since: 14.0.0 + */ +#define GARROW_VERSION_14_0 G_ENCODE_VERSION(14, 0) + /** * GARROW_VERSION_13_0: * @@ -337,6 +346,20 @@ #define GARROW_AVAILABLE_IN_ALL +#if GARROW_VERSION_MIN_REQUIRED >= GARROW_VERSION_14_0 +# define GARROW_DEPRECATED_IN_14_0 GARROW_DEPRECATED +# define GARROW_DEPRECATED_IN_14_0_FOR(function) GARROW_DEPRECATED_FOR(function) +#else +# define GARROW_DEPRECATED_IN_14_0 +# define GARROW_DEPRECATED_IN_14_0_FOR(function) +#endif + +#if GARROW_VERSION_MAX_ALLOWED < GARROW_VERSION_14_0 +# define GARROW_AVAILABLE_IN_14_0 GARROW_UNAVAILABLE(14, 0) +#else +# define GARROW_AVAILABLE_IN_14_0 +#endif + #if GARROW_VERSION_MIN_REQUIRED >= GARROW_VERSION_13_0 # define GARROW_DEPRECATED_IN_13_0 GARROW_DEPRECATED # define GARROW_DEPRECATED_IN_13_0_FOR(function) GARROW_DEPRECATED_FOR(function) diff --git a/c_glib/doc/arrow-flight-sql-glib/arrow-flight-sql-glib-docs.xml b/c_glib/doc/arrow-flight-sql-glib/arrow-flight-sql-glib-docs.xml index be8c003ea2dba..f87d657461140 100644 --- a/c_glib/doc/arrow-flight-sql-glib/arrow-flight-sql-glib-docs.xml +++ b/c_glib/doc/arrow-flight-sql-glib/arrow-flight-sql-glib-docs.xml @@ -54,6 +54,14 @@ Index of deprecated API + + Index of new symbols in 14.0.0 + + + + Index of new symbols in 13.0.0 + + Index of new symbols in 9.0.0 diff --git a/c_glib/test/flight-sql/test-client.rb b/c_glib/test/flight-sql/test-client.rb index adfb47fe0bd8b..ab80fc2cb8550 100644 --- a/c_glib/test/flight-sql/test-client.rb +++ b/c_glib/test/flight-sql/test-client.rb @@ -16,6 +16,7 @@ # under the License. class TestFlightSQLClient < Test::Unit::TestCase + include Helper::Buildable include Helper::Omittable def setup @@ -67,4 +68,32 @@ def test_error end end end + + sub_test_case("#prepare") do + def test_success + insert_sql = "INSERT INTO page_view_table VALUES (?, true)" + statement = @sql_client.prepare(insert_sql) + begin + assert_equal([ + build_schema(count: :uint64, private: :boolean), + build_schema(count: :uint64), + ], + [ + statement.dataset_schema, + statement.parameter_schema, + ]) + parameters = build_record_batch(count: build_uint64_array([1, 2, 3])) + statement.set_record_batch(parameters) + assert_equal(3, statement.execute_update) + ensure + statement.close + end + end + + def test_error + assert_raise(Arrow::Error::Invalid) do + @sql_client.prepare("INSERT") + end + end + end end diff --git a/c_glib/test/flight-sql/test-create-prepared-statement-result.rb b/c_glib/test/flight-sql/test-create-prepared-statement-result.rb new file mode 100644 index 0000000000000..bcaeb1dddc93b --- /dev/null +++ b/c_glib/test/flight-sql/test-create-prepared-statement-result.rb @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestFlightSQLCreatePreparedStatementResult < Test::Unit::TestCase + include Helper::Buildable + include Helper::Omittable + + def setup + omit("Arrow Flight SQL is required") unless defined?(ArrowFlightSQL) + @result = ArrowFlightSQL::CreatePreparedStatementResult.new + end + + def test_dataset_schema + assert_nil(@result.dataset_schema) + schema = build_schema(text: :string, number: :int32) + @result.dataset_schema = schema + assert_equal(schema, @result.dataset_schema) + end + + def test_parameter_schema + assert_nil(@result.parameter_schema) + schema = build_schema(text: :string, number: :int32) + @result.parameter_schema = schema + assert_equal(schema, @result.parameter_schema) + end + + def test_handle + assert_equal("", @result.handle.to_s) + @result.handle = "valid-handle" + assert_equal("valid-handle".to_s, + @result.handle.to_s) + end +end diff --git a/c_glib/test/helper/buildable.rb b/c_glib/test/helper/buildable.rb index 29d7b6ba03bbe..b0156f9c8e299 100644 --- a/c_glib/test/helper/buildable.rb +++ b/c_glib/test/helper/buildable.rb @@ -19,6 +19,16 @@ module Helper module Buildable def build_schema(fields) fields = fields.collect do |name, data_type| + if data_type.is_a?(Symbol) + data_type_class_name = + data_type. + to_s. + split("_"). + collect(&:capitalize). + join. + gsub(/\AUint/, "UInt") + "DataType" + data_type = Arrow.const_get(data_type_class_name).new + end Arrow::Field.new(name, data_type) end Arrow::Schema.new(fields) diff --git a/c_glib/test/helper/flight-sql-server.rb b/c_glib/test/helper/flight-sql-server.rb index 8b664ca112223..8cbba51f84b39 100644 --- a/c_glib/test/helper/flight-sql-server.rb +++ b/c_glib/test/helper/flight-sql-server.rb @@ -46,5 +46,31 @@ def virtual_do_do_put_command_statement_update(context, command) end 1 end + + def virtual_do_create_prepared_statement(context, request) + unless request.query == "INSERT INTO page_view_table VALUES (?, true)" + raise Arrow::Error::Invalid.new("invalid SQL") + end + result = ArrowFlightSQL::CreatePreparedStatementResult.new + generator = FlightInfoGenerator.new + table = generator.page_view_table + result.dataset_schema = table.schema + result.parameter_schema = table.schema.remove_field(1) + result.handle = "valid-handle" + result + end + + def virtual_do_do_put_prepared_statement_update(context, command, reader) + unless command.handle.to_s == "valid-handle" + raise Arrow::Error::Invalid.new("invalid handle") + end + reader.read_all.n_rows + end + + def virtual_do_close_prepared_statement(context, request) + unless request.handle.to_s == "valid-handle" + raise Arrow::Error::Invalid.new("invalid handle") + end + end end end diff --git a/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/client.rb b/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/client.rb new file mode 100644 index 0000000000000..ff3169d5621b2 --- /dev/null +++ b/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/client.rb @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module ArrowFlightSQL + class Client + alias_method :prepare_raw, :prepare + def prepare(*args) + statement = prepare_raw(*args) + if block_given? + begin + yield(statement) + ensure + statement.close unless statement.closed? + end + else + statement + end + end + end +end diff --git a/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/loader.rb b/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/loader.rb index 3ff71094dbc33..5c920d9780ddf 100644 --- a/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/loader.rb +++ b/ruby/red-arrow-flight-sql/lib/arrow-flight-sql/loader.rb @@ -29,6 +29,7 @@ def post_load(repository, namespace) end def require_libraries + require_relative "client" require_relative "server" end diff --git a/ruby/red-arrow-flight-sql/test/helper/server.rb b/ruby/red-arrow-flight-sql/test/helper/server.rb index ab034996392ce..f7c935fab91fa 100644 --- a/ruby/red-arrow-flight-sql/test/helper/server.rb +++ b/ruby/red-arrow-flight-sql/test/helper/server.rb @@ -37,5 +37,31 @@ def virtual_do_do_get_statement(context, command) table = generator.page_view_table ArrowFlight::RecordBatchStream.new(table) end + + def virtual_do_create_prepared_statement(context, request) + unless request.query == "INSERT INTO page_view_table VALUES (?, true)" + raise Arrow::Error::Invalid.new("invalid SQL") + end + result = ArrowFlightSQL::CreatePreparedStatementResult.new + generator = InfoGenerator.new + table = generator.page_view_table + result.dataset_schema = table.schema + result.parameter_schema = table.schema.remove_field(1) + result.handle = "valid-handle" + result + end + + def virtual_do_do_put_prepared_statement_update(context, command, reader) + unless command.handle.to_s == "valid-handle" + raise Arrow::Error::Invalid.new("invalid handle") + end + reader.read_all.n_rows + end + + def virtual_do_close_prepared_statement(context, request) + unless request.handle.to_s == "valid-handle" + raise Arrow::Error::Invalid.new("invalid handle") + end + end end end diff --git a/ruby/red-arrow-flight-sql/test/test-client.rb b/ruby/red-arrow-flight-sql/test/test-client.rb index de8732898937b..21554c1bdab84 100644 --- a/ruby/red-arrow-flight-sql/test/test-client.rb +++ b/ruby/red-arrow-flight-sql/test/test-client.rb @@ -39,4 +39,25 @@ def test_execute assert_equal(generator.page_view_table, reader.read_all) end + + def test_prepare + insert_sql = "INSERT INTO page_view_table VALUES (?, true)" + block_called = false + @sql_client.prepare(insert_sql) do |statement| + block_called = true + assert_equal([ + Arrow::Schema.new(count: :uint64, private: :boolean), + Arrow::Schema.new(count: :uint64), + ], + [ + statement.dataset_schema, + statement.parameter_schema, + ]) + counts = Arrow::UInt64Array.new([1, 2, 3]) + parameters = Arrow::RecordBatch.new(count: counts) + statement.set_record_batch(parameters) + assert_equal(3, statement.execute_update) + end + assert_true(block_called) + end end