From 9115454f252ea87b68dc14a2bec08b93b58c45aa Mon Sep 17 00:00:00 2001 From: Joris Gillis Date: Sat, 8 Nov 2025 18:51:21 +0100 Subject: [PATCH 1/2] [JAVA][CI] Adds a recipe for Flight SQL A very basic database server is implemented that uses the DataFusion query engine for answering SQL queries on single-file, write-once tables stored in the Arrow IPC format. Although this does not implement all the features and possibilities of FlightSqlProducer, it show cases the two main categories of concepts through the inclusion of tables (like catalog, database schema, sqlinfo, etc) and statement (like prepared statement and substrait statement). Some updates to the building of the Java Cookbook: Using the release property instead of the compiler source and target values which is deprecated. --- .gitignore | 3 +- java/source/conf.py | 29 +-- java/source/demo/pom.xml | 30 ++- java/source/flight_sql.rst | 519 +++++++++++++++++++++++++++++++++++++ java/source/index.rst | 1 + 5 files changed, 562 insertions(+), 20 deletions(-) create mode 100644 java/source/flight_sql.rst diff --git a/.gitignore b/.gitignore index 2381cee7..3fc9141a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ r/*.Rproj *target/ .DS_Store -.cp.tmp \ No newline at end of file +.cp.tmp +python/compressed.csv.gz diff --git a/java/source/conf.py b/java/source/conf.py index 6594bd80..53c1d751 100644 --- a/java/source/conf.py +++ b/java/source/conf.py @@ -33,11 +33,11 @@ # -- Project information ----------------------------------------------------- -project = 'Apache Arrow Java Cookbook' -copyright = '2022, Apache Software Foundation' -author = 'The Apache Software Foundation' -arrow_nightly=os.getenv("ARROW_NIGHTLY") -if arrow_nightly and arrow_nightly != '0': +project = "Apache Arrow Java Cookbook" +copyright = "2022, Apache Software Foundation" +author = "The Apache Software Foundation" +arrow_nightly = os.getenv("ARROW_NIGHTLY") +if arrow_nightly and arrow_nightly != "0": version = "19.0.0-SNAPSHOT" 
else: version = "18.1.0" @@ -48,13 +48,10 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [ - "javadoctest", - "sphinx.ext.intersphinx" -] +extensions = ["javadoctest", "sphinx.ext.intersphinx"] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. @@ -67,11 +64,11 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'alabaster' +html_theme = "alabaster" html_theme_options = { - 'page_width': '1200px', - 'code_font_size': '0.8em', + "page_width": "1200px", + "code_font_size": "0.8em", "logo": "arrow-logo_vertical_black-txt_transparent-bg.svg", "github_user": "apache", "github_repo": "arrow-cookbook", @@ -80,15 +77,15 @@ "extra_nav_links": { "User Guide": "https://arrow.apache.org/docs/java/index.html", "API Reference": "https://arrow.apache.org/docs/java/reference/index.html", - "All Cookbooks": "../" + "All Cookbooks": "../", }, - "font_family": "-apple-system,BlinkMacSystemFont,Segoe UI,Roboto,Helvetica Neue,Arial,Noto Sans,Liberation Sans,sans-serif,Apple Color Emoji,Segoe UI Emoji,Segoe UI Symbol,Noto Color Emoji" + "font_family": "-apple-system,BlinkMacSystemFont,Segoe UI,Roboto,Helvetica Neue,Arial,Noto Sans,Liberation Sans,sans-serif,Apple Color Emoji,Segoe UI Emoji,Segoe UI Symbol,Noto Color Emoji", } # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". 
-html_static_path = ['../../static'] +html_static_path = ["../../static"] # The name of an image file (relative to this directory) to use as a favicon of # the docs. This file should be a Windows icon file (.ico) being 16x16 or diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml index 472e1abe..b74de3a9 100644 --- a/java/source/demo/pom.xml +++ b/java/source/demo/pom.xml @@ -30,6 +30,14 @@ exec-maven-plugin 3.0.0 + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + + @@ -39,9 +47,7 @@ - 11 - 11 - 18.1.0 + 18.3.0 @@ -74,6 +80,11 @@ + + org.apache.arrow + flight-sql + ${arrow.version} + org.apache.arrow arrow-dataset @@ -119,5 +130,18 @@ calcite-core 1.37.0 + + + io.github.datafusion-contrib + datafusion-java + 0.16.0 + + + org.apache.arrow + arrow-memory-unsafe + + + + diff --git a/java/source/flight_sql.rst b/java/source/flight_sql.rst new file mode 100644 index 00000000..7ae00cdb --- /dev/null +++ b/java/source/flight_sql.rst @@ -0,0 +1,519 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +================ +Arrow Flight SQL +================ + +This section contains a recipe for working with Arrow Flight SQL. +For more detail about Flight SQL please take a look at `Arrow Flight SQL`_. 
+ +.. contents:: + +A Simple DataFusion powered SQL database +======================================== + +FlightSQL +--------- + +The following code snippet shows a FlightSQL server that is a very simple data system. + +- It lists all tables in the data system. +- It stores ingested Arrow `VectorSchemaRoot` data as tables in the Arrow IPC format. +- It uses the Apache DataFusion query engine to answer SQL queries. + +FlightSQL offers more concepts than simply tables and statements. This code snippet only +showcases the tables and the statements; all the other concepts (such as catalog, +database schema, sql info, table type, prepared statement and substrait statement) +work very similarly to either the table example or the statement example in this code. + +The concepts of Session and Transactions are ignored for simplicity. Both sessions +and transactions work with Flight Actions. + +A note on the implementation +---------------------------- + +The following example uses the `FlightSqlClient` and implements a `FlightSqlProducer`. + +The `FlightSqlClient` composes a `FlightClient` and directs the higher level API of +FlightSQL to the correct functions of the `FlightClient`. + +The `ExampleProducer` extends the `BasicFlightSqlProducer` to implement the +server-side of the FlightSQL protocol. Note that only a part of the protocol is +implemented in this example (i.e., tables and statements). + +The interesting part of the code is in the `ExampleProducer`, implementing the +`getFlightInfo...` and `getStream...` functions for Tables and Statements respectively. + +In the Flight protocol, a client will first ask for the `FlightInfo` and then retrieve +the requested dataset using the `getStream` function. +A FlightSQL server works exactly the same, but offers a higher level API to the developer. + +The convenience of the `FlightSqlProducer` interface is that it does dynamic routing/dispatching. 
+For example, the `getFlightInfo` function interprets the incoming request and invokes +`getFlightInfoTables` when the `FlightDescriptor` command is of the type `CommandGetTables`. +Or, if the `FlightDescriptor` type is `CommandStatementQuery`, the `getFlightInfoStatement` +function is invoked. + +This works similarly for the other concepts (prepared statements, catalogs, etc.) and for +other functions (`getStream`, `getSchema`, etc.). + +Last but not least, the `determineEndpoints` function is used by the `getFlightInfo...` +functions in `BasicFlightSqlProducer` to generate the `FlightEndpoint` instances for all the +non-statement concepts. It essentially takes the command from the `FlightDescriptor` +and returns that in the Ticket, such that the `getStream` function can deduce the correct +`getStream...` function in the `FlightSqlProducer` interface to call. + + +.. testcode:: + + import com.google.protobuf.Any; + import com.google.protobuf.Message; + + import org.apache.arrow.datafusion.ArrowFormat; + import org.apache.arrow.datafusion.DataFrame; + import org.apache.arrow.datafusion.ListingOptions; + import org.apache.arrow.datafusion.ListingTable; + import org.apache.arrow.datafusion.ListingTableConfig; + import org.apache.arrow.datafusion.SessionContexts; + import org.apache.arrow.flight.FlightClient; + import org.apache.arrow.flight.FlightDescriptor; + import org.apache.arrow.flight.FlightEndpoint; + import org.apache.arrow.flight.FlightInfo; + import org.apache.arrow.flight.FlightServer; + import org.apache.arrow.flight.FlightStream; + import org.apache.arrow.flight.Location; + import org.apache.arrow.flight.PutResult; + import org.apache.arrow.flight.Ticket; + import org.apache.arrow.flight.sql.BasicFlightSqlProducer; + import org.apache.arrow.flight.sql.FlightSqlClient; + import org.apache.arrow.flight.sql.impl.FlightSql; + import org.apache.arrow.memory.RootAllocator; + import org.apache.arrow.vector.IntVector; + import 
org.apache.arrow.vector.TimeStampMilliTZVector; + import org.apache.arrow.vector.ValueVector; + import org.apache.arrow.vector.VarBinaryVector; + import org.apache.arrow.vector.VarCharVector; + import org.apache.arrow.vector.VectorSchemaRoot; + import org.apache.arrow.vector.dictionary.DictionaryProvider; + import org.apache.arrow.vector.ipc.ArrowFileWriter; + import org.apache.arrow.vector.ipc.ArrowReader; + import org.apache.arrow.vector.types.TimeUnit; + import org.apache.arrow.vector.types.pojo.ArrowType; + import org.apache.arrow.vector.types.pojo.Field; + import org.apache.arrow.vector.types.pojo.Schema; + import org.apache.arrow.vector.util.VectorSchemaRootAppender; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.FileOutputStream; + import java.io.IOException; + import java.net.URISyntaxException; + import java.nio.charset.StandardCharsets; + import java.nio.file.Path; + import java.time.Instant; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.WeakHashMap; + import java.util.regex.Pattern; + + class FlightSqlExample { + + public FlightSqlExample() {} + + private void createATableWithPredefinedData( + RootAllocator rootAllocator, FlightSqlClient client) { + Field idField = Field.notNullable("id", new ArrowType.Int(32, false)); + Field nameField = Field.notNullable("name", new ArrowType.Utf8()); + Field createdField = + Field.notNullable( + "ts", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")); + + IntVector idVector = new IntVector(idField, rootAllocator); + VarCharVector nameVector = new VarCharVector(nameField, rootAllocator); + TimeStampMilliTZVector createdVector = + new TimeStampMilliTZVector(createdField, rootAllocator); + try (VectorSchemaRoot ingestData = + VectorSchemaRoot.of(idVector, nameVector, createdVector)) { + ingestData.allocateNew(); + int numberOfRows = 5; + for (int i = 0; i < numberOfRows; i++) { + idVector.set(i, i + 1); 
+ nameVector.set(i, ("name" + i).getBytes(StandardCharsets.UTF_8)); + createdVector.set( + i, Instant.parse("2026-01-01T00:00:00Z").toEpochMilli() + i); + } + ingestData.setRowCount(numberOfRows); + + FlightSqlClient.ExecuteIngestOptions ingestOptions = + new FlightSqlClient.ExecuteIngestOptions( + "writing_test", + FlightSql.CommandStatementIngest.TableDefinitionOptions + .newBuilder() + .setIfExists( + FlightSql.CommandStatementIngest + .TableDefinitionOptions + .TableExistsOption + .TABLE_EXISTS_OPTION_FAIL) + .build(), + "default", + "public", + Map.of()); + client.executeIngest(ingestData, ingestOptions); + } + } + + private void stoppingServer(FlightServer server, Thread serverThread) + throws InterruptedException { + System.out.println("RUNNER: Ending server"); + server.shutdown(); + server.awaitTermination(); + serverThread.join(); + System.out.println("RUNNER: Server stopped"); + } + + private Thread startingServer(FlightServer server) throws InterruptedException { + Thread serverThread = + new Thread( + () -> { + try { + server.start(); + System.out.println("RUNNER: Server running!"); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + serverThread.start(); + + // Waiting for the server to start + Thread.sleep(1000L); + return serverThread; + } + + private void listTablesAvailableOnServer(FlightSqlClient client) { + FlightInfo listTablesInfo = + client.getTables("default", "public", ".*", List.of(), true); + try (FlightStream listTablesStream = + client.getStream(listTablesInfo.getEndpoints().get(0).getTicket())) { + while (listTablesStream.next()) { + VectorSchemaRoot root = listTablesStream.getRoot(); + System.out.println("CLIENT: Tables = " + root.getVector("table_name")); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void executeQueryPrintResults(FlightSqlClient client) throws Exception { + FlightInfo executeInfo = client.execute("SELECT * FROM writing_test"); + try (FlightStream executeStream 
= + client.getStream(executeInfo.getEndpoints().get(0).getTicket())) { + while (executeStream.next()) { + VectorSchemaRoot root = executeStream.getRoot(); + System.out.println("CLIENT: Query result = " + root.getRowCount()); + } + } + } + + // ================================================================================================================= + // Main function: to start server and drive the client. + // ================================================================================================================= + void runExample() { + var producer = new ExampleProducer(); + try (var rootAllocator = new RootAllocator(1024 * 1024 * 100); + FlightServer server = + FlightServer.builder() + .location(Location.forGrpcInsecure("0.0.0.0", 33333)) + .allocator(rootAllocator) + .producer(producer) + .build()) { + // ========================================================================================================= + // Starting server in a separate thread + // ========================================================================================================= + Thread serverThread = startingServer(server); + + // ========================================================================================================= + // Creating client and executing commands on the server. 
+ // ========================================================================================================= + try (var flightClient = + FlightClient.builder() + .location(Location.forGrpcInsecure("0.0.0.0", 33333)) + .allocator(rootAllocator) + .build()) { + var client = new FlightSqlClient(flightClient); + + System.out.println("CLIENT: Creating a table!"); + createATableWithPredefinedData(rootAllocator, client); + + System.out.println("CLIENT: Listing all tables"); + listTablesAvailableOnServer(client); + + System.out.println("CLIENT: Querying the table"); + executeQueryPrintResults(client); + + System.out.println("CLIENT: Ending client run."); + } catch (Exception e) { + System.out.println("CLIENT: Error running:\n"); + e.printStackTrace(System.out); + } + + // ========================================================================================================= + // Shutdown the server + // ========================================================================================================= + stoppingServer(server, serverThread); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // ================================================================================================================= + // FlightSqlProducer: the code responding on the Flight SQL Server side. + // ================================================================================================================= + + /** + * A Producer is the heart of the Flight SQL Server. + * + *

It is responsible for handling all the requests from the client. From FlightInfo + * (generating tickets) to the schema of the data and streaming the actual data. + * + *

This example does not showcase all possibilities of the Producer, but it does + * showcase the most important ones. + */ + private class ExampleProducer extends BasicFlightSqlProducer { + private final Map tablePaths = new HashMap<>(); + + private final RootAllocator rootAllocator = + new RootAllocator(1024 * 1024 * 200); // 200 MB + private final Map results = + Collections.synchronizedMap(new WeakHashMap<>(100)); + + private long getSizeInBytes(VectorSchemaRoot vectorSchemaRoot) { + long totalSize = 0; + for (ValueVector vector : vectorSchemaRoot.getFieldVectors()) { + totalSize += vector.getBufferSize(); + } + return totalSize; + } + + // ============================================================================================================= + // TABLES + // ============================================================================================================= + @Override + public FlightInfo getFlightInfoTables( + FlightSql.CommandGetTables request, + CallContext context, + FlightDescriptor descriptor) { + if (!request.getCatalog().equals("default") + || !request.getDbSchemaFilterPattern().equals("public")) { + throw new RuntimeException( + "Only supporting `default` (catalog) and `public` (database schema)."); + } + + return super.getFlightInfoTables(request, context, descriptor); + } + + @Override + public void getStreamTables( + FlightSql.CommandGetTables command, + CallContext context, + ServerStreamListener listener) { + try (VectorSchemaRoot tableRoot = + VectorSchemaRoot.create(Schemas.GET_TABLES_SCHEMA, rootAllocator)) { + VarCharVector nameVector = + (VarCharVector) tableRoot.getVector("table_name"); + nameVector.allocateNewSafe(); + VarCharVector typeVector = + (VarCharVector) tableRoot.getVector("table_type"); + typeVector.allocateNewSafe(); + VarBinaryVector schemaVector = + (VarBinaryVector) tableRoot.getVector("table_schema"); + schemaVector.allocateNewSafe(); + + Pattern tableNamePattern = Pattern.compile(".*"); + if 
(command.hasTableNameFilterPattern()) { + tableNamePattern = Pattern.compile(command.getTableNameFilterPattern()); + } + int index = 0; + for (String tableName : tablePaths.keySet()) { + if (tableNamePattern.matcher(tableName).matches()) { + nameVector.set(index, tableName.getBytes(StandardCharsets.UTF_8)); + typeVector.set(index, "table".getBytes(StandardCharsets.UTF_8)); + schemaVector.set(index, "schema".getBytes(StandardCharsets.UTF_8)); + index++; + } + } + tableRoot.setRowCount(index); + + listener.start(tableRoot); + listener.putNext(); + listener.completed(); + } + } + + // ============================================================================================================= + // QUERYING + // ============================================================================================================= + @Override + public FlightInfo getFlightInfoStatement( + FlightSql.CommandStatementQuery command, + CallContext context, + FlightDescriptor descriptor) { + try (var ctx = SessionContexts.create()) { + for (Map.Entry tableEntry : tablePaths.entrySet()) { + ctx.registerTable( + tableEntry.getKey(), + new ListingTable( + new ListingTableConfig.Builder( + tableEntry.getValue().toString()) + .withListingOptions( + ListingOptions.builder( + new ArrowFormat()) + .build()) + .build(ctx) + .get())); + } + DataFrame dataFrame = ctx.sql(command.getQuery()).get(); + + try (ArrowReader reader = dataFrame.collect(rootAllocator).get()) { + try { + VectorSchemaRoot collectedData = + VectorSchemaRoot.create( + reader.getVectorSchemaRoot().getSchema(), + rootAllocator); + collectedData.allocateNew(); + while (reader.loadNextBatch()) { + VectorSchemaRootAppender.append( + collectedData, reader.getVectorSchemaRoot()); + } + + results.put(command.getQuery(), collectedData); + + /* + * Very important to return a FlightSql.TicketStatementQuery as the Ticket in endpoints, + * because this is used for routing to the getStreamStatement function in the getStream + * function of 
FlightSqlProducer. + */ + FlightSql.TicketStatementQuery ticketStatementQuery = + FlightSql.TicketStatementQuery.newBuilder() + .setStatementHandle(command.getQueryBytes()) + .build(); + byte[] statementQuerySerialized = + Any.pack(ticketStatementQuery).toByteArray(); + Ticket ticket = new Ticket(statementQuerySerialized); + FlightEndpoint endpoint = FlightEndpoint.builder(ticket).build(); + return FlightInfo.builder( + new Schema(List.of()), + descriptor, + List.of(endpoint)) + .setBytes(getSizeInBytes(collectedData)) + .setRecords(collectedData.getRowCount()) + .build(); + } catch (Exception e) { + System.out.println("SERVER: Error while saving query result"); + throw new RuntimeException(e); + } + } + } catch (Exception e) { + System.out.println("SERVER: Error running query: " + command.getQuery()); + throw new RuntimeException(e); + } + } + + @Override + public void getStreamStatement( + FlightSql.TicketStatementQuery ticket, + CallContext context, + ServerStreamListener listener) { + var resultKey = ticket.getStatementHandle().toStringUtf8(); + if (results.containsKey(resultKey)) { + VectorSchemaRoot root = results.get(resultKey); + listener.start(root); + listener.putNext(); + listener.completed(); + root.close(); + } else { + listener.error(new RuntimeException("Could not find query result")); + } + } + + // ============================================================================================================= + // WRITING + // ============================================================================================================= + @Override + public Runnable acceptPutStatementBulkIngest( + FlightSql.CommandStatementIngest command, + CallContext context, + FlightStream flightStream, + StreamListener ackStream) { + Path tablePath = Path.of(command.getTable() + ".arrow"); + try (FileOutputStream fileWriter = new FileOutputStream(tablePath.toFile()); + ArrowFileWriter arrowFileWriter = + new ArrowFileWriter( + flightStream.getRoot(), + new 
DictionaryProvider.MapDictionaryProvider(), + fileWriter.getChannel())) { + while (flightStream.next()) { + arrowFileWriter.writeBatch(); + } + tablePaths.put(command.getTable(), tablePath); + ackStream.onNext(PutResult.empty()); + return ackStream::onCompleted; + } catch (IOException e) { + return () -> ackStream.onError(e); + } + } + + // ============================================================================================================= + // OTHER + // ============================================================================================================= + @Override + protected List determineEndpoints( + T request, FlightDescriptor flightDescriptor, Schema schema) { + Ticket ticket = new Ticket(flightDescriptor.getCommand()); + FlightEndpoint endpoint = + FlightEndpoint.builder( + ticket, Location.forGrpcInsecure("0.0.0.0", 33333)) + .build(); + return List.of(endpoint); + } + } + } + + new FlightSqlExample().runExample(); + + + +Should output: + +.. testoutput:: + + RUNNER: Server running! + CLIENT: Creating a table! + CLIENT: Listing all tables + CLIENT: Tables = [writing_test] + CLIENT: Querying the table + CLIENT: Query result = 5 + CLIENT: Ending client run. + RUNNER: Ending server + RUNNER: Server stopped + + + + +_`Arrow Flight SQL`: https://arrow.apache.org/docs/format/FlightSql.html diff --git a/java/source/index.rst b/java/source/index.rst index 63f94c0d..65372ddd 100644 --- a/java/source/index.rst +++ b/java/source/index.rst @@ -38,6 +38,7 @@ This cookbook is tested with Apache Arrow |version|. 
schema io flight + flight_sql dataset substrait data From 55c9c62334da1306ed876a242c20101ab9a6ef23 Mon Sep 17 00:00:00 2001 From: Joris Gillis Date: Tue, 18 Nov 2025 14:22:30 +0100 Subject: [PATCH 2/2] [JAVA] Fixing compilation error in Java 11 --- java/source/flight_sql.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/source/flight_sql.rst b/java/source/flight_sql.rst index 7ae00cdb..277ba83d 100644 --- a/java/source/flight_sql.rst +++ b/java/source/flight_sql.rst @@ -295,7 +295,7 @@ and returns that in the Ticket, such that the `getStream` function can deduce th *

This example does not showcase all possibilities of the Producer, but it does * showcase the most important ones. */ - private class ExampleProducer extends BasicFlightSqlProducer { + class ExampleProducer extends BasicFlightSqlProducer { private final Map tablePaths = new HashMap<>(); private final RootAllocator rootAllocator =