Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 137 additions & 27 deletions src/main/java/io/tiledb/java/api/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import io.tiledb.libtiledb.*;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -56,9 +57,9 @@ public class Query implements AutoCloseable {

private NativeArray subarray;

private HashMap<String, NativeArray> buffers_;
private HashMap<String, Pair<NativeArray, NativeArray>> var_buffers_;
private HashMap<String, Pair<uint64_tArray, uint64_tArray>> buffer_sizes_;
private Map<String, NativeArray> buffers_;
private Map<String, Pair<NativeArray, NativeArray>> var_buffers_;
private Map<String, Pair<uint64_tArray, uint64_tArray>> buffer_sizes_;

public Query(Array array, QueryType type) throws TileDBError {
Context _ctx = array.getCtx();
Expand All @@ -76,9 +77,9 @@ public Query(Array array, QueryType type) throws TileDBError {
this.array = array;
this.querypp = _querypp;
this.queryp = tiledb.tiledb_query_tpp_value(_querypp);
this.buffers_ = new HashMap<>();
this.var_buffers_ = new HashMap<>();
this.buffer_sizes_ = new HashMap<>();
this.buffers_ = Collections.synchronizedMap(new HashMap<>());
this.var_buffers_ = Collections.synchronizedMap(new HashMap<>());
this.buffer_sizes_ = Collections.synchronizedMap(new HashMap<>());
}

public Query(Array array) throws TileDBError {
Expand All @@ -91,7 +92,7 @@ public Query(Array array) throws TileDBError {
* @param layout The layout order to be set.
* @exception TileDBError A TileDB exception
*/
public Query setLayout(Layout layout) throws TileDBError {
public synchronized Query setLayout(Layout layout) throws TileDBError {
ctx.handleError(tiledb.tiledb_query_set_layout(ctx.getCtxp(), queryp, layout.toSwigEnum()));
return this;
}
Expand Down Expand Up @@ -150,7 +151,7 @@ public void submitAsync(Callback callback) throws TileDBError {
* @param subarray The targeted subarray.
* @exception TileDBError A TileDB exception
*/
public Query setSubarray(NativeArray subarray) throws TileDBError {
public synchronized Query setSubarray(NativeArray subarray) throws TileDBError {
Types.typeCheck(subarray.getNativeType(), array.getSchema().getDomain().getType());
ctx.handleError(
tiledb.tiledb_query_set_subarray(ctx.getCtxp(), queryp, subarray.toVoidPointer()));
Expand All @@ -171,7 +172,7 @@ public Query setSubarray(NativeArray subarray) throws TileDBError {
* @return This query
* @throws TileDBError A TileDB exception
*/
public Query addRange(int dimIdx, Object start, Object end) throws TileDBError {
public synchronized Query addRange(int dimIdx, Object start, Object end) throws TileDBError {
Datatype dimType;
try (ArraySchema schema = array.getSchema();
Domain domain = schema.getDomain()) {
Expand Down Expand Up @@ -264,7 +265,7 @@ public Pair<Object, Object> getRange(int dimIdx, long rangeIdx) throws TileDBErr
* @param buffer NativeBuffer to be used for the attribute values.
* @exception TileDBError A TileDB exception
*/
public Query setBuffer(String attr, NativeArray buffer) throws TileDBError {
public synchronized Query setBuffer(String attr, NativeArray buffer) throws TileDBError {
try (ArraySchema schema = array.getSchema()) {
if (attr.equals(tiledb.tiledb_coords())) {
try (Domain domain = schema.getDomain()) {
Expand Down Expand Up @@ -297,7 +298,8 @@ public Query setBuffer(String attr, NativeArray buffer) throws TileDBError {
* @param buffer Buffer vector with elements of the attribute type.
* @exception TileDBError A TileDB exception
*/
public Query setBuffer(String attr, NativeArray offsets, NativeArray buffer) throws TileDBError {
public synchronized Query setBuffer(String attr, NativeArray offsets, NativeArray buffer)
throws TileDBError {

if (attr.equals(tiledb.tiledb_coords())) {
throw new TileDBError("Cannot set coordinate buffer as variable sized.");
Expand All @@ -313,7 +315,7 @@ public Query setBuffer(String attr, NativeArray offsets, NativeArray buffer) thr
Types.typeCheck(attribute.getType(), buffer.getNativeType());
}
Pair<uint64_tArray, uint64_tArray> buffer_sizes =
new Pair<uint64_tArray, uint64_tArray>(new uint64_tArray(1), new uint64_tArray(1));
new Pair<>(new uint64_tArray(1), new uint64_tArray(1));
buffer_sizes.getFirst().setitem(0, BigInteger.valueOf(offsets.getNBytes()));
buffer_sizes.getSecond().setitem(0, BigInteger.valueOf(buffer.getNBytes()));
// close previous buffers if they exist for this attribute
Expand All @@ -327,6 +329,115 @@ public Query setBuffer(String attr, NativeArray offsets, NativeArray buffer) thr
return this;
}

private Query setBufferSizeUnsafe(String attribute, long offsetSize, long bufferSize) {
buffer_sizes_.get(attribute).getFirst().setitem(0, BigInteger.valueOf(offsetSize));
buffer_sizes_.get(attribute).getSecond().setitem(0, BigInteger.valueOf(bufferSize));
return this;
}

public synchronized Query setBufferByteSize(String attribute, Long offsetSize, Long bufferSize)
throws TileDBError {
if (!var_buffers_.containsKey(attribute)) {
throw new TileDBError("Query var attribute buffer does not exist: " + attribute);
}
if (offsetSize <= 0 || bufferSize <= 0) {
throw new TileDBError("Number of buffer bytes must be >= 1");
}
Pair<NativeArray, NativeArray> varBuffers = var_buffers_.get(attribute);
NativeArray offsetBuffer = varBuffers.getFirst();
Long offsetNBytes = offsetBuffer.getNBytes();
NativeArray buffer = varBuffers.getSecond();
Long bufferNBytes = buffer.getNBytes();
if (offsetSize > offsetNBytes) {
throw new TileDBError(
"Number of offset bytes requested exceeds the number bytes of in allocated offset buffer: "
+ offsetNBytes
+ " > "
+ offsetSize);
}
if (bufferSize > bufferNBytes) {
throw new TileDBError(
"Number of buffer bytes requested exceeds the number of bytes in allocated buffer"
+ bufferNBytes
+ " > "
+ bufferSize);
}
return setBufferSizeUnsafe(attribute, offsetSize, bufferSize);
}

public synchronized Query setBufferByteSize(String attribute, Long bufferSize)
throws TileDBError {
if (!buffers_.containsKey(attribute)) {
throw new TileDBError("Query attrbute buffer does not exist: " + attribute);
}
if (bufferSize <= 0) {
throw new TileDBError("Number of buffer bytes must be >= 1");
}
NativeArray buffer = buffers_.get(attribute);
Long bufferNBytes = buffer.getNBytes();
if (bufferSize > bufferNBytes) {
throw new TileDBError(
"Number of bytes requested exceeds the number of bytes in allocated buffer: "
+ bufferSize
+ " > "
+ bufferNBytes);
}
return setBufferSizeUnsafe(attribute, 0l, bufferSize);
}

public synchronized Query setBufferElements(String attribute, Integer bufferElements)
throws TileDBError {
if (!buffers_.containsKey(attribute)) {
throw new TileDBError("Query attribute buffer does not exist: " + attribute);
}
if (bufferElements <= 0) {
throw new TileDBError("Number of buffer elements must be >= 1");
}
NativeArray buffer = buffers_.get(attribute);
Integer bufferSize = buffer.getSize();
if (bufferElements > bufferSize) {
throw new TileDBError(
"Number of elements requested exceeds the number of elements in allocated buffer: "
+ bufferElements
+ " > "
+ bufferSize);
}
return setBufferSizeUnsafe(attribute, 0l, (long) (bufferElements * buffer.getNativeTypeSize()));
}

public synchronized Query setBufferElements(
String attribute, Integer offsetElements, Integer bufferElements) throws TileDBError {
if (!var_buffers_.containsKey(attribute)) {
throw new TileDBError("Query var attribute buffer does not exist: " + attribute);
}
if (offsetElements <= 0 || bufferElements <= 0) {
throw new TileDBError("Number of buffer elements must be >= 1");
}
Pair<NativeArray, NativeArray> varBuffers = var_buffers_.get(attribute);
NativeArray offsetBuffer = varBuffers.getFirst();
Integer offsetSize = offsetBuffer.getSize();
NativeArray buffer = varBuffers.getSecond();
Integer bufferSize = buffer.getSize();
if (offsetElements > offsetSize) {
throw new TileDBError(
"Number of offset elements requested exceeds the number of elements in allocated offset buffer: "
+ offsetElements
+ " > "
+ offsetSize);
}
if (bufferElements > bufferSize) {
throw new TileDBError(
"Number of buffer elements requested exceeds the number of elements in allocated buffer"
+ bufferElements
+ " > "
+ bufferSize);
}
return setBufferSizeUnsafe(
attribute,
(long) (offsetElements * offsetBuffer.getNativeTypeSize()),
(long) (bufferElements * buffer.getNativeTypeSize()));
}

/**
* Set the coordinate buffer
*
Expand Down Expand Up @@ -364,7 +475,7 @@ public HashMap<String, Pair<Long, Long>> resultBufferElements() throws TileDBErr
BigInteger val_nbytes = buffer_sizes_.get(name).getSecond().getitem(0);
Long nelements =
val_nbytes.divide(BigInteger.valueOf(val_buffer.getNativeTypeSize())).longValue();
result.put(name, new Pair<Long, Long>(0l, nelements));
result.put(name, new Pair<>(0l, nelements));
}
for (Map.Entry<String, Pair<NativeArray, NativeArray>> entry : var_buffers_.entrySet()) {
String name = entry.getKey();
Expand All @@ -385,7 +496,7 @@ public HashMap<String, Pair<Long, Long>> resultBufferElements() throws TileDBErr
}

/** Clears all attribute buffers. */
public void resetBuffers() {
public synchronized void resetBuffers() {
for (NativeArray buffer : buffers_.values()) {
buffer.close();
}
Expand All @@ -402,23 +513,25 @@ public void resetBuffers() {
buffer_sizes_.clear();
}

/** Resets all attribute buffer sizes to zero */
public void resetBufferSizes() {
BigInteger zero = BigInteger.valueOf(0L);
public synchronized Query resetBufferSizes(Long val) {
BigInteger sizeVal = BigInteger.valueOf(val);
for (Pair<uint64_tArray, uint64_tArray> size_pair : buffer_sizes_.values()) {
size_pair.getFirst().setitem(0, zero);
size_pair.getSecond().setitem(0, zero);
size_pair.getFirst().setitem(0, sizeVal);
size_pair.getSecond().setitem(0, sizeVal);
}
return this;
}

private void prepareSubmission() throws TileDBError {
/** Resets all attribute buffer sizes to zero */
public Query resetBufferSizes() {
return resetBufferSizes(0l);
}

private synchronized void prepareSubmission() throws TileDBError {
for (Map.Entry<String, NativeArray> entry : buffers_.entrySet()) {
String name = entry.getKey();
NativeArray buffer = entry.getValue();
Pair<uint64_tArray, uint64_tArray> buffer_sizes = buffer_sizes_.get(name);
buffer_sizes.getFirst().setitem(0, BigInteger.valueOf(0L));
buffer_sizes.getSecond().setitem(0, BigInteger.valueOf(buffer.getNBytes()));

uint64_tArray buffer_size = buffer_sizes.getSecond();
ctx.handleError(
tiledb.tiledb_query_set_buffer(
Expand All @@ -430,9 +543,6 @@ private void prepareSubmission() throws TileDBError {
NativeArray val_buffer = entry.getValue().getSecond();

Pair<uint64_tArray, uint64_tArray> buffer_size = buffer_sizes_.get(name);
buffer_size.getFirst().setitem(0, BigInteger.valueOf(off_buffer.getNBytes()));
buffer_size.getSecond().setitem(0, BigInteger.valueOf(val_buffer.getNBytes()));

uint64_tArray offsets = PointerUtils.uint64_tArrayFromVoid(off_buffer.toVoidPointer());
uint64_tArray off_size = buffer_size.getFirst();
uint64_tArray val_size = buffer_size.getSecond();
Expand Down Expand Up @@ -535,7 +645,7 @@ public String toString() {

/** Free's native TileDB resources associated with the Query object */
@Override
public void close() {
public synchronized void close() {
if (queryp != null) {
for (Pair<uint64_tArray, uint64_tArray> size_pair : buffer_sizes_.values()) {
size_pair.getFirst().delete();
Expand Down
Loading