Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3403,6 +3403,135 @@ void testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception {
}
}

@Nested
@DisplayName("Key-Specific Bulk Update Operations")
class KeySpecificBulkUpdateTests {

  @Test
  @DisplayName("Should update multiple keys with all operator types in a single batch")
  void testBulkUpdateAllOperatorTypes() throws Exception {
    // LinkedHashMap keeps the per-key update order deterministic for the batch.
    final Map<Key, java.util.Collection<SubDocumentUpdate>> perKeyUpdates = new LinkedHashMap<>();

    // Document "1": plain SET shorthand, numeric ADD, and nested-path SET.
    perKeyUpdates.put(
        rawKey("1"),
        List.of(
            SubDocumentUpdate.of("item", "UpdatedSoap"),
            updateOf("price", UpdateOperator.ADD, SubDocumentValue.of(5)),
            updateOf("props.brand", UpdateOperator.SET, SubDocumentValue.of("NewBrand"))));

    // Document "3": UNSET a nested field (no value needed) and APPEND two tags.
    perKeyUpdates.put(
        rawKey("3"),
        List.of(
            SubDocumentUpdate.builder()
                .subDocument("props.brand")
                .operator(UpdateOperator.UNSET)
                .build(),
            updateOf(
                "tags",
                UpdateOperator.APPEND_TO_LIST,
                SubDocumentValue.of(new String[] {"newTag1", "newTag2"}))));

    // Document "5": conditional list add — "hygiene" already present, "uniqueTag" is new.
    perKeyUpdates.put(
        rawKey("5"),
        List.of(
            updateOf(
                "tags",
                UpdateOperator.ADD_TO_LIST_IF_ABSENT,
                SubDocumentValue.of(new String[] {"hygiene", "uniqueTag"}))));

    // Document "6": remove every occurrence of "plastic" from the tag list.
    perKeyUpdates.put(
        rawKey("6"),
        List.of(
            updateOf(
                "tags",
                UpdateOperator.REMOVE_ALL_FROM_LIST,
                SubDocumentValue.of(new String[] {"plastic"}))));

    final BulkUpdateResult result =
        flatCollection.bulkUpdate(perKeyUpdates, UpdateOptions.builder().build());

    assertEquals(4, result.getUpdatedCount());

    // Document "1": SET/ADD applied; untouched sibling field preserved.
    final JsonNode doc1 = readFirstMatch("1");
    assertEquals("UpdatedSoap", doc1.get("item").asText());
    assertEquals(15, doc1.get("price").asInt()); // 10 + 5
    assertEquals("NewBrand", doc1.get("props").get("brand").asText());
    assertEquals("M", doc1.get("props").get("size").asText()); // preserved

    // Document "3": brand removed, size preserved, tags grown by two.
    final JsonNode doc3 = readFirstMatch("3");
    assertFalse(doc3.get("props").has("brand"));
    assertEquals("L", doc3.get("props").get("size").asText()); // preserved
    assertEquals(6, doc3.get("tags").size()); // Original 4 + 2 new

    // Document "5": only the genuinely new tag was appended.
    final JsonNode doc5 = readFirstMatch("5");
    final JsonNode tags5 = doc5.get("tags");
    assertEquals(4, tags5.size()); // Original 3 + 1 new unique
    assertTrue(tagValues(tags5).contains("uniqueTag"));

    // Document "6": "plastic" fully removed.
    final JsonNode doc6 = readFirstMatch("6");
    final JsonNode tags6 = doc6.get("tags");
    assertEquals(2, tags6.size()); // grooming, essential remain
    assertFalse(tagValues(tags6).contains("plastic"));
  }

  @Test
  @DisplayName("Should handle edge cases: empty map, null map, non-existent keys")
  void testBulkUpdateEdgeCases() throws Exception {
    final UpdateOptions options = UpdateOptions.builder().build();

    // Empty map
    assertEquals(0, flatCollection.bulkUpdate(new HashMap<>(), options).getUpdatedCount());

    // Null map
    final Map<Key, java.util.Collection<SubDocumentUpdate>> absentUpdates = null;
    assertEquals(0, flatCollection.bulkUpdate(absentUpdates, options).getUpdatedCount());

    // Non-existent key
    final Map<Key, java.util.Collection<SubDocumentUpdate>> unmatched = new LinkedHashMap<>();
    unmatched.put(rawKey("non-existent"), List.of(SubDocumentUpdate.of("item", "X")));
    assertEquals(0, flatCollection.bulkUpdate(unmatched, options).getUpdatedCount());
  }

