diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java index ac3f7cf83575..595262830aec 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ProactiveConnectionManagementTest.java @@ -133,9 +133,12 @@ public void openConnectionsAndInitCachesWithInvalidCosmosClientConfig(List preferredRegions, int numProactiveConnectionRegions, int ignoredNoOfContainers, int ignoredMinConnectionPoolSize, Duration ignoredAggressiveConnectionEstablishmentDuration) { + public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) { CosmosAsyncClient asyncClient = null; + List preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions(); + int proactiveConnectionRegionCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount(); + try { asyncClient = new CosmosClientBuilder() @@ -158,8 +161,8 @@ public void openConnectionsAndInitCachesWithContainer(List preferredRegi cosmosContainerIdentities.add(new CosmosContainerIdentity(cosmosAsyncDatabase.getId(), containerId)); CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities) - .setProactiveConnectionRegionsCount(numProactiveConnectionRegions) - .build(); + .setProactiveConnectionRegionsCount(proactiveConnectionRegionCount) + .build(); RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(asyncClient); AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(asyncClient); @@ -172,7 +175,7 @@ public void openConnectionsAndInitCachesWithContainer(List preferredRegi ConcurrentHashMap collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient); Set endpoints = ConcurrentHashMap.newKeySet(); - cosmosAsyncContainer.openConnectionsAndInitCaches(numProactiveConnectionRegions).block(); + cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block(); UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); @@ -232,15 +235,24 @@ public void openConnectionsAndInitCachesWithContainer(List preferredRegi @Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs") public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughSystemConfig( - List preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) { + ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) { CosmosAsyncClient clientWithOpenConnections = null; List asyncContainers = new ArrayList<>(); + // test config parameters + List preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions(); + + int containerCount = proactiveConnectionManagementTestConfig.getContainerCount(); + int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint(); + int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount(); + + boolean isSystemPropertySetBeforeDirectConnectionConfig = proactiveConnectionManagementTestConfig.isSystemPropertySetBeforeDirectConnectionConfig(); + try { List cosmosContainerIdentities = new ArrayList<>(); - for (int i = 1; i <= numContainers; i++) { + for (int i = 1; i <= containerCount; i++) { String containerId = String.format("id%d", i); cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block(); asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId)); @@ -249,19 +261,34 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities) - .setProactiveConnectionRegionsCount(numProactiveConnectionRegions) + .setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount) .build(); - System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint)); + if (isSystemPropertySetBeforeDirectConnectionConfig) { + System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint)); - clientWithOpenConnections = new CosmosClientBuilder() + clientWithOpenConnections = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) .key(TestConfigurations.MASTER_KEY) .endpointDiscoveryEnabled(true) .preferredRegions(preferredRegions) .openConnectionsAndInitCaches(proactiveContainerInitConfig) - .directMode() + .directMode(DirectConnectionConfig.getDefaultConfig()) + .buildAsyncClient(); + } else { + DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig(); + + System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint)); + + clientWithOpenConnections = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .endpointDiscoveryEnabled(true) + .preferredRegions(preferredRegions) + .openConnectionsAndInitCaches(proactiveContainerInitConfig) + .directMode(directConnectionConfig) .buildAsyncClient(); + } RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(clientWithOpenConnections); AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(clientWithOpenConnections); @@ -332,23 +359,30 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect for (CosmosAsyncContainer asyncContainer : asyncContainers) { asyncContainer.delete().block(); } - + System.clearProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT"); safeClose(clientWithOpenConnections); } } @Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs") public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig( - List preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) { + ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) { CosmosAsyncClient clientWithOpenConnections = null; List asyncContainers = new ArrayList<>(); + // test config parameters + List preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions(); + + int containerCount = proactiveConnectionManagementTestConfig.getContainerCount(); + int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint(); + int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount(); + try { List cosmosContainerIdentities = new ArrayList<>(); - for (int i = 1; i <= numContainers; i++) { + for (int i = 1; i <= containerCount; i++) { String containerId = String.format("id%d", i); cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block(); asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId)); @@ -357,7 +391,7 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities) - .setProactiveConnectionRegionsCount(numProactiveConnectionRegions); + .setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount); for (int i = 0; i < cosmosContainerIdentities.size(); i++) { proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder @@ -388,8 +422,8 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect Set endpoints = ConcurrentHashMap.newKeySet(); UnmodifiableList readEndpoints = globalEndpointManager.getReadEndpoints(); List proactiveConnectionEndpoints = readEndpoints.subList( - 0, - Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount())); + 0, + Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount())); Flux asyncContainerFlux = Flux.fromIterable(asyncContainers); Flux>> partitionKeyRangeFlux = @@ -454,16 +488,26 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect @Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs") public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig_WithTimeout( - List preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration aggressiveWarmupDuration) { + ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) { CosmosAsyncClient clientWithOpenConnections = null; List asyncContainers = new ArrayList<>(); + + // test config parameters + List preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions(); + + int containerCount = proactiveConnectionManagementTestConfig.getContainerCount(); + int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint(); + int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount(); + + Duration aggressiveWarmupDuration = proactiveConnectionManagementTestConfig.getAggressiveWarmupDuration(); + try { List cosmosContainerIdentities = new ArrayList<>(); - for (int i = 0; i < numContainers; i++) { + for (int i = 0; i < containerCount; i++) { String containerId = String.format("id%d", i); cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block(); asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId)); @@ -471,10 +515,10 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect } CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new - CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities) - .setProactiveConnectionRegionsCount(numProactiveConnectionRegions); + CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities) + .setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount); - for (int i = 0; i < numContainers; i++) { + for (int i = 0; i < containerCount; i++) { proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder .setMinConnectionPoolSizePerEndpointForContainer(cosmosContainerIdentities.get(i), minConnectionPoolSizePerEndpoint); } @@ -571,24 +615,35 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect asyncContainer.delete().block(); } - safeClose(clientWithOpenConnections); + safeClose(clientWithOpenConnections); } } @DataProvider(name = "proactiveContainerInitConfigs") - private Object[][] proactiveContainerInitConfigs() { + private Object[] proactiveContainerInitConfigs() { Iterator locationIterator = this.databaseAccount.getReadableLocations().iterator(); - List preferredLocations = new ArrayList<>(); + List preferredRegions = new ArrayList<>(); while (locationIterator.hasNext()) { DatabaseAccountLocation accountLocation = locationIterator.next(); - preferredLocations.add(accountLocation.getName()); + preferredRegions.add(accountLocation.getName()); } - // configure list of preferredLocation, no of proactive connection regions, no of containers, min connection pool size per endpoint, connection warm up timeout - return new Object[][]{ - new Object[]{preferredLocations, 2, 3, 4, Duration.ofMillis(250)}, - new Object[]{preferredLocations, 2, 3, 5, Duration.ofMillis(1000)} + return new Object[] { + new ProactiveConnectionManagementTestConfig() + .withPreferredRegions(preferredRegions) + .withProactiveConnectionRegionsCount(2) + .withContainerCount(3) + .withMinConnectionPoolSizePerEndpoint(4) + .withAggressiveWarmupDuration(Duration.ofMillis(250)) + .withIsSystemPropertySetBeforeDirectConnectionConfig(true), + new ProactiveConnectionManagementTestConfig() + .withPreferredRegions(preferredRegions) + .withProactiveConnectionRegionsCount(2) + .withContainerCount(3) + .withMinConnectionPoolSizePerEndpoint(5) + .withAggressiveWarmupDuration(Duration.ofMillis(1000)) + .withIsSystemPropertySetBeforeDirectConnectionConfig(false) }; } @@ -663,4 +718,77 @@ private Mono>> buildPartitionKeyRangeR false, null)); } + + private class ProactiveConnectionManagementTestConfig { + private List preferredRegions; + private int proactiveConnectionRegionsCount; + private int minConnectionPoolSizePerEndpoint; + private int containerCount; + private Duration aggressiveWarmupDuration; + private boolean isSystemPropertySetBeforeDirectConnectionConfig; + + public ProactiveConnectionManagementTestConfig() { + this.preferredRegions = new ArrayList<>(); + this.proactiveConnectionRegionsCount = 0; + this.minConnectionPoolSizePerEndpoint = 0; + this.containerCount = 0; + this.aggressiveWarmupDuration = Duration.ofHours(24); + this.isSystemPropertySetBeforeDirectConnectionConfig = false; + } + + public ProactiveConnectionManagementTestConfig withPreferredRegions(List preferredRegions) { + this.preferredRegions = preferredRegions; + return this; + } + + public ProactiveConnectionManagementTestConfig withProactiveConnectionRegionsCount(int proactiveConnectionRegionsCount) { + this.proactiveConnectionRegionsCount = proactiveConnectionRegionsCount; + return this; + } + + public ProactiveConnectionManagementTestConfig withMinConnectionPoolSizePerEndpoint(int minConnectionPoolSizePerEndpoint) { + this.minConnectionPoolSizePerEndpoint = minConnectionPoolSizePerEndpoint; + return this; + } + + public ProactiveConnectionManagementTestConfig withContainerCount(int containerCount) { + this.containerCount = containerCount; + return this; + } + + public ProactiveConnectionManagementTestConfig withAggressiveWarmupDuration(Duration aggressiveWarmupDuration) { + this.aggressiveWarmupDuration = aggressiveWarmupDuration; + return this; + } + + public ProactiveConnectionManagementTestConfig withIsSystemPropertySetBeforeDirectConnectionConfig( + boolean isSystemPropertySetBeforeDirectConnectionConfig) { + this.isSystemPropertySetBeforeDirectConnectionConfig = isSystemPropertySetBeforeDirectConnectionConfig; + return this; + } + + public List getPreferredRegions() { + return this.preferredRegions; + } + + public int getProactiveConnectionRegionsCount() { + return this.proactiveConnectionRegionsCount; + } + + public int getMinConnectionPoolSizePerEndpoint() { + return this.minConnectionPoolSizePerEndpoint; + } + + public int getContainerCount() { + return this.containerCount; + } + + public Duration getAggressiveWarmupDuration() { + return this.aggressiveWarmupDuration; + } + + public boolean isSystemPropertySetBeforeDirectConnectionConfig() { + return this.isSystemPropertySetBeforeDirectConnectionConfig; + } + } } diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index efd358519dd2..fee7555c2d09 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -8,6 +8,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599). #### Other Changes * Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java index a6c8fe70662c..d9d45240de25 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosClientBuilder.java @@ -1051,7 +1051,6 @@ CosmosAsyncClient buildAsyncClient(boolean logStartupInfo) { buildConnectionPolicy(); CosmosAsyncClient cosmosAsyncClient = new CosmosAsyncClient(this); if (proactiveContainerInitConfig != null) { - cosmosAsyncClient.recordOpenConnectionsAndInitCachesStarted(proactiveContainerInitConfig.getCosmosContainerIdentities()); Duration aggressiveWarmupDuration = proactiveContainerInitConfig diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java index f42ad853289d..7d81799968d0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java @@ -96,11 +96,14 @@ private ConnectionPolicy( .DirectConnectionConfigHelper .getDirectConnectionConfigAccessor() .isHealthCheckTimeoutDetectionEnabled(directConnectionConfig); + + // NOTE: should be compared with COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT + // read during client initialization before connections are created for the container this.minConnectionPoolSizePerEndpoint = - ImplementationBridgeHelpers - .DirectConnectionConfigHelper - .getDirectConnectionConfigAccessor() - .getMinConnectionPoolSizePerEndpoint(directConnectionConfig); + Math.max(ImplementationBridgeHelpers + .DirectConnectionConfigHelper + .getDirectConnectionConfigAccessor() + .getMinConnectionPoolSizePerEndpoint(directConnectionConfig), Configs.getMinConnectionPoolSizePerEndpoint()); } private ConnectionPolicy() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ContainerDirectConnectionMetadata.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ContainerDirectConnectionMetadata.java index 61e97beaa67f..cd569ebf76f3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ContainerDirectConnectionMetadata.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ContainerDirectConnectionMetadata.java @@ -3,14 +3,18 @@ package com.azure.cosmos.implementation.directconnectivity; +import com.azure.cosmos.implementation.Configs; + import java.util.concurrent.atomic.AtomicInteger; public final class ContainerDirectConnectionMetadata { private final AtomicInteger minConnectionPoolSizePerEndpointForContainer; + // NOTE: should be compared with COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT + // read during client initialization before connections are created for the container public ContainerDirectConnectionMetadata() { - this.minConnectionPoolSizePerEndpointForContainer = new AtomicInteger(1); + this.minConnectionPoolSizePerEndpointForContainer = new AtomicInteger(Configs.getMinConnectionPoolSizePerEndpoint()); } public void setMinConnectionPoolSizePerEndpointForContainer(int minConnectionPoolSizePerEndpointForContainer) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index 29ac9f9e9a9f..fed1cbde61a8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -175,8 +175,10 @@ public Flux submitOpenConnectionTasksAndInitCaches(CosmosContainerProactiv ContainerDirectConnectionMetadata containerDirectConnectionMetadata = containerPropertiesMap .get(cosmosContainerIdentity); - int connectionsPerEndpointCountForContainer = containerDirectConnectionMetadata - .getMinConnectionPoolSizePerEndpointForContainer(); + // check against the COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT system property + // during client initialization + int connectionsPerEndpointCountForContainer = Math.max(containerDirectConnectionMetadata + .getMinConnectionPoolSizePerEndpointForContainer(), Configs.getMinConnectionPoolSizePerEndpoint()); return this.submitOpenConnectionInternal( endpointCache,