Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/jni/bindings_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,24 @@ JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBBindings_duckdb_1vector_1get_1da
*/
JNIEXPORT jobject JNICALL Java_org_duckdb_DuckDBBindings_duckdb_1vector_1get_1validity(JNIEnv *env, jclass,
jobject vector,
// NOTE(review): two alternative parameter lines follow (array_size, then vector_size_elems).
// This looks like both sides of a diff shown together; presumably vector_size_elems is the
// current one, since the body below converts it with jlong_to_idx. Confirm against the real file.
jlong array_size) {
jlong vector_size_elems) {

// Turn the Java-side `vector` object into a duckdb_vector handle; return early if a Java
// exception is pending after the call.
duckdb_vector vec = vector_buf_to_vector(env, vector);
if (env->ExceptionCheck()) {
return nullptr;
}

// Convert the requested element count to idx_t; return early if a Java exception is pending
// (presumably raised by jlong_to_idx for values it rejects; TODO confirm).
idx_t vector_size = jlong_to_idx(env, vector_size_elems);
if (env->ExceptionCheck()) {
return nullptr;
}

uint64_t *mask = duckdb_vector_get_validity(vec);
// NOTE(review): the next two lines size the mask from duckdb_vector_size() and array_size, and
// declare mask_len a second time further down. They appear to be the pre-change computation.
idx_t vec_len = duckdb_vector_size();
idx_t mask_len = vec_len * sizeof(uint64_t) * array_size / 64;
// Round the element count up to the next multiple of 64, so that every 64 elements map to one
// whole uint64_t word of the mask.
idx_t vector_size_rounded = vector_size;
if (vector_size % 64 != 0) {
vector_size_rounded += 64 - (vector_size % 64);
}
// Mask length in bytes: (rounded element count / 64) words of sizeof(uint64_t) bytes each.
idx_t mask_len = vector_size_rounded * sizeof(uint64_t) / 64;

// Wrap the mask pointer and its byte length into the object returned to Java.
return make_data_buf(env, mask, mask_len);
}
Expand Down
36 changes: 22 additions & 14 deletions src/main/java/org/duckdb/DuckDBAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public class DuckDBAppender implements AutoCloseable {

private static final LocalDateTime EPOCH_DATE_TIME = LocalDateTime.ofEpochSecond(0, 0, UTC);

private static final long MAX_TOP_LEVEL_ROWS = duckdb_vector_size();

private final DuckDBConnection conn;

private final String catalog;
private final String schema;
private final String table;

private final long maxRows;

private ByteBuffer appenderRef;
private final Lock appenderRefLock = new ReentrantLock();

Expand All @@ -101,8 +101,6 @@ public class DuckDBAppender implements AutoCloseable {
this.schema = schema;
this.table = table;

this.maxRows = duckdb_vector_size();

ByteBuffer appenderRef = null;
ByteBuffer[] colTypes = null;
ByteBuffer chunkRef = null;
Expand Down Expand Up @@ -163,7 +161,7 @@ public DuckDBAppender endRow() throws SQLException {
rowIdx++;
Column prev = prevColumn;
this.prevColumn = null;
if (rowIdx >= maxRows) {
if (rowIdx >= MAX_TOP_LEVEL_ROWS) {
try {
flush();
} catch (SQLException e) {
Expand Down Expand Up @@ -2325,8 +2323,10 @@ private Column(Column parent, int idx, ByteBuffer colTypeRef, ByteBuffer vector,
this.arraySize = duckdb_array_type_array_size(parent.colTypeRef);
}

long maxElems = maxElementsCount();
if (colType.widthBytes > 0 || colType == DUCKDB_TYPE_DECIMAL) {
this.data = duckdb_vector_get_data(vectorRef, vectorSize());
long vectorSizeBytes = maxElems * widthBytes();
this.data = duckdb_vector_get_data(vectorRef, vectorSizeBytes);
if (null == this.data) {
throw new SQLException("cannot initialize data chunk vector data");
}
Expand All @@ -2335,7 +2335,7 @@ private Column(Column parent, int idx, ByteBuffer colTypeRef, ByteBuffer vector,
}

duckdb_vector_ensure_validity_writable(vectorRef);
this.validity = duckdb_vector_get_validity(vectorRef, arraySize * parentArraySize());
this.validity = duckdb_vector_get_validity(vectorRef, maxElems);
if (null == this.validity) {
throw new SQLException("cannot initialize data chunk vector validity");
}
Expand All @@ -2353,15 +2353,18 @@ void reset(long listSize) throws SQLException {
}

void reset() throws SQLException {
long maxElems = maxElementsCount();

if (null != this.data) {
this.data = duckdb_vector_get_data(vectorRef, vectorSize());
long vectorSizeBytes = maxElems * widthBytes();
this.data = duckdb_vector_get_data(vectorRef, vectorSizeBytes);
if (null == this.data) {
throw new SQLException("cannot reset data chunk vector data");
}
}

duckdb_vector_ensure_validity_writable(vectorRef);
this.validity = duckdb_vector_get_validity(vectorRef, arraySize * parentArraySize());
this.validity = duckdb_vector_get_validity(vectorRef, maxElems);
if (null == this.validity) {
throw new SQLException("cannot reset data chunk vector validity");
}
Expand Down Expand Up @@ -2432,12 +2435,17 @@ long parentArraySize() {
return parent.arraySize;
}

long vectorSize() {
if (null != parent && (parent.colType == DUCKDB_TYPE_LIST || parent.colType == DUCKDB_TYPE_MAP)) {
return listSize * widthBytes();
} else {
return duckdb_vector_size() * widthBytes() * arraySize * parentArraySize();
/**
 * Computes the element count used to size this column's vector buffers.
 *
 * Starting at this column and following {@code parent} links, the loop stops at the first
 * column whose parent has type LIST or MAP. If one is found, its {@code listSize} is the
 * base entry count. If the chain ends without one, the base is
 * {@code DuckDBAppender.MAX_TOP_LEVEL_ROWS}.
 *
 * @return the base entry count multiplied by {@code arraySize} and {@code parentArraySize()}
 */
long maxElementsCount() {
Column ancestor = this;
while (null != ancestor) {
// Stop at the column that sits directly under a LIST or MAP parent.
if (null != ancestor.parent &&
(ancestor.parent.colType == DUCKDB_TYPE_LIST || ancestor.parent.colType == DUCKDB_TYPE_MAP)) {
break;
}
ancestor = ancestor.parent;
}
// ancestor is null here when no LIST/MAP parent was found on the way to the root.
long maxEntries = null != ancestor ? ancestor.listSize : DuckDBAppender.MAX_TOP_LEVEL_ROWS;
return maxEntries * arraySize * parentArraySize();
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/duckdb/DuckDBBindings.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class DuckDBBindings {

static native ByteBuffer duckdb_vector_get_data(ByteBuffer vector, long size_bytes);

static native ByteBuffer duckdb_vector_get_validity(ByteBuffer vector, long array_size);
static native ByteBuffer duckdb_vector_get_validity(ByteBuffer vector, long vector_size_elems);

static native void duckdb_vector_ensure_validity_writable(ByteBuffer vector);

Expand Down
108 changes: 35 additions & 73 deletions src/test/java/org/duckdb/TestAppenderCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1410,93 +1410,55 @@ public static void test_appender_list_basic_nested_list() throws Exception {
}
}

/**
 * Asserts that two maps have the same size and the same entries in the same iteration order.
 *
 * The comparison is positional: the i-th entry yielded by {@code obj1}'s entry set is
 * compared, key and value, with the i-th entry of {@code map2}.
 *
 * @param obj1 object that is cast to {@code Map}; a non-Map value fails with ClassCastException
 * @param map2 map to compare against
 */
private static void assertMapsEqual(Object obj1, Map<?, ?> map2) throws Exception {
Map<?, ?> map1 = (Map<?, ?>) obj1;
assertEquals(map1.size(), map2.size());
// Snapshot map2's entries into a list so they can be read by index.
List<Map.Entry<?, ?>> list2 = new ArrayList<>(map2.entrySet());
int i = 0;
for (Map.Entry<?, ?> en : map1.entrySet()) {
assertEquals(en.getKey(), list2.get(i).getKey());
assertEquals(en.getValue(), list2.get(i).getValue());
i += 1;
}
}
public static void test_appender_list_bigint() throws Exception {
int count = 1 << 12; // auto flush twice
int tail = 7; // flushed on close
int listLen = (1 << 6) + 7; // increase this for stress tests

public static void test_appender_map_basic() throws Exception {
Map<Integer, String> map1 = createMap(41, "foo", 42, "bar");
Map<Integer, String> map2 = createMap(41, "foo", 42, null, 43, "baz");
try (DuckDBConnection conn = DriverManager.getConnection(JDBC_URL).unwrap(DuckDBConnection.class);
Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE tab1(col1 INTEGER, col2 MAP(INTEGER, VARCHAR))");
stmt.execute("CREATE TABLE tab1(col1 INTEGER, col2 BIGINT[])");

try (DuckDBAppender appender = conn.createAppender("tab1")) {
appender.beginRow()
.append(41)
.append(map1)
.endRow()
.beginRow()
.append(42)
.append(map2)
.endRow()
.flush();
for (int i = 0; i < count + tail; i++) {
List<Long> list = new ArrayList<>();
for (long j = 0; j < Math.min(i, listLen); j++) {
if (0 == (i + j) % 13) {
list.add(null);
} else {
list.add(i + j);
}
}
appender.beginRow().append(i).append(list).endRow();
}
}

try (ResultSet rs = stmt.executeQuery("SELECT col2 FROM tab1 ORDER BY col1")) {
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map1);
try (ResultSet rs = stmt.executeQuery("SELECT count(*) FROM tab1")) {
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map2);
assertEquals(rs.getInt(1), count + tail);
assertFalse(rs.next());
}
}
}

public static void test_appender_list_basic_map() throws Exception {
Map<Integer, String> map1 = createMap(41, "foo1", 42, "bar1", 43, "baz1");
Map<Integer, String> map2 = createMap(44, null, 45, "bar2");
Map<Integer, String> map3 = new LinkedHashMap<>();
Map<Integer, String> map4 = createMap(46, "foo3");
try (DuckDBConnection conn = DriverManager.getConnection(JDBC_URL).unwrap(DuckDBConnection.class);
Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE tab1(col1 INT, col2 MAP(INTEGER, VARCHAR)[])");
try (DuckDBAppender appender = conn.createAppender("tab1")) {
appender.beginRow()
.append(42)
.append(asList(map1, map2, map3))
.endRow()
.beginRow()
.append(43)
.append((List<Object>) null)
.endRow()
.beginRow()
.append(44)
.append(asList(null, map4))
.endRow()
.flush();
}

try (ResultSet rs = stmt.executeQuery("SELECT unnest(col2) from tab1 WHERE col1 = 42")) {
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map1);
try (ResultSet rs = stmt.executeQuery(
"SELECT count(*) FROM (SELECT unnest(col2) FROM tab1 WHERE col1 = " + (listLen - 7) + ")")) {
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map2);
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map3);
assertFalse(rs.next());
}
try (ResultSet rs = stmt.executeQuery("SELECT col2 from tab1 WHERE col1 = 43")) {
assertTrue(rs.next());
assertNull(rs.getObject(1));
assertTrue(rs.wasNull());
assertEquals(rs.getInt(1), listLen - 7);
assertFalse(rs.next());
}
try (ResultSet rs = stmt.executeQuery("SELECT unnest(col2) from tab1 WHERE col1 = 44")) {
assertTrue(rs.next());
assertNull(rs.getObject(1));
assertTrue(rs.wasNull());
assertTrue(rs.next());
assertMapsEqual(rs.getObject(1), map4);
assertFalse(rs.next());

try (ResultSet rs = stmt.executeQuery("SELECT col1, unnest(col2) FROM tab1 ORDER BY col1")) {
for (int i = 0; i < count + tail; i++) {
for (long j = 0; j < Math.min(i, listLen); j++) {
assertTrue(rs.next());
assertEquals(rs.getInt(1), i);
if (0 == (i + j) % 13) {
assertNull(rs.getObject(2));
assertTrue(rs.wasNull());
} else {
assertEquals(rs.getLong(2), i + j);
}
}
}
}
}
}
Expand Down
Loading