diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index 520a2e180e5..0362ffc2050 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -461,7 +461,10 @@ public void run() { CallType.SYNC, SELECT1_STATEMENT, AnalyzeMode.NONE, - Options.tag("connection.transaction-keep-alive")); + Options.tag( + System.getProperty( + "spanner.connection.keep_alive_query_tag", + "connection.transaction-keep-alive"))); future.addListener( ReadWriteTransaction.this::maybeScheduleKeepAlivePing, MoreExecutors.directExecutor()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java index e615b78e9de..31972481629 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java @@ -682,14 +682,16 @@ public void testRollbackToSavepointWithoutInternalRetriesInReadOnlyTransaction() @Test public void testKeepAlive() throws InterruptedException, TimeoutException { + String keepAliveTag = "test_keep_alive_tag"; System.setProperty("spanner.connection.keep_alive_interval_millis", "1"); + System.setProperty("spanner.connection.keep_alive_query_tag", keepAliveTag); try (Connection connection = createConnection()) { connection.setSavepointSupport(SavepointSupport.ENABLED); connection.setKeepTransactionAlive(true); // Start a transaction by executing a statement. connection.execute(INSERT_STATEMENT); // Verify that we get a keep-alive request. - verifyHasKeepAliveRequest(); + verifyHasKeepAliveRequest(keepAliveTag); // Set a savepoint, execute another statement, and rollback to the savepoint. // The keep-alive should not be sent after the transaction has been rolled back to the // savepoint. @@ -697,38 +699,22 @@ public void testKeepAlive() throws InterruptedException, TimeoutException { connection.execute(INSERT_STATEMENT); connection.rollbackToSavepoint("s1"); mockSpanner.waitForRequestsToContain(RollbackRequest.class, 1000L); - // Wait for up to 2 milliseconds to make sure that any keep-alive requests that were in flight - // have finished. - try { - mockSpanner.waitForRequestsToContain( - r -> { - if (!(r instanceof ExecuteSqlRequest)) { - return false; - } - ExecuteSqlRequest request = (ExecuteSqlRequest) r; - return request.getSql().equals("SELECT 1") - && request - .getRequestOptions() - .getRequestTag() - .equals("connection.transaction-keep-alive"); - }, - 2L); - } catch (TimeoutException ignore) { - } + String keepAliveTagAfterRollback = "test_keep_alive_tag_after_rollback"; + System.setProperty("spanner.connection.keep_alive_query_tag", keepAliveTagAfterRollback); - // Verify that we don't get any keep-alive requests from this point. - mockSpanner.clearRequests(); + // Verify that we don't get any new keep-alive requests from this point. Thread.sleep(2L); - assertEquals(0, countKeepAliveRequest()); + assertEquals(0, countKeepAliveRequest(keepAliveTagAfterRollback)); // Resume the transaction and verify that we get a keep-alive again. connection.execute(INSERT_STATEMENT); - verifyHasKeepAliveRequest(); + verifyHasKeepAliveRequest(keepAliveTagAfterRollback); } finally { System.clearProperty("spanner.connection.keep_alive_interval_millis"); + System.clearProperty("spanner.connection.keep_alive_query_tag"); } } - private void verifyHasKeepAliveRequest() throws InterruptedException, TimeoutException { + private void verifyHasKeepAliveRequest(String tag) throws InterruptedException, TimeoutException { mockSpanner.waitForRequestsToContain( r -> { if (!(r instanceof ExecuteSqlRequest)) { @@ -736,23 +722,17 @@ private void verifyHasKeepAliveRequest() throws InterruptedException, TimeoutExc } ExecuteSqlRequest request = (ExecuteSqlRequest) r; return request.getSql().equals("SELECT 1") - && request - .getRequestOptions() - .getRequestTag() - .equals("connection.transaction-keep-alive"); + && request.getRequestOptions().getRequestTag().equals(tag); }, 1000L); } - private long countKeepAliveRequest() { + private long countKeepAliveRequest(String tag) { return mockSpanner.getRequestsOfType(ExecuteSqlRequest.class).stream() .filter( request -> request.getSql().equals("SELECT 1") - && request - .getRequestOptions() - .getRequestTag() - .equals("connection.transaction-keep-alive")) + && request.getRequestOptions().getRequestTag().equals(tag)) .count(); } }