From 993aae033146a551f90ed11cdacd3ec0911627f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 7 Aug 2024 09:56:55 +0200 Subject: [PATCH 1/2] chore: improve test execution speed Improves the unit test execution speed from around 2m15s to around 60s. --- .../cloud/spanner/DatabaseClientImplTest.java | 5 +- .../google/cloud/spanner/SessionPoolTest.java | 22 ++- .../cloud/spanner/spi/v1/GfeLatencyTest.java | 166 ++++++++---------- 3 files changed, 90 insertions(+), 103 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 3dacd43a55e..62a10c0adb4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -3859,7 +3859,8 @@ public void testBatchCreateSessionsFailure_shouldNotPropagateToCloseMethod() { try { // Simulate session creation failures on the backend. mockSpanner.setBatchCreateSessionsExecutionTime( - SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException())); + SimulatedExecutionTime.ofStickyException( + Status.FAILED_PRECONDITION.asRuntimeException())); DatabaseClient client = spannerWithEmptySessionPool.getDatabaseClient( DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); @@ -3867,7 +3868,7 @@ public void testBatchCreateSessionsFailure_shouldNotPropagateToCloseMethod() { // non-blocking, and any exceptions will be delayed until actual query execution. try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) { SpannerException e = assertThrows(SpannerException.class, rs::next); - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.RESOURCE_EXHAUSTED); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); } } finally { mockSpanner.setBatchCreateSessionsExecutionTime(SimulatedExecutionTime.none()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 564e2b97aa0..58b0280cf08 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -74,6 +74,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; @@ -2022,14 +2023,16 @@ public void testOpenCensusMetricsDisable() { public void testOpenTelemetrySessionMetrics() throws Exception { SpannerOptions.resetActiveTracingFramework(); SpannerOptions.enableOpenTelemetryMetrics(); - // Create a session pool with max 2 session and a low timeout for waiting for a session. + // Create a session pool with max 3 session and a low timeout for waiting for a session. if (minSessions == 1) { options = SessionPoolOptions.newBuilder() .setMinSessions(1) .setMaxSessions(3) - .setMaxIdleSessions(0) - .setInitialWaitForSessionTimeoutMillis(50L) + // This must be set to null for the setInitialWaitForSessionTimeoutMillis call to have + // any effect. + .setAcquireSessionTimeout(null) + .setInitialWaitForSessionTimeoutMillis(1L) .build(); FakeClock clock = new FakeClock(); clock.currentTimeMillis.set(System.currentTimeMillis()); @@ -2080,26 +2083,29 @@ public void testOpenTelemetrySessionMetrics() throws Exception { Future fut = executor.submit( () -> { + PooledSessionFuture session = pool.getSession(); latch.countDown(); - Session session = pool.getSession(); + session.get(); session.close(); return null; }); // Wait until the background thread is actually waiting for a session. latch.await(); // Wait until the request has timed out. - int waitCount = 0; - while (pool.getNumWaiterTimeouts() == 0L && waitCount < 1000) { - Thread.sleep(5L); - waitCount++; + Stopwatch watch = Stopwatch.createStarted(); + while (pool.getNumWaiterTimeouts() == 0L && watch.elapsed(TimeUnit.MILLISECONDS) < 100) { + Thread.yield(); } + assertTrue(pool.getNumWaiterTimeouts() > 0); // Return the checked out session to the pool so the async request will get a session and // finish. session2.close(); // Verify that the async request also succeeds. fut.get(10L, TimeUnit.SECONDS); executor.shutdown(); + assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS)); + inMemoryMetricReader.forceFlush(); metricDataCollection = inMemoryMetricReader.collectAllMetrics(); // Max Allowed sessions should be 3 diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index 9bb09aace70..7b027a27551 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.spi.v1; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; @@ -27,6 +28,9 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -47,7 +51,6 @@ import io.opencensus.tags.TagValue; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -80,26 +83,22 @@ public class GfeLatencyTest { private static MockSpannerServiceImpl mockSpanner; private static Server server; - private static InetSocketAddress address; private static Spanner spanner; private static DatabaseClient databaseClient; - private static final Map optionsMap = new HashMap<>(); - private static MockSpannerServiceImpl mockSpannerNoHeader; private static Server serverNoHeader; - private static InetSocketAddress addressNoHeader; private static Spanner spannerNoHeader; private static DatabaseClient databaseClientNoHeader; - private static String instanceId = "fake-instance"; - private static String databaseId = "fake-database"; - private static String projectId = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String DATABASE_ID = "fake-database"; + private static final String PROJECT_ID = "fake-project"; - private static final long WAIT_FOR_METRICS_TIME_MS = 1_000; - private static final int MAXIMUM_RETRIES = 5; + private static final int MAXIMUM_RETRIES = 50000; - private static AtomicInteger fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1); + private static final AtomicInteger FAKE_SERVER_TIMING = + new AtomicInteger(new Random().nextInt(1000) + 1); private static final Statement SELECT1AND2 = Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1"); @@ -135,6 +134,7 @@ public class GfeLatencyTest { @BeforeClass public static void startServer() throws IOException { + //noinspection deprecation SpannerRpcViews.registerGfeLatencyAndHeaderMissingCountViews(); mockSpanner = new MockSpannerServiceImpl(); @@ -143,7 +143,7 @@ public static void startServer() throws IOException { MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); mockSpanner.putStatementResult( MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); - address = new InetSocketAddress("localhost", 0); + InetSocketAddress address = new InetSocketAddress("localhost", 0); server = NettyServerBuilder.forAddress(address) .addService(mockSpanner) @@ -161,7 +161,7 @@ public ServerCall.Listener interceptCall( public void sendHeaders(Metadata headers) { headers.put( Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER), - String.format("gfet4t7; dur=%d", fakeServerTiming.get())); + String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING.get())); super.sendHeaders(headers); } }, @@ -170,9 +170,8 @@ public void sendHeaders(Metadata headers) { }) .build() .start(); - optionsMap.put(SpannerRpc.Option.CHANNEL_HINT, 1L); spanner = createSpannerOptions(address, server).getService(); - databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + databaseClient = spanner.getDatabaseClient(DatabaseId.of(PROJECT_ID, INSTANCE_ID, DATABASE_ID)); mockSpannerNoHeader = new MockSpannerServiceImpl(); mockSpannerNoHeader.setAbortProbability(0.0D); @@ -180,7 +179,7 @@ public void sendHeaders(Metadata headers) { MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET)); mockSpannerNoHeader.putStatementResult( MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L)); - addressNoHeader = new InetSocketAddress("localhost", 0); + InetSocketAddress addressNoHeader = new InetSocketAddress("localhost", 0); serverNoHeader = NettyServerBuilder.forAddress(addressNoHeader) .addService(mockSpannerNoHeader) @@ -188,7 +187,7 @@ public void sendHeaders(Metadata headers) { .start(); spannerNoHeader = createSpannerOptions(addressNoHeader, serverNoHeader).getService(); databaseClientNoHeader = - spannerNoHeader.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); + spannerNoHeader.getDatabaseClient(DatabaseId.of(PROJECT_ID, INSTANCE_ID, DATABASE_ID)); } @AfterClass @@ -221,12 +220,9 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException { long latency = getMetric( SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql", false); - assertEquals(fakeServerTiming.get(), latency); + assertEquals(FAKE_SERVER_TIMING.get(), latency); } @Test @@ -238,12 +234,9 @@ public void testGfeLatencyExecuteSql() throws InterruptedException { long latency = getMetric( SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteSql", false); - assertEquals(fakeServerTiming.get(), latency); + assertEquals(FAKE_SERVER_TIMING.get(), latency); } @Test @@ -251,15 +244,14 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { rs.next(); } + Stopwatch watch = Stopwatch.createStarted(); long count = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql", false); assertEquals(0, count); + System.out.println("Duration: " + watch.elapsed()); try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { rs.next(); @@ -267,9 +259,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc long count1 = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteStreamingSql", true); assertEquals(1, count1); @@ -283,9 +272,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { long count = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteSql", false); assertEquals(0, count); @@ -296,9 +282,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException { long count1 = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, - projectId, - instanceId, - databaseId, "google.spanner.v1.Spanner/ExecuteSql", true); assertEquals(1, count1); @@ -321,78 +304,75 @@ private static SpannerOptions createSpannerOptions(InetSocketAddress address, Se } private long getAggregationValueAsLong(AggregationData aggregationData) { - return aggregationData.match( - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.SumDataDouble arg) { - return (long) arg.getSum(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.SumDataLong arg) { - return arg.getSum(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.CountData arg) { - return arg.getCount(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.DistributionData arg) { - return (long) arg.getMean(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.LastValueDataDouble arg) { - return (long) arg.getLastValue(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData.LastValueDataLong arg) { - return arg.getLastValue(); - } - }, - new io.opencensus.common.Function() { - @Override - public Long apply(AggregationData arg) { - throw new UnsupportedOperationException(); - } - }); + return MoreObjects.firstNonNull( + aggregationData.match( + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataDouble arg) { + return (long) Preconditions.checkNotNull(arg).getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.SumDataLong arg) { + return Preconditions.checkNotNull(arg).getSum(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.CountData arg) { + return Preconditions.checkNotNull(arg).getCount(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.DistributionData arg) { + return (long) Preconditions.checkNotNull(arg).getMean(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataDouble arg) { + return (long) Preconditions.checkNotNull(arg).getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData.LastValueDataLong arg) { + return Preconditions.checkNotNull(arg).getLastValue(); + } + }, + new io.opencensus.common.Function() { + @Override + public Long apply(AggregationData arg) { + throw new UnsupportedOperationException(); + } + }), + -1L); } - private long getMetric( - View view, - String projectId, - String instanceId, - String databaseId, - String method, - boolean withOverride) - throws InterruptedException { + private long getMetric(View view, String method, boolean withOverride) { List tagValues = new java.util.ArrayList<>(); for (TagKey column : view.getColumns()) { if (column == SpannerRpcViews.INSTANCE_ID) { - tagValues.add(TagValue.create(instanceId)); + tagValues.add(TagValue.create(INSTANCE_ID)); } else if (column == SpannerRpcViews.DATABASE_ID) { - tagValues.add(TagValue.create(databaseId)); + tagValues.add(TagValue.create(DATABASE_ID)); } else if (column == SpannerRpcViews.METHOD) { tagValues.add(TagValue.create(method)); } else if (column == SpannerRpcViews.PROJECT_ID) { - tagValues.add(TagValue.create(projectId)); + tagValues.add(TagValue.create(PROJECT_ID)); } } for (int i = 0; i < MAXIMUM_RETRIES; i++) { - Thread.sleep(WAIT_FOR_METRICS_TIME_MS); + Thread.yield(); ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName()); + assertNotNull(viewData); if (viewData.getAggregationMap() != null) { Map, AggregationData> aggregationMap = viewData.getAggregationMap(); AggregationData aggregationData = aggregationMap.get(tagValues); - if (withOverride && getAggregationValueAsLong(aggregationData) == 0) { + if (aggregationData == null + || withOverride && getAggregationValueAsLong(aggregationData) == 0) { continue; } return getAggregationValueAsLong(aggregationData); From 3bcd67cec984530b9173abbe6c24028a84d0c242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 7 Aug 2024 13:01:19 +0200 Subject: [PATCH 2/2] fix: remove print statement --- .../java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java index 7b027a27551..908a4ad5573 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GfeLatencyTest.java @@ -30,7 +30,6 @@ import com.google.cloud.spanner.Statement; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -244,14 +243,12 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc try (ResultSet rs = databaseClient.singleUse().executeQuery(SELECT1AND2)) { rs.next(); } - Stopwatch watch = Stopwatch.createStarted(); long count = getMetric( SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW, "google.spanner.v1.Spanner/ExecuteStreamingSql", false); assertEquals(0, count); - System.out.println("Duration: " + watch.elapsed()); try (ResultSet rs = databaseClientNoHeader.singleUse().executeQuery(SELECT1AND2)) { rs.next();