Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyakhajanchi committed May 10, 2024
1 parent e8ab30a commit 5667766
Showing 1 changed file with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.constants.Constants;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSessionConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import java.util.Iterator;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
Expand Down Expand Up @@ -111,6 +114,7 @@ public void testProcessElement() throws Exception {
DoFn.ProcessContext processContextMock = mock(DoFn.ProcessContext.class);
DatabaseClient databaseClientMock = mock(DatabaseClient.class);
TransactionRunner transactionCallableMock = mock(TransactionRunner.class);
TransactionContext transactionContext = mock(TransactionContext.class);
ValueProvider<Options.RpcPriority> rpcPriorityValueProviderMock = mock(ValueProvider.class);
ChangeEventSessionConvertor changeEventSessionConvertor =
mock(ChangeEventSessionConvertor.class);
Expand All @@ -122,10 +126,10 @@ public void testProcessElement() throws Exception {
ObjectNode outputObject = mapper.createObjectNode();
outputObject.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
outputObject.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
outputObject.put("first_name", "Johny");
outputObject.put("first_name", "Johnny");
outputObject.put("last_name", "Depp");
outputObject.put("age", 13);
outputObject.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, 123);
outputObject.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, 12345);
FailsafeElement<String, String> failsafeElement =
FailsafeElement.of(outputObject.toString(), outputObject.toString());
Ddl ddl = getTestDdl();
Expand All @@ -139,7 +143,12 @@ public void testProcessElement() throws Exception {
when(spannerAccessor.getDatabaseClient()).thenReturn(databaseClientMock);
when(changeEventSessionConvertor.transformChangeEventData(eq(outputObject), any(), eq(ddl)))
.thenReturn(outputObject);
when(transactionCallableMock.run(any())).thenReturn(null);
when(transactionCallableMock.run(any()))
.thenAnswer(
invocation -> {
TransactionRunner.TransactionCallable<Void> callable = invocation.getArgument(0);
return callable.run(transactionContext);
});
when(databaseClientMock.readWriteTransaction(any(), any())).thenReturn(transactionCallableMock);

SpannerTransactionWriterDoFn spannerTransactionWriterDoFn =
Expand All @@ -149,6 +158,38 @@ public void testProcessElement() throws Exception {
spannerTransactionWriterDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
spannerTransactionWriterDoFn.setSpannerAccessor(spannerAccessor);
spannerTransactionWriterDoFn.processElement(processContextMock);
ArgumentCaptor<Iterable<Mutation>> argument = ArgumentCaptor.forClass(Iterable.class);
verify(transactionContext, times(1)).buffer(argument.capture());
Iterable<Mutation> capturedMutations = argument.getValue();
Iterator<Mutation> mutationIterator = capturedMutations.iterator();
Mutation actualDataMutation = null;
Mutation actualShadowTableMutation = null;

if (mutationIterator.hasNext()) {
// Get the first mutation
actualDataMutation = mutationIterator.next();

if (mutationIterator.hasNext()) {
// Get the second mutation
actualShadowTableMutation = mutationIterator.next();
}
}

Mutation.WriteBuilder dataBuilder = Mutation.newInsertOrUpdateBuilder("Users");
dataBuilder.set("first_name").to("Johnny");
dataBuilder.set("last_name").to("Depp");
dataBuilder.set("age").to(13);
Mutation expectedDataMutation = dataBuilder.build();
assertEquals(actualDataMutation, expectedDataMutation);

Mutation.WriteBuilder shadowBuilder = Mutation.newInsertOrUpdateBuilder("shadow_Users");
shadowBuilder.set("first_name").to("Johnny");
shadowBuilder.set("last_name").to("Depp");
shadowBuilder.set("timestamp").to(12345);
shadowBuilder.set("log_file").to("");
shadowBuilder.set("log_position").to(-1);
Mutation expectedShadowMutation = shadowBuilder.build();
assertEquals(actualShadowTableMutation, expectedShadowMutation);

verify(processContextMock, times(1)).output(any(com.google.cloud.Timestamp.class));
}
Expand All @@ -168,7 +209,7 @@ public void testProcessElementWithInvalidChangeEvent() throws Exception {
ObjectNode outputObject = mapper.createObjectNode();
outputObject.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
outputObject.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users1");
outputObject.put("first_name", "Johny");
outputObject.put("first_name", "Johnny");
outputObject.put("last_name", "Depp");
outputObject.put("age", 13);
outputObject.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, 123);
Expand Down

0 comments on commit 5667766

Please sign in to comment.