Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jul 10, 2023
1 parent 7a9cb62 commit 38f3c2c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
Expand Up @@ -43,6 +43,8 @@ public final class BulkMutation implements Serializable, Cloneable {
private final String tableId;
private transient MutateRowsRequest.Builder builder;

private long mutationCountSum = 0;

public static BulkMutation create(String tableId) {
return new BulkMutation(tableId);
}
Expand Down Expand Up @@ -72,11 +74,6 @@ private void writeObject(ObjectOutputStream output) throws IOException {
public BulkMutation add(@Nonnull String rowKey, @Nonnull Mutation mutation) {
Preconditions.checkNotNull(rowKey);
Preconditions.checkNotNull(mutation);
Preconditions.checkArgument(
mutation.getMutations().size() < MAX_MUTATION,
String.format(
"Too many mutations, got %s, limit is %s",
mutation.getMutations().size(), MAX_MUTATION));

return add(ByteString.copyFromUtf8(rowKey), mutation);
}
Expand All @@ -88,11 +85,11 @@ public BulkMutation add(@Nonnull String rowKey, @Nonnull Mutation mutation) {
public BulkMutation add(@Nonnull ByteString rowKey, @Nonnull Mutation mutation) {
Preconditions.checkNotNull(rowKey);
Preconditions.checkNotNull(mutation);

this.mutationCountSum += mutation.getMutations().size();
Preconditions.checkArgument(
mutation.getMutations().size() <= MAX_MUTATION,
String.format(
"Too many mutations, got %s, limit is %s",
mutation.getMutations().size(), MAX_MUTATION));
mutationCountSum <= MAX_MUTATION,
String.format("Too many mutations, got %s, limit is %s", mutationCountSum, MAX_MUTATION));

builder.addEntries(
MutateRowsRequest.Entry.newBuilder()
Expand Down
Expand Up @@ -99,11 +99,11 @@ public void testManyMutations() throws IOException, InterruptedException {

String familyId = testEnvRule.env().getFamilyId();
for (int i = 0; i < 4; i++) {
String key = rowPrefix + "test-key" + i;
String key = rowPrefix + "test-key";
RowMutationEntry rowMutationEntry = RowMutationEntry.create(key);
// Create mutation entries with many columns. The batcher should flush every time.
for (long j = 0; j < 50001; j++) {
rowMutationEntry.setCell(familyId, "q" + j, "value" + j);
rowMutationEntry.setCell(familyId, "q" + j + i, "value" + j);
}
batcher.add(rowMutationEntry);
}
Expand All @@ -115,8 +115,7 @@ public void testManyMutations() throws IOException, InterruptedException {
.getDataClient()
.readRowsCallable()
.first()
.call(
Query.create(testEnvRule.env().getTableId()).rowKey(rowPrefix + "test-key" + 2));
.call(Query.create(testEnvRule.env().getTableId()).rowKey(rowPrefix + "test-key"));
assertThat(row.getCells()).hasSize(50001);
}
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -172,4 +173,31 @@ public void fromProtoTest() {
.matches(NameUtil.formatTableName(projectId, instanceId, TABLE_ID));
assertThat(overriddenRequest.getAppProfileId()).matches(appProfile);
}

@Test
public void testManyMutations() {
BulkMutation bulkMutation = BulkMutation.create(TABLE_ID);

try {
for (int i = 0; i < 3; i++) {
String key = "key" + i;
Mutation mutation = Mutation.create();
for (int j = 0; j < 50000; j++) {
mutation.setCell("f", "q" + j, "value");
}
bulkMutation.add(key, mutation);
}
Assert.fail("Test should fail with IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage()).contains("Too many mutations");
}

// we should be able to add 10000 mutations
bulkMutation = BulkMutation.create(TABLE_ID);
Mutation mutation = Mutation.create();
for (int i = 0; i < 100000; i++) {
mutation.setCell("f", "q" + i, "value");
}
bulkMutation.add("key", mutation);
}
}

0 comments on commit 38f3c2c

Please sign in to comment.