-
Notifications
You must be signed in to change notification settings - Fork 32
Description
What happens?
Running a query that requires multiple passes over a registered Arrow stream results in undefined, undesirable behavior.
Perhaps this issue needs to be in the duckdb-java or duckdb-python repos, but I think the behavior is likely stemming from a problem in duckdb core.
If, after first registering an Arrow stream with duckdb, you run a query that requires multiple table scans, the query will fail (java) or produce incorrect results (python).
In duckdb-java, it appears as though the second scan of the results fails with an error indicating that the stream has been released: Invalid Input Error: This stream has been released
In duckdb-python, the query completes successfully, but returns seemingly incorrect data.
Ideally, the query would complete successfully with the correct result, having materialized the intermediate result as necessary (and applied any predicate / filter pushdown if possible).
To Reproduce
Java repro that throws an error
package com.acme;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.duckdb.DuckDBConnection;
import org.duckdb.DuckDBDriver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
/**
 * Minimal reproduction: registers an Arrow C-data stream with an in-memory DuckDB connection and
 * runs a query that references the registered stream twice ({@code UNION ALL} of two scans).
 */
public class DuckDBStreamIngestTest {
    /** Number of rows written into the single record batch of the test stream. */
    private static final int ROW_COUNT = 5;

    /**
     * Builds an Arrow IPC stream containing one record batch with an {@code id} int column and a
     * {@code value} string column.
     *
     * @param allocator allocator backing the temporary vectors
     * @return the serialized IPC stream bytes
     * @throws Exception if writing the batch fails
     */
    private static byte[] createStream(BufferAllocator allocator) throws Exception {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        IntVector intVector = new IntVector("id", allocator);
        VarCharVector stringVector = new VarCharVector("value", allocator);
        try (
            VectorSchemaRoot vsr = new VectorSchemaRoot(List.of(intVector, stringVector));
            ArrowStreamWriter writer = new ArrowStreamWriter(vsr, null, outputStream)
        ) {
            for (int i = 0; i < ROW_COUNT; i++) {
                intVector.setSafe(i, i);
                stringVector.setSafe(i, ("v " + i).getBytes(StandardCharsets.UTF_8));
            }
            // Set the row count only after the vectors are populated (populate, then set count).
            vsr.setRowCount(ROW_COUNT);
            writer.writeBatch();
        }
        return outputStream.toByteArray();
    }

    /**
     * Registers the stream as {@code arrow_table} and executes a query that reads it twice.
     *
     * @param args unused
     * @throws Exception if stream creation, registration or query execution fails
     */
    public static void main(final String[] args) throws Exception {
        // NOTE(review): the allocator, reader and exported stream are never closed. That is
        // tolerable for a short-lived repro, but confirm who owns the stream after
        // registerArrowStream before wrapping these in try-with-resources.
        BufferAllocator allocator = new RootAllocator();
        byte[] bytes = createStream(allocator);
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        ArrowStreamReader arrowReader = new ArrowStreamReader(inputStream, allocator);
        ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(allocator);
        Data.exportArrayStream(allocator, arrowReader, arrowArrayStream);

        DuckDBDriver driver = new DuckDBDriver();
        try (Connection connection = driver.connect("jdbc:duckdb:", new Properties())) {
            DuckDBConnection conn = connection.unwrap(DuckDBConnection.class);
            conn.registerArrowStream("arrow_table", arrowArrayStream);
            try (
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(
                    "select id from arrow_table union all select id + 1 from arrow_table")
            ) {
                printResultSet(resultSet);
            }
        }
    }

    /**
     * Prints the column labels followed by every row, each value followed by {@code ", "}.
     *
     * @param resultSet result set to drain and print
     * @throws SQLException if reading metadata or rows fails
     */
    private static void printResultSet(ResultSet resultSet) throws SQLException {
        // Look the column count up once instead of on every loop iteration.
        int columnCount = resultSet.getMetaData().getColumnCount();
        for (int i = 1; i <= columnCount; i++) {
            System.out.print(resultSet.getMetaData().getColumnLabel(i) + ", ");
        }
        System.out.println();
        while (resultSet.next()) {
            for (int i = 1; i <= columnCount; i++) {
                System.out.print(resultSet.getString(i) + ", ");
            }
            System.out.println();
        }
    }
}
Java output:
Exception in thread "main" java.sql.SQLException: Invalid Input Error: This stream has been released
at org.duckdb.DuckDBNative.duckdb_jdbc_execute(Native Method)
at org.duckdb.DuckDBPreparedStatement.execute(DuckDBPreparedStatement.java:193)
at org.duckdb.DuckDBPreparedStatement.execute(DuckDBPreparedStatement.java:159)
at org.duckdb.DuckDBPreparedStatement.executeQuery(DuckDBPreparedStatement.java:229)
at org.duckdb.DuckDBPreparedStatement.executeQuery(DuckDBPreparedStatement.java:263)
at com.acme.DuckDBStreamIngestTest.main(DuckDBStreamIngestTest.java:63)
Python repro
import pyarrow as pa
import duckdb
import io
def create_arrow_stream(table):
    """Serialize ``table`` into an in-memory Arrow IPC stream.

    Returns a ``BytesIO`` holding the IPC stream, rewound to offset 0 so it
    can be read immediately.
    """
    sink = io.BytesIO()
    ipc_writer = pa.ipc.new_stream(sink, table.schema)
    with ipc_writer:
        ipc_writer.write(table)
    # Rewind so the caller reads from the beginning of the stream.
    sink.seek(0)
    return sink
def main():
    """Register an Arrow IPC stream with DuckDB and run a query that reads it twice.

    The stream is registered under the name ``arrow_stream`` and the query
    references that name in both branches of a ``UNION ALL``.
    """
    data = {
        'id': [1, 2, 3, 4, 5],
        'value': ['one', 'two', 'three', 'four', 'five']
    }
    table = pa.table(data)
    stream_buffer1 = create_arrow_stream(table)
    # The query must run while the reader is still open, so it stays inside the `with`.
    with pa.ipc.open_stream(stream_buffer1) as stream1:
        duckdb.register("arrow_stream", stream1)
        sql = "SELECT id FROM arrow_stream union all select id + 1 from arrow_stream"
        print("Query Results:")
        # show() renders the relation to stdout itself and returns None, so
        # wrapping it in print() would emit a stray "None" line.
        duckdb.sql(sql).show()
# Run the reproduction only when this file is executed as a script.
if "__main__" == __name__:
    main()
Python output:
Query Results:
┌───────┐
│ id │
│ int64 │
├───────┤
│ 1 │
│ 2 │
│ 3 │
│ 4 │
│ 5 │
└───────┘
In both cases, you can first materialize the table (by issuing `CREATE TABLE <tablename> AS SELECT * FROM arrow_stream`), after which the query completes and produces the appropriate result:
id,
0,
1,
2,
3,
4,
1,
2,
3,
4,
5,
OS:
linux x86_64, macOS aarch64
DuckDB Version:
1.3.2
DuckDB Client:
duckdb-java,python
Hardware:
No response
Full Name:
Jonathan Swenson
Affiliation:
Omni
What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.
I have tested with a stable release
Did you include all relevant data sets for reproducing the issue?
Yes
Did you include all code required to reproduce the issue?
- Yes, I have
Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?
- Yes, I have