  // Builds a builder-based update for the common (path, operator, value) triple.
  private SubDocumentUpdate updateOf(
      String subDocumentPath, UpdateOperator operator, SubDocumentValue value) {
    return SubDocumentUpdate.builder()
        .subDocument(subDocumentPath)
        .operator(operator)
        .subDocumentValue(value)
        .build();
  }

  // Fetches the single document matching the given id, asserting it exists,
  // and parses it into a JSON tree for assertions.
  private JsonNode readFirstMatch(String id) throws Exception {
    try (CloseableIterator<Document> results = flatCollection.find(queryById(id))) {
      assertTrue(results.hasNext());
      return OBJECT_MAPPER.readTree(results.next().toJson());
    }
  }

  // Collects the text values of a JSON array node into a set.
  private Set<String> tagValues(JsonNode arrayNode) {
    final Set<String> values = new HashSet<>();
    arrayNode.forEach(element -> values.add(element.asText()));
    return values;
  }

  // Creates a key with raw ID (matching test data format)
  private Key rawKey(String id) {
    return Key.from(id);
  }

  private Query queryById(String id) {
    return Query.builder()
        .setFilter(
            RelationalExpression.of(
                IdentifierExpression.of("id"), RelationalOperator.EQ, ConstantExpression.of(id)))
        .build();
  }
}

private static void executeInsertStatements() {
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,5 +398,54 @@ CloseableIterator<Document> bulkUpdate(
final UpdateOptions updateOptions)
throws IOException;

/**
 * Bulk update sub-documents with key-specific updates. Each key carries its own collection of
 * {@link SubDocumentUpdate} operations, allowing a different set of updates per document.
 *
 * <p>All update operators are supported (SET, UNSET, ADD, APPEND_TO_LIST, ADD_TO_LIST_IF_ABSENT,
 * REMOVE_ALL_FROM_LIST). Atomicity is per key only: the updates for a single key are applied
 * atomically, but there is no batch-level guarantee across keys — some keys may be updated while
 * others fail.
 *
 * <p>Example usage:
 *
 * <pre>{@code
 * Map<Key, Collection<SubDocumentUpdate>> updates = new HashMap<>();
 *
 * // Key 1: SET a field and ADD to a number
 * updates.put(key1, List.of(
 *     SubDocumentUpdate.of("name", "NewName"),
 *     SubDocumentUpdate.builder()
 *         .subDocument("count")
 *         .operator(UpdateOperator.ADD)
 *         .subDocumentValue(SubDocumentValue.of(5))
 *         .build()
 * ));
 *
 * // Key 2: APPEND to an array
 * updates.put(key2, List.of(
 *     SubDocumentUpdate.builder()
 *         .subDocument("tags")
 *         .operator(UpdateOperator.APPEND_TO_LIST)
 *         .subDocumentValue(SubDocumentValue.of(new String[]{"newTag"}))
 *         .build()
 * ));
 *
 * BulkUpdateResult result = collection.bulkUpdate(updates, UpdateOptions.builder().build());
 * }</pre>
 *
 * @param updates Map of Key to Collection of SubDocumentUpdate operations; each key's updates are
 *     applied atomically, with no cross-key atomicity
 * @param updateOptions Options for the update operation
 * @return BulkUpdateResult containing the count of successfully updated documents
 * @throws IOException if the update operation fails
 * @throws UnsupportedOperationException always, from this default implementation; datastore
 *     implementations must override this method to support key-specific bulk updates
 */
default BulkUpdateResult bulkUpdate(
    Map<Key, java.util.Collection<SubDocumentUpdate>> updates, UpdateOptions updateOptions)
    throws IOException {
  throw new UnsupportedOperationException("bulkUpdate is not supported!");
}

// Shared error message for query operations an implementation does not support —
// presumably thrown by implementing classes; users are outside this view, confirm.
String UNSUPPORTED_QUERY_OPERATION = "Query operation is not supported";
}
Loading
Loading