Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,11 @@ private static void updateDocumentAtLeafNode(BsonValue newVal, UpdateOp updateOp
* present in the document. If it is, we return its value. Otherwise, we return the provided
* fallback value. If the current value is a bson document with $ADD or $SUBTRACT as key, we get
* the array of operands from this document and perform the corresponding operation. Operand can
* be an $IF_NOT_EXISTS bson document.
* be an $IF_NOT_EXISTS bson document. If the current value is a bson document with $LIST_APPEND
* as key, the value is a two-element array of operands; each operand resolves to a BsonArray
* (literal array, a path string referring to an existing array attribute, or an $IF_NOT_EXISTS
* document whose resolved value is an array) and the two arrays are concatenated in order with
* duplicates preserved.
* @param curValue The current value.
* @param bsonDocument The document with all field key-value pairs.
* @return Updated values to be used by SET operation.
Expand Down Expand Up @@ -654,11 +658,73 @@ private static BsonValue getNewFieldValue(final BsonValue curValue,
Number result = resolveSetOperand(operands.get(0), bsonDocument);
result = subtractNum(result, resolveSetOperand(operands.get(1), bsonDocument));
return getBsonNumberFromNumber(result);
} else if (doc.containsKey("$LIST_APPEND")) {
BsonArray operands = doc.getArray("$LIST_APPEND");
if (operands.size() != 2) {
throw new BsonUpdateInvalidArgumentException(
"Incorrect number of operands for operator or function; operator or function: "
+ "$LIST_APPEND, number of operands: " + operands.size());
}
BsonArray list1 = resolveListAppendOperand(operands.get(0), bsonDocument);
BsonArray list2 = resolveListAppendOperand(operands.get(1), bsonDocument);
BsonArray result = new BsonArray(new ArrayList<>(list1.size() + list2.size()));
result.addAll(list1);
result.addAll(list2);
return result;
}
}
return curValue;
}

/**
* Resolve a single operand of <code>$LIST_APPEND</code> to a {@link BsonArray}. Accepted operand
* shapes:
* <ul>
* <li>literal array — used as-is</li>
* <li>{@link BsonString} naming a top-level or nested document path — the resolved value must
* exist and be an array; otherwise an exception is thrown</li>
* <li>{@code {"$IF_NOT_EXISTS": {<path>: <fallback>}}} — fallback is used when the path is
* absent; the resolved value must be an array regardless of which branch is taken</li>
* </ul>
* Any other operand shape (including a nested {@code $LIST_APPEND}) is rejected.
*/
private static BsonArray resolveListAppendOperand(final BsonValue operand,
final BsonDocument bsonDocument) {
if (operand == null) {
throw new BsonUpdateInvalidArgumentException(
"An operand in the update expression has an incorrect data type");
}
if (operand.isArray()) {
return operand.asArray();
}
if (operand instanceof BsonDocument && ((BsonDocument) operand).get("$IF_NOT_EXISTS") != null) {
BsonValue resolved = resolveIfNotExists((BsonDocument) operand, bsonDocument);
if (resolved == null || !resolved.isArray()) {
throw new BsonUpdateInvalidArgumentException(
"An operand in the update expression has an incorrect data type");
}
return resolved.asArray();
}
if (operand instanceof BsonString) {
String path = ((BsonString) operand).getValue();
BsonValue topLevelValue = bsonDocument.get(path);
BsonValue bsonValue = topLevelValue != null
? topLevelValue
: CommonComparisonExpressionUtils.getFieldFromDocument(path, bsonDocument);
if (bsonValue == null) {
throw new BsonUpdateInvalidArgumentException(
"The provided expression refers to an attribute that does not exist in the item: "
+ path);
}
if (!bsonValue.isArray()) {
throw new BsonUpdateInvalidArgumentException(
"An operand in the update expression has an incorrect data type");
}
return bsonValue.asArray();
}
throw new BsonUpdateInvalidArgumentException("Invalid operand for $LIST_APPEND: " + operand);
}

