[Feature] Doris support Arrow Flight SQL protocol #25514

Open · 1 task done
xinyiZzz opened this issue Oct 17, 2023 · 7 comments
Labels
kind/feature Categorizes issue or PR as related to a new feature.

Comments


xinyiZzz commented Oct 17, 2023

Search before asking

  • I had searched in the issues and found no similar issues.

Description

Doris implements a high-speed data link based on the Arrow Flight SQL protocol, allowing multiple languages to use SQL to read large batches of data from Doris at high speed.

1. Motivation

In data science scenarios, it is often necessary to load large amounts of data from Doris into Python/Java/Spark, and loading it with Pymysql/Pandas or JDBC is very slow.

Nowadays, many big data systems use columnar in-memory data formats, while MySQL/JDBC/ODBC remain the mainstream protocols and standards for interacting with database systems. Their performance shortcomings have become increasingly obvious in today's big data world: data must be serialized from the system-specific columnar storage format into the row-oriented format of MySQL/JDBC/ODBC, then deserialized back into the client's columnar format, which significantly slows down data movement.

If both the source database and the target client support Arrow as a columnar in-memory format, transferring over the Arrow Flight SQL protocol removes the need to serialize and deserialize the data at all, eliminating that portion of the transfer overhead. Additionally, Arrow Flight can leverage multi-node and multi-core architectures to optimize throughput through full parallelization.

2. Introduction to Arrow Flight SQL

Apache Arrow Flight SQL is a protocol developed by the Apache Arrow community for interacting with database systems. ADBC clients use it to talk, in the Arrow data format, to databases that implement the Arrow Flight SQL protocol. It combines the speed advantage of Arrow Flight with the ease of use of JDBC/ODBC. Some basic concepts follow:

  • Apache Arrow
    Apache Arrow is an efficient columnar in-memory format widely used for large-scale data processing, supported by many libraries in all major programming languages.

  • Apache Arrow Flight
    Arrow Flight is an RPC framework for transmitting data in the Arrow format, allowing high-speed data exchange between different systems.

  • Arrow Flight SQL
    Although the MySQL/JDBC/ODBC protocols and standards are slower, they offer simple APIs for developers. For this reason, Arrow Flight SQL was introduced on top of Arrow Flight to provide a friendlier interface for interacting with database systems.

  • ADBC
    ADBC is a driver that lets different languages access databases, similar to JDBC/ODBC; the database needs to implement the Arrow Flight SQL protocol.

See also:

  • Introducing Apache Arrow Flight SQL: Accelerating Database Access (the principles and implementation of Arrow Flight SQL)
    https://arrow.apache.org/blog/2022/02/16/introducing-arrow-flight-sql/
  • Arrow Flight SQL documentation (the API and interaction flow)
    https://arrow.apache.org/docs/format/FlightSql.html
  • An Introduction to Apache Arrow Flight SQL (the advantages of Arrow Flight SQL over JDBC/ODBC)
    https://www.dremio.com/blog/an-introduction-to-apache-arrow-flight-sql
  • Apache Arrow ADBC (how to use the ADBC drivers in different languages)
    https://arrow.apache.org/adbc/main/

3. Implementation method

3.1 Principle

In Apache Doris, query results are organized as column-oriented Blocks. In previous versions, transferring this data to a target client through the MySQL Client or a JDBC/ODBC driver required first serializing the Blocks into row-oriented bytes; if the target client is a columnar data science component or a columnar database like Pandas, the row-oriented bytes then had to be deserialized back into a columnar format. This serialization/deserialization round trip is a very time-consuming process.

With Arrow Flight SQL, Doris first converts the columnar Block into an equally columnar Arrow RecordBatch; this conversion is very fast, and no further serialization or deserialization is needed during transmission. The Python client then converts the Arrow RecordBatch into an equally columnar Pandas DataFrame, which is also very fast.

In addition, Arrow Flight SQL provides a generic JDBC driver that is fully compatible with the JDBC standard, allowing applications to use Arrow Flight SQL to interact with database systems through standard JDBC.

This accelerates Python reads from Doris, as shown in the figure:

[figure]
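
As a tiny client-side illustration of that final columnar-to-columnar hop (toy data, not Doris output; requires pyarrow and pandas to be installed):

import pyarrow

# Stand-in for an Arrow table assembled from RecordBatches pulled from Doris.
table = pyarrow.table({"clientip": [1, 2, 3]})
# Columnar Arrow -> columnar pandas; no row-format detour, so this step is cheap.
dataframe = table.to_pandas()
print(dataframe)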

3.2 Outline design

[figure]

  1. ADBC Client sends a query request to Doris FE and completes the authentication on the first request.

  2. FE parses the query plan and sends the Fragment to be executed to BE.

  3. After BE completes the prepare and open of the Fragment, it returns the Schema of the query result in Arrow format to FE, starts executing the query, and puts the query results into a queue.

  4. FE sends the QueryID, the Schema of the query result, and the BE address (Endpoints) where the query result is located back to the ADBC Client.

  5. ADBC Client requests BE to pull the query results of the specified QueryID.

  6. BE returns the Arrow-format query results in the queue to the ADBC Client; the ADBC Client finishes after verifying the Schema of the results.

3.3 Detailed design

Arrow version: 13.0.0

Take the ADBC Low-Level API execution process as an example:

[figure]

3.3.1 ADBC Client

1.1 db = adbc_driver_flightsql.connect(uri="grpc://ip:port?user=&password=")

Creates a Database connector that can maintain multiple shared Connections at the same time. Parameters: Arrow Flight Server IP, port, username, and password.

1.2 conn = adbc_driver_manager.AdbcConnection(db)

Creates a Database connection; this triggers authentication and fetches the FlightSqlInfo.

  1. Auth
    The authentication operation is triggered on the first request to the Arrow Flight Server.
    The return value is a Bearer Token; every subsequent request to the Arrow Flight Server carries this Token.

  2. getFlightInfoSqlInfo
    Requests the Arrow Flight Server to return the SQL Info, including the SQL syntax supported by the database. The return value is the schema and endpoint of the SQL Info; the SQL Info is itself Arrow-format data, and the endpoint is still the current Doris FE Flight Server.
    All data exchanged in Arrow Flight is Arrow data. Before fetching a piece of Arrow data, the first request typically obtains its endpoint and schema, encapsulated in a FlightInfo; the endpoint is then requested again to obtain the Arrow data and verify its schema (see the sketch after this list).

  3. getStreamSqlInfo
    Requests the endpoint to obtain the SQL Info. The result is wrapped in an ArrowArrayStream and associated with a ServerStreamListener.
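
This generic two-step FlightInfo pattern can be sketched with pyarrow.flight directly (a minimal, non-Doris-specific sketch; the host, port, and query are placeholders):

import pyarrow.flight as flight

client = flight.FlightClient("grpc://host:port")
# First request: obtain the FlightInfo, which carries the schema and endpoints.
info = client.get_flight_info(flight.FlightDescriptor.for_command(b"select 1"))
print(info.schema)
# Second request(s): ask each endpoint for the actual Arrow data.
for endpoint in info.endpoints:
    reader = client.do_get(endpoint.ticket)
    table = reader.read_all()
    # The client can verify table.schema against info.schema here.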

1.3 stmt = adbc_driver_manager.AdbcStatement(conn)

Maintains the state of a query. It can be a one-off query or a prepared statement that is reused, although any previous query results become invalid.

1.4 stmt.set_sql_query("select * from tpch.hdf5 limit 10;")

1.5 stream, _ = stmt.execute_query()

Executing the query returns a RecordBatchReader wrapped in a RecordBatchStream.

  1. getFlightInfoStatement
    Returns the Endpoints and Schema where the query results are located, which is the Metadata of the Stream.

  2. getStreamStatement
    Returns a RecordBatchReader for reading query results.

1.6 reader = pyarrow.RecordBatchReader._import_from_c(stream.address)

Creates a Reader from the Stream.

1.7 arrow_data = reader.read_all()

read_all() repeatedly calls RecordBatchReader.ReadNext() to fetch the RecordBatches of the query result.

Corresponding code example:

import adbc_driver_flightsql
import adbc_driver_manager
import pyarrow

db = adbc_driver_flightsql.connect(uri="grpc://127.0.0.1:8040", db_kwargs={
            adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
            adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
        })
conn = adbc_driver_manager.AdbcConnection(db)
stmt = adbc_driver_manager.AdbcStatement(conn)
stmt.set_sql_query("select * from tbl1 limit 1000000;")
stream, rows = stmt.execute_query()
reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
arrow_data = reader.read_all()

3.3.2 Doris FE

2.1 Authentication

Implements the arrow.flight.auth2 interfaces to handle authentication when the ADBC client connects for the first time. The username and password are extracted from the request header and authenticated; a 130-bit Token is then generated, associated with the user's permission information, and saved in a cache. The cache size and Token expiration time can be adjusted in Config. Finally, the Token is returned to the ADBC client.

[figure]
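
From the client side, this handshake can be sketched with pyarrow.flight as follows (a minimal sketch; the host, port, and credentials are placeholders, and the ADBC driver normally performs all of this automatically):

import pyarrow.flight as flight

client = flight.FlightClient("grpc://fe_host:arrow_flight_sql_port")
# First request: send username/password, receive the ('authorization', 'Bearer ...') header pair.
token_pair = client.authenticate_basic_token(b"root", b"")
options = flight.FlightCallOptions(headers=[token_pair])
# Every subsequent request carries the Bearer Token in its headers.
info = client.get_flight_info(flight.FlightDescriptor.for_command(b"select 1"), options)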

2.2 getFlightInfoSqlInfo

Responds to the Arrow Flight SQL request by returning the SQL Info; implements the two methods FlightSqlProducer.getFlightInfoSqlInfo() and FlightSqlProducer.getStreamSqlInfo().

When the Arrow Flight Server is initialized, it creates the FlightSqlProducer that responds to ADBC requests. When the FlightSqlProducer is initialized, it binds the SQL Info, including the Arrow version, whether reads and writes are supported, whether DDL statements such as creating tables and modifying schemas are supported, the list of supported functions, and other SQL syntax.

2.3 getFlightInfoStatement

Executes the query in response to the Arrow Flight SQL request and returns the Endpoints and Schema of the query results; implements the FlightSqlProducer.getFlightInfoStatement() method.

  1. Initialize the ConnectContext. The first Execute Query request from the ADBC Client initializes the ConnectContext, a Session that stores information related to query execution, including user permissions, session variables, etc.

  2. Initialize the executor FlightStatementExecutor. It saves the Query, QueryID, connectContext, and resultServerInfo.

  3. Execute the Query. Initialize the QueryID and StmtExecutor, then call executeArrowFlightQuery to generate the query plan, initialize and run the Coordinator, and send the Fragments to the designated BEs.

  4. Get the Arrow Result Set Schema. Request the Arrow Flight Server of the BE hosting the Result Sink Node in the query plan; the latter generates the Schema of the query result after the Fragment completes Prepare and Open.

  5. Initialize the Ticket with the Query and QueryID; initialize the FlightEndpoint with the Ticket and the Arrow Flight Server address of the BE hosting the Result Sink Node (that is, the server where the query result Arrow Result Set resides); finally, initialize the FlightInfo with the Arrow Result Set Schema and the Endpoints and send it back to the ADBC Client.

[figure]

3.3.3 Doris BE

3.1 Execute Fragment

Executes the Fragment and returns the Arrow Result Set Schema. The overall execution process is the same as before; the difference is that the type of the ResultSinkNode in the Fragment is no longer MYSQL_PROTOCAL but ARROW_FLIGHT_PROTOCAL. After Prepare and Open complete, the Arrow Schema of the query result is put into a Map, where it waits for FE to fetch it, and an ArrowFlightResultWriter is initialized.

When subsequent query results arrive at the ResultSink, ArrowFlightResultWriter::append_block converts each data Block into an Arrow-format RecordBatch and puts it into a separate queue, the BufferControlBlock, where it waits for the ADBC Client to pull it.

3.2 GetStatement

After receiving the Endpoints sent back by Doris FE, the ADBC Client requests the corresponding Arrow Flight Server address on Doris BE. On receiving the request, Doris BE first decodes the Ticket to obtain the SQL and QueryID, then uses the QueryID to find the previously saved Arrow Schema of the query result and returns an initialized RecordBatchReader, which the ADBC Client uses to pull data afterwards. This implements the FlightSqlServerBase::DoGetStatement() method.

In addition, when the ADBC Client requests Doris BE's Arrow Flight Server for the first time, the header also contains the Bearer Token; however, the HeaderAuthServerMiddleware and BearerAuthServerMiddleware used when the BE Arrow Flight Server is initialized are both no-ops, i.e., no verification is performed. Currently, therefore, the BE Arrow Flight Server's permission check for requests is based on the QueryID: the ADBC Client is allowed to read the data as long as the QueryID is correct.

[figure]

3.3 ArrowFlightBatchReader::ReadNext

The ADBC Client cyclically calls the ReadNext method of the previously returned RecordBatchReader to pull data; the BE Arrow Flight Server uses the QueryID in the request to pull Arrow-format RecordBatches from the BufferControlBlock and return them.
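
A minimal client-side sketch of this pull loop, assuming reader is the RecordBatchReader obtained in step 1.6 (pyarrow's read_next_batch() raises StopIteration once the stream is exhausted):

import pyarrow

batches = []
while True:
    try:
        # Each call triggers ArrowFlightBatchReader::ReadNext on the BE side.
        batches.append(reader.read_next_batch())
    except StopIteration:
        break
# Assemble the pulled batches into one table (requires at least one batch).
arrow_data = pyarrow.Table.from_batches(batches)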

4. How to use

Taking the Python-based ADBC Driver (requires Python >= 3.9) as an example, connect to Doris, which implements Arrow Flight SQL and supports common syntax such as DDL, DML, session variables, and SHOW statements.

Modify the configuration of Doris FE and BE:

  1. Modify arrow_flight_sql_port in fe/conf/fe.conf to an available port, such as 9090.
  2. Modify arrow_flight_sql_port in be/conf/be.conf to an available port, such as 9091.
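
Beyond large reads, the same DB-API connection can also run the DDL, DML, session-variable, and SHOW statements mentioned above. A short sketch, with the port and statements purely illustrative:

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql

conn = flight_sql.connect(uri="grpc://127.0.0.1:9090", db_kwargs={
    adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
    adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
})
cursor = conn.cursor()
cursor.execute("show databases;")                 # SHOW statement
print(cursor.fetchall())
cursor.execute("create database if not exists demo;")  # DDL statement
cursor.close()
conn.close()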

After connecting to Doris with the Python ADBC Driver, the following uses various ADBC APIs to load the ClickBench data set from Doris into Python.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import adbc_driver_manager
import adbc_driver_flightsql.dbapi as flight_sql
import pandas
from datetime import datetime

my_uri = "grpc://0.0.0.0:`fe.conf_arrow_flight_port`"
my_db_kwargs = {
    adbc_driver_manager.DatabaseOptions.USERNAME.value: "root",
    adbc_driver_manager.DatabaseOptions.PASSWORD.value: "",
}
sql = "select * from clickbench.hits limit 1000000;"

# PEP 249 (DB-API 2.0) API wrapper for the ADBC Driver Manager.
def dbapi_adbc_execute_fetchallarrow():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_fetchallarrow" + ", cost:" + str(datetime.now() - start_time) + ", bytes:" + str(arrow_data.nbytes) + ", len(arrow_data):" + str(len(arrow_data)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# ADBC reads data into pandas dataframe, which is faster than fetchallarrow first and then to_pandas.
def dbapi_adbc_execute_fetch_df():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    cursor.execute(sql)
    dataframe = cursor.fetch_df()    
    print("\n##################\n dbapi_adbc_execute_fetch_df" + ", cost:" + str(datetime.now() - start_time))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# Can read multiple partitions in parallel.
def dbapi_adbc_execute_partitions():
    conn = flight_sql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    cursor = conn.cursor()
    start_time = datetime.now()
    partitions, schema = cursor.adbc_execute_partitions(sql)
    cursor.adbc_read_partition(partitions[0])
    arrow_data = cursor.fetchallarrow()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n dbapi_adbc_execute_partitions" + ", cost:" + str(datetime.now() - start_time) + ", len(partitions):" + str(len(partitions)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

import adbc_driver_flightsql
import pyarrow

# The ADBC low-level API is the root module; it provides a fairly direct 1:1 mapping
# to the C API definitions in Python. For a higher-level interface, use
# adbc_driver_manager.dbapi. (This requires PyArrow.)
def low_level_api_execute_query():
    db = adbc_driver_flightsql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    conn = adbc_driver_manager.AdbcConnection(db)
    stmt = adbc_driver_manager.AdbcStatement(conn)
    stmt.set_sql_query(sql)
    start_time = datetime.now()
    stream, rows = stmt.execute_query()
    reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
    arrow_data = reader.read_all()
    dataframe = arrow_data.to_pandas()
    print("\n##################\n low_level_api_execute_query" + ", cost:" + str(datetime.now() - start_time) + ", stream.address:" + str(stream.address) + ", rows:" + str(rows) + ", bytes:" + str(arrow_data.nbytes) + ", len(arrow_data):" + str(len(arrow_data)))
    print(dataframe.info(memory_usage='deep'))
    print(dataframe)

# Can read multiple partitions in parallel.
def low_level_api_execute_partitions():
    db = adbc_driver_flightsql.connect(uri=my_uri, db_kwargs=my_db_kwargs)
    conn = adbc_driver_manager.AdbcConnection(db)
    stmt = adbc_driver_manager.AdbcStatement(conn)
    stmt.set_sql_query(sql)
    start_time = datetime.now()
    streams = stmt.execute_partitions()
    for s in streams[0]:
        stream = conn.read_partition(s)
        reader = pyarrow.RecordBatchReader._import_from_c(stream.address)
        arrow_data = reader.read_all()
        dataframe = arrow_data.to_pandas()
    print("\n##################\n low_level_api_execute_partitions" + ", cost:" + str(datetime.now() - start_time) + "streams.size:" + str(len(streams)) + ", "  + str(len(streams[0])) + ", " + str(streams[2]))

dbapi_adbc_execute_fetchallarrow()
dbapi_adbc_execute_fetch_df()
dbapi_adbc_execute_partitions()
low_level_api_execute_query()
low_level_api_execute_partitions()

The execution results are as follows (repeated output is omitted). As the timings show, loading the 1-million-row, 105-column, roughly 780 MB ClickBench data set from Doris takes about 3.5 seconds.

##################
 dbapi_adbc_execute_fetchallarrow, cost:0:00:03.548080, bytes:784372793, len(arrow_data):1000000
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000000 entries, 0 to 999999
Columns: 105 entries, CounterID to CLID
dtypes: int16(48), int32(19), int64(6), object(32)
memory usage: 2.4 GB
None
        CounterID   EventDate               UserID            EventTime              WatchID  JavaEnable                                              Title  GoodEvent  ...  UTMCampaign  UTMContent  UTMTerm  FromTag  HasGCLID          RefererHash              URLHash  CLID
0          245620  2013-07-09  2178958239546411410  2013-07-09 19:30:27  8302242799508478680           1  OWAProfessionov — Мой Круг (СВАО Интернет-магазин          1  ...                                                    0 -7861356476484644683 -2933046165847566158     0
999999       1095  2013-07-03  4224919145474070397  2013-07-03 14:36:17  6301487284302774604           0  @дневники Sinatra (ЛАДА, цена для деталли кто ...          1  ...                                                    0  -296158784638538920  1335027772388499430     0

[1000000 rows x 105 columns]

##################
 dbapi_adbc_execute_fetch_df, cost:0:00:03.611664
##################
 dbapi_adbc_execute_partitions, cost:0:00:03.483436, len(partitions):1
##################
 low_level_api_execute_query, cost:0:00:03.523598, stream.address:139992182177600, rows:-1, bytes:784322926, len(arrow_data):1000000
##################
 low_level_api_execute_partitions, cost:0:00:03.738128streams.size:3, 1, -1

5. Progress and TODO

  1. [enhancement](thirdparty) upgrade thirdparty libs - again #23414

  2. [feature-wip](arrow-flight)(step1) BE support Arrow Flight server, read data only #23765

  3. [feature-wip](arrow-flight)(step2) FE support Arrow Flight server #24314

  4. [feature-wip](arrow-flight)(step3) Support authentication and user session #24772

  5. [feature-wip](arrow-flight)(step4) Support other DML and DDL statements, besides Select #25919

  6. [feature-wip](arrow-flight)(step5) Support JDBC and PreparedStatement and Fix Bug #27661

  7. [feature-wip](arrow-flight)(step6) Support regression test #27847

6. Test

Test Dataset:

| Table | Rows | Description |
|---|---|---|
| Clickbench | 100 million | https://github.com/ClickHouse/ClickBench |
| tpch.lineitem | 600 million | https://www.tpc.org/tpch/ |
| 96float_table | 20 million | 1 String column and 95 Float columns; DDL below |

CREATE TABLE hdf5 (
  k0 varchar(65532) NULL,
  k1 float NULL,
  k2 float NULL,
  ...
  k95 float NULL
) ENGINE=OLAP
DUPLICATE KEY(k0)
DISTRIBUTED BY HASH(k0) BUCKETS 64;

6.1 Python

Compare the performance of Python using Pymysql, Pandas and Arrow Flight SQL to read Doris:

Cost in seconds:

| Column type | Pymysql | pandas.read_sql | Arrow Flight SQL | SQL |
|---|---|---|---|---|
| Int | 70.097648 | 80.461473 | 0.154683 | select ClientIP from clickbench.hits limit 10000000; |
| Bool | 68.84048 | 91.333049 | 0.109124 | select CounterClass from clickbench.hits where CounterClass!=0 limit 10000000; |
| Float | 132.46575 | 152.666138 | 1.974839 | select k1 from 96float_table |
| String | 68.21946 | 79.298519 | 3.614652 | select URL from clickbench.hits where URL!='' limit 10000000 |
| String | 126.32184 | 147.955599 | 6.195701 | select k0 from 96float_table; |
| Mixed | 229.72 | 248.41 | 3.920499 | select * from clickbench.hits limit 1000000 |
| Mixed | 200.88983 | 206.912296 | 1.049839 | select * from 96float_table limit 1000000 |

Arrow Flight SQL is much faster than Pymysql and other MySQL-protocol-based libraries for all column types, with a more than 20x performance improvement in most large-volume read scenarios and a 100x improvement in some.

image

6.2 Java

6.2.1

Compared with the traditional jdbc:mysql connection, the performance of reading data from Doris with three Arrow Flight SQL connection methods was tested. The test code can be found by searching for "Arrow Flight" in https://github.com/apache/doris-websit. jdbc:mysql and jdbc:arrow-flight-sql both return data in the JDBC ResultSet format, while Flight AdbcDriver and Flight JdbcDriver both return data in Arrow format.

Cost in seconds; each pair of rows shows JDK 1.8 then JDK 17.

| # | Column type | JDK | jdbc:mysql | jdbc:arrow-flight-sql | Flight AdbcDriver | Flight JdbcDriver | SQL |
|---|---|---|---|---|---|---|---|
| 1 | Int column (10M rows) | 1.8 | 3.772 | 0.425 | 0.568 | 0.829 | select ClientIP from clickbench.hits limit 10000000; |
| | | 17 | 3.826 | 0.451 | 0.510 | 0.756 | |
| 2 | Bool column (10M rows) | 1.8 | 4.353 | 0.430 | 0.547 | 0.815 | select CounterClass from clickbench.hits where CounterClass!=0 limit 10000000; |
| | | 17 | 3.755 | 0.421 | 0.491 | 0.751 | |
| 3 | String column (10M rows) | 1.8 | 9.800 | 1.103 | 1.218 | 4.519 | select URL from clickbench.hits where URL!='' limit 10000000 |
| | | 17 | 5.454 | 0.973 | 1.062 | 3.102 | |
| 4 | Mixed columns (1M rows) | 1.8 | 8.478 | 1.799 | 2.123 | 13.431 | select * from clickbench.hits limit 1000000 |
| | | 17 | 4.355 | 1.794 | 1.919 | 10.544 | |
| 5 | Decimal column (600M rows) | 1.8 | OutOfMemoryError (-Xms20G -Xmx40g) | 23.354 | 24.288 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem; |
| | | 17 | OutOfMemoryError (-Xms20G -Xmx40g) | 23.357 | 23.701 | Cannot get simple type for type DECIMAL | |
| 6 | Decimal column (10M rows) | 1.8 | 4.499 | 0.654 | 0.873 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem limit 10000000; |
| | | 17 | 3.456 | 0.660 | 0.789 | Cannot get simple type for type DECIMAL | |
| 7 | Date column (600M rows) | 1.8 | OutOfMemoryError (-Xms20G -Xmx40g) | 24.323 | 24.559 | 122.514 | select l_commitdate from tpch.lineitem; |
| | | 17 | OutOfMemoryError (-Xms20G -Xmx40g) | 23.932 | 23.919 | 124.309 | |
| 8 | Date column (10M rows) | 1.8 | 4.554 | 0.636 | 0.864 | 2.784 | select l_commitdate from tpch.lineitem limit 10000000; |
| | | 17 | 3.226 | 0.689 | 0.838 | 2.712 | |
| 9 | Mixed columns (1M rows) | 1.8 | 1.690 | 0.892 | 1.070 | Cannot get simple type for type DECIMAL | select * from tpch.lineitem limit 1000000; |
| | | 17 | 1.061 | 0.756 | 0.919 | Cannot get simple type for type DECIMAL | |

From the above test we can conclude:

  1. Simply replacing the connection string jdbc:mysql:// with jdbc:arrow-flight-sql:// yields a 2x to 10x improvement in read performance across the different data types.
  2. For all connection methods, JDK 17 improves read performance over JDK 1.8 by at least 10% and up to 100%.
  3. When reading large amounts of data, Arrow Flight SQL uses less memory than jdbc:mysql://. In the above test, Arrow Flight SQL used less than one-tenth of the memory to read 600 million rows of the Decimal column.

6.2.2

Compares the performance of reading data from Doris and converting it to String with the different connection methods, calling resultSet.toString() or VectorSchemaRoot.contentToTSVString() in the loop body of each getResultSet or loadNextBatch.

Cost in seconds; each pair of rows shows JDK 1.8 then JDK 17.

| # | Column type | JDK | jdbc:mysql | jdbc:arrow-flight-sql | Flight AdbcDriver | Flight JdbcDriver | SQL |
|---|---|---|---|---|---|---|---|
| 1 | Int column (10M rows) | 1.8 | 11.972 | 2.239 | 1.797 | 2.078 | select ClientIP from clickbench.hits limit 10000000; |
| | | 17 | 10.793 | 1.580 | 1.466 | 1.686 | |
| 2 | Bool column (10M rows) | 1.8 | 6.514 | 2.337 | 1.332 | 1.581 | select CounterClass from clickbench.hits where CounterClass!=0 limit 10000000; |
| | | 17 | 5.436 | 1.601 | 1.074 | 1.343 | |
| 3 | String column (10M rows) | 1.8 | 12.376 | 2.378 | 4.536 | 7.855 | select URL from clickbench.hits where URL!='' limit 10000000 |
| | | 17 | 7.460 | 1.766 | 5.674 | 6.709 | |
| 4 | Mixed columns (1M rows) | 1.8 | 14.957 | 1.856 | 13.97 | 26.437 | select * from clickbench.hits limit 1000000 |
| | | 17 | 7.840 | 1.818 | 13.24 | 25.421 | |
| 5 | Decimal column (600M rows) | 1.8 | OutOfMemoryError (-Xms20G -Xmx40g) | 86.219 | 72.473 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem; |
| | | 17 | OutOfMemoryError (-Xms20G -Xmx40g) | 49.458 | 70.62 | Cannot get simple type for type DECIMAL | |
| 6 | Decimal column (10M rows) | 1.8 | 6.556 | 2.256 | 2.131 | Cannot get simple type for type DECIMAL | select l_extendedprice from tpch.lineitem limit 10000000; |
| | | 17 | 5.298 | 1.726 | 2.168 | Cannot get simple type for type DECIMAL | |
| 7 | Date column (600M rows) | 1.8 | OutOfMemoryError (-Xms20G -Xmx40g) | 86.006 | 40.807 | 170.358 | select l_commitdate from tpch.lineitem; |
| | | 17 | OutOfMemoryError (-Xms20G -Xmx40g) | 49.792 | 37.610 | 169.474 | |
| 8 | Date column (10M rows) | 1.8 | 7.117 | 2.253 | 1.532 | 3.484 | select l_commitdate from tpch.lineitem limit 10000000; |
| | | 17 | 4.753 | 1.697 | 1.365 | 3.307 | |
| 9 | Mixed columns (1M rows) | 1.8 | 2.126 | 0.992 | 3.309 | Cannot get simple type for type DECIMAL | select * from tpch.lineitem limit 1000000; |
| | | 17 | 1.264 | 0.793 | 3.109 | Cannot get simple type for type DECIMAL | |

From the above test we can conclude:

  1. Whether parsing JDBC ResultSet data or Arrow-format data, the conversion takes longer than reading the data itself; in the above test, converting the data to String usually took twice as long as reading it.
  2. Converting a single column of data to String takes similar time for JDBC ResultSet and Arrow format, but converting multiple columns to String is significantly slower in Arrow format than in JDBC ResultSet. This is because JDBC ResultSet is a row-oriented data format while Arrow is column-oriented, and converting to a row-oriented String requires a column-to-row pivot, which is slower.


liugddx commented Oct 18, 2023

Very cool feature! Can it be maintained on the confluence?

@xinyiZzz
Contributor Author

> Very cool feature! Can it be maintained on the confluence?

Yes, I will maintain it later, thanks~


liugddx commented Nov 28, 2023

> Very cool feature! Can it be maintained on the confluence?

> Yes, I will maintain it later, thanks~

I'd love to get involved in this issue, is there anything I can do to help?

@xinyiZzz xinyiZzz changed the title [Feature] Doris support Arrow Flight SQL [Feature] Doris support ADBC Dec 3, 2023
@xinyiZzz xinyiZzz changed the title [Feature] Doris support ADBC [Feature] Doris support Arrow Flight SQL protocol Dec 3, 2023
yiguolei pushed a commit that referenced this issue Dec 4, 2023
Design Documentation Linked to #25514

Regression test adds a new group: arrow_flight_sql.

./run-regression-test.sh -g arrow_flight_sql runs the regression test, using jdbc:arrow-flight-sql to run all Suites whose group contains arrow_flight_sql.
./run-regression-test.sh -g p0,arrow_flight_sql runs the regression test, using jdbc:arrow-flight-sql to run all Suites whose group contains arrow_flight_sql, and jdbc:mysql to run the other Suites whose group contains p0 but not arrow_flight_sql.
Note that the query-result formats of jdbc:arrow-flight-sql, jdbc:mysql, and the mysql client differ, for example:

Datetime field type: jdbc:mysql returns 2010-01-02T05:09:06, the mysql client returns 2010-01-02 05:09:06, and jdbc:arrow-flight-sql also returns 2010-01-02 05:09:06.
Array and Map field types: jdbc:mysql returns ["ab", "efg", null], {"f1": 1, "f2": "a"}; jdbc:arrow-flight-sql returns ["ab","efg",null], {"f1":1,"f2":"a"}, i.e. without the spaces.
Float field type: jdbc:mysql and the mysql client return 6.333; jdbc:arrow-flight-sql returns 6.333000183105469, in query_p0/subquery/test_subquery.groovy.
If the query result is empty, jdbc:arrow-flight-sql returns empty while jdbc:mysql returns \N.
use database; and the query should be split into two SQL executions as much as possible, otherwise the results may not be as expected. For example: USE information_schema; select cast ("0.0101031417" as datetime) returns 2000-01-01 03:14:1 (constant folding), while select cast ("0.0101031417" as datetime) alone returns null (no constant folding).
In addition, doris jdbc:arrow-flight-sql still has unfinished parts, such as:

Unsupported data type: Decimal256. INVALID_ARGUMENT: [INTERNAL_ERROR]Fail to convert block data to arrow data, error: [E3] write_column_to_arrow with type Decimal256
Unsupported null value of map key. INVALID_ARGUMENT: [INTERNAL_ERROR]Fail to convert block data to arrow data, error: [E33] Can not write null value of map key to arrow.
Unsupported data type: ARRAY<MAP<TEXT,TEXT>>
jdbc:arrow-flight-sql does not support connecting to a specified DB name, such as jdbc:arrow-flight-sql://127.0.0.1:9090/{db_name}. To stay compatible with the regression test, `use db_name` is added before all SQLs when jdbc:arrow-flight-sql runs the regression test.
select timediff("2010-01-01 01:00:00", "2010-01-02 01:00:00"); fails with java.lang.NumberFormatException: For input string: "-24:00:00"
@jpohanka

@xinyiZzz Thank you for creating this feature request and for implementing Arrow Flight in Doris. Currently (Doris 2.1.0) we have only data retrieval. Are you also planning to implement data ingestion via Arrow Flight?

@xinyiZzz
Contributor Author

> @xinyiZzz Thank you for creating this feature request and for implementing Arrow Flight in Doris. Currently (Doris 2.1.0) we have only data retrieval. Are you also planning to implement data ingestion via Arrow Flight?

Hi @jpohanka, there are currently no plans to implement data ingestion, but we expect to support it in the future, especially in Spark and Flink. In the past we tested Spark loading Doris via Arrow Flight and reduced data serialization time by 10x.

@aditanase

Thanks for adding this! Wondering if there are plans for doing something similar on the federated query side.
I think it would slot in nicely between data lake support and JDBC, allowing for the best of both worlds.

A simple use case would be to run queries against another Doris instance via ADBC instead of mysql protocol.

If someone wanted to try to implement this, what would be a good template for a partitioned / parallelisable data source? (assuming we'd want to distribute the query across multiple BEs).

@xinyiZzz
Contributor Author

Hi @aditanase , good suggestion!

We have considered using Arrow Flight SQL to implement federated queries between multiple Doris clusters, replacing the current jdbc:mysql approach.

For partitioned data sources, users often divide data sources by business. With storage-compute separation, resource isolation is easy to achieve; with Doris on local storage, different businesses often create separate Doris clusters. Arrow Flight SQL is helpful for federated queries and data migration between clusters.
