Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Bug Fixes

1. [#317](https://github.com/InfluxCommunity/influxdb3-java/pull/317): Fix Arrow memory leak when stream close fails due to thread interrupts.
1. [#318](https://github.com/InfluxCommunity/influxdb3-java/pull/318): Explicit releasing of the VectorSchemaRoot.

## 1.6.0 [2025-11-14]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -187,14 +188,8 @@ public Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<St
public Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber ->
VectorSchemaRootConverter.INSTANCE
.getArrayObjectFromVectorSchemaRoot(
vector,
rowNumber
)));
return queryDataAndProcess(query, parameters, options,
VectorSchemaRootConverter.INSTANCE::getArrayObjectFromVectorSchemaRoot);
}

@Nonnull
Expand Down Expand Up @@ -222,14 +217,8 @@ public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnu
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber ->
VectorSchemaRootConverter.INSTANCE
.getMapFromVectorSchemaRoot(
vector,
rowNumber
)));
return queryDataAndProcess(query, parameters, options,
VectorSchemaRootConverter.INSTANCE::getMapFromVectorSchemaRoot);
}

@Nonnull
Expand All @@ -255,13 +244,10 @@ public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull fin
public Stream<PointValues> queryPoints(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> {
return queryDataAndProcess(query, parameters, options,
(vector, rowNumber) -> {
List<FieldVector> fieldVectors = vector.getFieldVectors();
return IntStream
.range(0, vector.getRowCount())
.mapToObj(row ->
VectorSchemaRootConverter.INSTANCE.toPointValues(row, fieldVectors));
return VectorSchemaRootConverter.INSTANCE.toPointValues(rowNumber, fieldVectors);
});
}

Expand Down Expand Up @@ -389,6 +375,19 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
}
}

@Nonnull
private <T> Stream<T> queryDataAndProcess(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options,
@Nonnull final BiFunction<VectorSchemaRoot, Integer, T> processor) {
return queryData(query, parameters, options)
.flatMap(vector ->
IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber -> processor.apply(vector, rowNumber))
.onClose(vector::close)
);
}

@Nonnull
private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
Expand Down
Loading