From 330929518c44180ed3d55a4dbe91fcd36c079036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Thu, 20 Nov 2025 08:50:46 +0100 Subject: [PATCH 1/2] chore: explicitly close the vector after finish the processing --- .../client/internal/InfluxDBClientImpl.java | 44 ++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 7b7034c0..8714402c 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -42,7 +43,6 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.arrow.flight.CallOption; -import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import com.influxdb.v3.client.InfluxDBApiException; @@ -187,14 +187,8 @@ public Stream query(@Nonnull final String query, @Nonnull final Map query(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> - VectorSchemaRootConverter.INSTANCE - .getArrayObjectFromVectorSchemaRoot( - vector, - rowNumber - ))); + return queryDataAndProcess(query, parameters, options, + VectorSchemaRootConverter.INSTANCE::getArrayObjectFromVectorSchemaRoot); } @Nonnull @@ -222,14 +216,7 @@ public Stream> queryRows(@Nonnull final String query, @Nonnu public Stream> queryRows(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> - VectorSchemaRootConverter.INSTANCE - .getMapFromVectorSchemaRoot( - vector, - rowNumber - ))); + return queryDataAndProcess(query, parameters, options, VectorSchemaRootConverter.INSTANCE::getMapFromVectorSchemaRoot); } @Nonnull @@ -255,14 +242,8 @@ public Stream queryPoints(@Nonnull final String query, @Nonnull fin public Stream queryPoints(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> { - List fieldVectors = vector.getFieldVectors(); - return IntStream - .range(0, vector.getRowCount()) - .mapToObj(row -> - VectorSchemaRootConverter.INSTANCE.toPointValues(row, fieldVectors)); - }); + return queryDataAndProcess(query, parameters, options, + (vector, rowNumber) -> VectorSchemaRootConverter.INSTANCE.toPointValues(rowNumber, vector.getFieldVectors())); } @Nonnull @@ -389,6 +370,19 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } } + @Nonnull + private Stream queryDataAndProcess(@Nonnull final String query, + @Nonnull final Map parameters, + @Nonnull final QueryOptions options, + @Nonnull final BiFunction processor) { + return queryData(query, parameters, options) + .flatMap(vector -> + IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> processor.apply(vector, rowNumber)) + .onClose(vector::close) + ); + } + @Nonnull private Stream queryData(@Nonnull final String query, @Nonnull final Map parameters, From 5f74725e25b5be150af8ee9c2e189e30bfea44c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Bedn=C3=A1=C5=99?= Date: Thu, 20 Nov 2025 09:00:32 +0100 Subject: [PATCH 2/2] docs: update CHANGELOG.md --- CHANGELOG.md | 1 + .../influxdb/v3/client/internal/InfluxDBClientImpl.java | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 672a429a..d1b79bb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bug Fixes 1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts. +1. [#318](https://github.com/InfluxCommunity/influxdb3-java/pull/318): Explicit releasing of the VectorSchemaRoot. ## 1.6.0 [2025-11-14] diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 8714402c..50db570a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -43,6 +43,7 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.arrow.flight.CallOption; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import com.influxdb.v3.client.InfluxDBApiException; @@ -216,7 +217,8 @@ public Stream> queryRows(@Nonnull final String query, @Nonnu public Stream> queryRows(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryDataAndProcess(query, parameters, options, VectorSchemaRootConverter.INSTANCE::getMapFromVectorSchemaRoot); + return queryDataAndProcess(query, parameters, options, + VectorSchemaRootConverter.INSTANCE::getMapFromVectorSchemaRoot); } @Nonnull @@ -243,7 +245,10 @@ public Stream queryPoints(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { return queryDataAndProcess(query, parameters, options, - (vector, rowNumber) -> VectorSchemaRootConverter.INSTANCE.toPointValues(rowNumber, vector.getFieldVectors())); + (vector, rowNumber) -> { + List fieldVectors = vector.getFieldVectors(); + return VectorSchemaRootConverter.INSTANCE.toPointValues(rowNumber, fieldVectors); + }); } @Nonnull