/**
* Resolves an $IF_NOT_EXISTS expression. Returns the existing field value if present, otherwise
* returns the fallback value.
Expand Down
101 changes: 101 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson2IT.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Properties;
import org.apache.phoenix.util.PropertiesUtil;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.junit.Test;
Expand Down Expand Up @@ -1087,4 +1090,102 @@ private static RawBsonDocument getDocument3() {
return RawBsonDocument.parse(json);
}

@Test
public void testListAppendUpdateExpression() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
String tableName = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY KEY(PK1))";
conn.createStatement().execute(ddl);

BsonDocument initial = new BsonDocument()
.append("events", new BsonArray(Arrays.asList(new BsonString("a"), new BsonString("b"))))
.append("counter", new BsonInt32(0));

PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
stmt.setString(1, "pk1");
stmt.setObject(2, initial);
stmt.executeUpdate();
conn.commit();

BsonDocument appendExisting = new BsonDocument().append("$SET",
new BsonDocument().append("events",
new BsonDocument().append("$LIST_APPEND",
new BsonArray(Arrays.asList(new BsonString("events"),
new BsonArray(Arrays.asList(new BsonString("c"))))))));

stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = BSON_UPDATE_EXPRESSION(COL, '" + appendExisting
+ "')");
stmt.setString(1, "pk1");
stmt.executeUpdate();
conn.commit();

ResultSet rs =
conn.createStatement().executeQuery("SELECT COL FROM " + tableName + " WHERE PK1 = 'pk1'");
assertTrue(rs.next());
BsonDocument afterAppend = (BsonDocument) rs.getObject(1);
BsonArray events = afterAppend.getArray("events");
assertEquals(3, events.size());
assertEquals("a", events.get(0).asString().getValue());
assertEquals("b", events.get(1).asString().getValue());
assertEquals("c", events.get(2).asString().getValue());

BsonDocument createOrAppend = new BsonDocument().append("$SET",
new BsonDocument()
.append("newQueue",
new BsonDocument().append("$LIST_APPEND",
new BsonArray(Arrays.asList(
new BsonDocument().append("$IF_NOT_EXISTS",
new BsonDocument().append("newQueue", new BsonArray())),
new BsonArray(Arrays.asList(new BsonString("ev1"), new BsonString("ev2")))))))
.append("counter", new BsonDocument().append("$ADD",
new BsonArray(Arrays.asList(new BsonString("counter"), new BsonInt32(1))))));

stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = BSON_UPDATE_EXPRESSION(COL, '" + createOrAppend
+ "')");
stmt.setString(1, "pk1");
stmt.executeUpdate();
conn.commit();

rs =
conn.createStatement().executeQuery("SELECT COL FROM " + tableName + " WHERE PK1 = 'pk1'");
assertTrue(rs.next());
BsonDocument afterCreate = (BsonDocument) rs.getObject(1);

assertEquals(3, afterCreate.getArray("events").size());
BsonArray queue = afterCreate.getArray("newQueue");
assertEquals(2, queue.size());
assertEquals("ev1", queue.get(0).asString().getValue());
assertEquals("ev2", queue.get(1).asString().getValue());
assertEquals(1, afterCreate.getInt32("counter").getValue());

// Re-apply the same create-or-append. newQueue now exists, so $IF_NOT_EXISTS resolves
// to the existing array (not the empty-array fallback) and the same elements are
// appended again, producing duplicates. counter advances once more.
stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ " VALUES (?) ON DUPLICATE KEY UPDATE COL = BSON_UPDATE_EXPRESSION(COL, '" + createOrAppend
+ "')");
stmt.setString(1, "pk1");
stmt.executeUpdate();
conn.commit();

rs =
conn.createStatement().executeQuery("SELECT COL FROM " + tableName + " WHERE PK1 = 'pk1'");
assertTrue(rs.next());
BsonDocument afterRepeat = (BsonDocument) rs.getObject(1);

assertEquals(3, afterRepeat.getArray("events").size());
BsonArray queueRepeat = afterRepeat.getArray("newQueue");
assertEquals(4, queueRepeat.size());
assertEquals("ev1", queueRepeat.get(0).asString().getValue());
assertEquals("ev2", queueRepeat.get(1).asString().getValue());
assertEquals("ev1", queueRepeat.get(2).asString().getValue());
assertEquals("ev2", queueRepeat.get(3).asString().getValue());
assertEquals(2, afterRepeat.getInt32("counter").getValue());
}
}

}
Loading