Skip to content

Commit

Permalink
check before update
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Jul 10, 2023
1 parent 38f3c2c commit 26eba08
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
Expand Up @@ -46,7 +46,7 @@ public class EmulatorController {
private Process process;
private boolean isStopped = true;
private Thread shutdownHook;

private int port;

public static EmulatorController createFromPath(Path path) {
Expand Down
Expand Up @@ -86,10 +86,11 @@ public BulkMutation add(@Nonnull ByteString rowKey, @Nonnull Mutation mutation)
Preconditions.checkNotNull(rowKey);
Preconditions.checkNotNull(mutation);

this.mutationCountSum += mutation.getMutations().size();
long mutationCount = mutation.getMutations().size();
Preconditions.checkArgument(
mutationCountSum <= MAX_MUTATION,
mutationCountSum + mutationCount <= MAX_MUTATION,
String.format("Too many mutations, got %s, limit is %s", mutationCountSum, MAX_MUTATION));
this.mutationCountSum += mutationCount;

builder.addEntries(
MutateRowsRequest.Entry.newBuilder()
Expand Down
Expand Up @@ -16,15 +16,18 @@
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.gax.batching.BatcherImpl;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlEventStats;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import java.io.IOException;
import java.util.Objects;
Expand All @@ -33,6 +36,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
public class BulkMutateIT {
Expand Down Expand Up @@ -86,24 +90,36 @@ public void test() throws IOException, InterruptedException {

@Test
public void testManyMutations() throws IOException, InterruptedException {
// Emulator is very slow and will take a long time for the test to run
assume()
.withMessage("testManyMutations is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);

BigtableDataSettings settings = testEnvRule.env().getDataClientSettings();
String rowPrefix = UUID.randomUUID().toString();
// Set target latency really low so it'll trigger adjusting thresholds
BigtableDataSettings.Builder builder =
settings.toBuilder().enableBatchMutationLatencyBasedThrottling(2L);

try (BigtableDataClient client = BigtableDataClient.create(builder.build());
BatchingSettings batchingSettings =
settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings();

settings
.toBuilder()
.stubSettings()
.bulkMutateRowsSettings()
.setBatchingSettings(
batchingSettings.toBuilder().setDelayThreshold(Duration.ofHours(1)).build());
try (BigtableDataClient client = BigtableDataClient.create(settings);
BatcherImpl<RowMutationEntry, Void, BulkMutation, Void> batcher =
(BatcherImpl<RowMutationEntry, Void, BulkMutation, Void>)
client.newBulkMutationBatcher(testEnvRule.env().getTableId())) {

String familyId = testEnvRule.env().getFamilyId();
for (int i = 0; i < 4; i++) {
for (int i = 0; i < 2; i++) {
String key = rowPrefix + "test-key";
RowMutationEntry rowMutationEntry = RowMutationEntry.create(key);
// Create mutation entries with many columns. The batcher should flush every time.
for (long j = 0; j < 50001; j++) {
rowMutationEntry.setCell(familyId, "q" + j + i, "value" + j);
rowMutationEntry.setCell(familyId, "q" + j + i, j);
}
batcher.add(rowMutationEntry);
}
Expand All @@ -116,7 +132,7 @@ public void testManyMutations() throws IOException, InterruptedException {
.readRowsCallable()
.first()
.call(Query.create(testEnvRule.env().getTableId()).rowKey(rowPrefix + "test-key"));
assertThat(row.getCells()).hasSize(50001);
assertThat(row.getCells()).hasSize(100002);
}
}
}

0 comments on commit 26eba08

Please sign in to comment.