diff --git a/CHANGELOG.md b/CHANGELOG.md index 672a429a..d1b79bb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bug Fixes 1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts. +1. [#318](https://github.com/InfluxCommunity/influxdb3-java/pull/318): Explicit releasing of the VectorSchemaRoot. ## 1.6.0 [2025-11-14] diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 7b7034c0..50db570a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -187,14 +188,8 @@ public Stream query(@Nonnull final String query, @Nonnull final Map query(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> - VectorSchemaRootConverter.INSTANCE - .getArrayObjectFromVectorSchemaRoot( - vector, - rowNumber - ))); + return queryDataAndProcess(query, parameters, options, + VectorSchemaRootConverter.INSTANCE::getArrayObjectFromVectorSchemaRoot); } @Nonnull @@ -222,14 +217,8 @@ public Stream> queryRows(@Nonnull final String query, @Nonnu public Stream> queryRows(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> - VectorSchemaRootConverter.INSTANCE - .getMapFromVectorSchemaRoot( - vector, - rowNumber - ))); + return queryDataAndProcess(query, parameters, options, + VectorSchemaRootConverter.INSTANCE::getMapFromVectorSchemaRoot); } @Nonnull @@ -255,13 +244,10 @@ public Stream queryPoints(@Nonnull final String query, @Nonnull fin public Stream queryPoints(@Nonnull final String query, @Nonnull final Map parameters, @Nonnull final QueryOptions options) { - return queryData(query, parameters, options) - .flatMap(vector -> { + return queryDataAndProcess(query, parameters, options, + (vector, rowNumber) -> { List fieldVectors = vector.getFieldVectors(); - return IntStream - .range(0, vector.getRowCount()) - .mapToObj(row -> - VectorSchemaRootConverter.INSTANCE.toPointValues(row, fieldVectors)); + return VectorSchemaRootConverter.INSTANCE.toPointValues(rowNumber, fieldVectors); }); } @@ -389,6 +375,19 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti } } + @Nonnull + private Stream queryDataAndProcess(@Nonnull final String query, + @Nonnull final Map parameters, + @Nonnull final QueryOptions options, + @Nonnull final BiFunction processor) { + return queryData(query, parameters, options) + .flatMap(vector -> + IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> processor.apply(vector, rowNumber)) + .onClose(vector::close) + ); + } + @Nonnull private Stream queryData(@Nonnull final String query, @Nonnull final Map parameters,