Skip to content

Commit

Permalink
Fixed stale minConnectionPoolSizePerEndpoint config. (#36599)
Browse files Browse the repository at this point in the history
* Fixed stale minConnectionPoolSizePerEndpoint config.

* Updated CHANGELOG.md.

* Updated CHANGELOG.md.
  • Loading branch information
jeet1995 committed Sep 21, 2023
1 parent 520505b commit 0fbfd34
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,12 @@ public void openConnectionsAndInitCachesWithInvalidCosmosClientConfig(List<Strin
}

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegions, int numProactiveConnectionRegions, int ignoredNoOfContainers, int ignoredMinConnectionPoolSize, Duration ignoredAggressiveConnectionEstablishmentDuration) {
public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {
CosmosAsyncClient asyncClient = null;

List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();
int proactiveConnectionRegionCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

try {

asyncClient = new CosmosClientBuilder()
Expand All @@ -158,8 +161,8 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi
cosmosContainerIdentities.add(new CosmosContainerIdentity(cosmosAsyncDatabase.getId(), containerId));

CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions)
.build();
.setProactiveConnectionRegionsCount(proactiveConnectionRegionCount)
.build();

RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(asyncClient);
AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(asyncClient);
Expand All @@ -172,7 +175,7 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();

cosmosAsyncContainer.openConnectionsAndInitCaches(numProactiveConnectionRegions).block();
cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();

UnmodifiableList<URI> readEndpoints =
globalEndpointManager.getReadEndpoints();
Expand Down Expand Up @@ -232,15 +235,24 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughSystemConfig(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();

// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

boolean isSystemPropertySetBeforeDirectConnectionConfig = proactiveConnectionManagementTestConfig.isSystemPropertySetBeforeDirectConnectionConfig();

try {
List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 1; i <= numContainers; i++) {
for (int i = 1; i <= containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
Expand All @@ -249,19 +261,34 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions)
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount)
.build();

System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));
if (isSystemPropertySetBeforeDirectConnectionConfig) {
System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));

clientWithOpenConnections = new CosmosClientBuilder()
clientWithOpenConnections = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.endpointDiscoveryEnabled(true)
.preferredRegions(preferredRegions)
.openConnectionsAndInitCaches(proactiveContainerInitConfig)
.directMode()
.directMode(DirectConnectionConfig.getDefaultConfig())
.buildAsyncClient();
} else {
DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();

System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));

clientWithOpenConnections = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.endpointDiscoveryEnabled(true)
.preferredRegions(preferredRegions)
.openConnectionsAndInitCaches(proactiveContainerInitConfig)
.directMode(directConnectionConfig)
.buildAsyncClient();
}

RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(clientWithOpenConnections);
AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(clientWithOpenConnections);
Expand Down Expand Up @@ -332,23 +359,30 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
for (CosmosAsyncContainer asyncContainer : asyncContainers) {
asyncContainer.delete().block();
}

System.clearProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT");
safeClose(clientWithOpenConnections);
}
}

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();

// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

try {

List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 1; i <= numContainers; i++) {
for (int i = 1; i <= containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
Expand All @@ -357,7 +391,7 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions);
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount);

for (int i = 0; i < cosmosContainerIdentities.size(); i++) {
proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder
Expand Down Expand Up @@ -388,8 +422,8 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -454,27 +488,37 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig_WithTimeout(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration aggressiveWarmupDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();


// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

Duration aggressiveWarmupDuration = proactiveConnectionManagementTestConfig.getAggressiveWarmupDuration();

try {

List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 0; i < numContainers; i++) {
for (int i = 0; i < containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
cosmosContainerIdentities.add(new CosmosContainerIdentity(cosmosAsyncDatabase.getId(), containerId));
}

CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions);
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount);

for (int i = 0; i < numContainers; i++) {
for (int i = 0; i < containerCount; i++) {
proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder
.setMinConnectionPoolSizePerEndpointForContainer(cosmosContainerIdentities.get(i), minConnectionPoolSizePerEndpoint);
}
Expand Down Expand Up @@ -571,24 +615,35 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
asyncContainer.delete().block();
}

safeClose(clientWithOpenConnections);
safeClose(clientWithOpenConnections);
}
}

@DataProvider(name = "proactiveContainerInitConfigs")
private Object[][] proactiveContainerInitConfigs() {
private Object[] proactiveContainerInitConfigs() {
Iterator<DatabaseAccountLocation> locationIterator = this.databaseAccount.getReadableLocations().iterator();
List<String> preferredLocations = new ArrayList<>();
List<String> preferredRegions = new ArrayList<>();

while (locationIterator.hasNext()) {
DatabaseAccountLocation accountLocation = locationIterator.next();
preferredLocations.add(accountLocation.getName());
preferredRegions.add(accountLocation.getName());
}

// configure list of preferredLocation, no of proactive connection regions, no of containers, min connection pool size per endpoint, connection warm up timeout
return new Object[][]{
new Object[]{preferredLocations, 2, 3, 4, Duration.ofMillis(250)},
new Object[]{preferredLocations, 2, 3, 5, Duration.ofMillis(1000)}
return new Object[] {
new ProactiveConnectionManagementTestConfig()
.withPreferredRegions(preferredRegions)
.withProactiveConnectionRegionsCount(2)
.withContainerCount(3)
.withMinConnectionPoolSizePerEndpoint(4)
.withAggressiveWarmupDuration(Duration.ofMillis(250))
.withIsSystemPropertySetBeforeDirectConnectionConfig(true),
new ProactiveConnectionManagementTestConfig()
.withPreferredRegions(preferredRegions)
.withProactiveConnectionRegionsCount(2)
.withContainerCount(3)
.withMinConnectionPoolSizePerEndpoint(5)
.withAggressiveWarmupDuration(Duration.ofMillis(1000))
.withIsSystemPropertySetBeforeDirectConnectionConfig(false)
};
}

Expand Down Expand Up @@ -663,4 +718,77 @@ private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> buildPartitionKeyRangeR
false,
null));
}

private class ProactiveConnectionManagementTestConfig {
private List<String> preferredRegions;
private int proactiveConnectionRegionsCount;
private int minConnectionPoolSizePerEndpoint;
private int containerCount;
private Duration aggressiveWarmupDuration;
private boolean isSystemPropertySetBeforeDirectConnectionConfig;

public ProactiveConnectionManagementTestConfig() {
this.preferredRegions = new ArrayList<>();
this.proactiveConnectionRegionsCount = 0;
this.minConnectionPoolSizePerEndpoint = 0;
this.containerCount = 0;
this.aggressiveWarmupDuration = Duration.ofHours(24);
this.isSystemPropertySetBeforeDirectConnectionConfig = false;
}

public ProactiveConnectionManagementTestConfig withPreferredRegions(List<String> preferredRegions) {
this.preferredRegions = preferredRegions;
return this;
}

public ProactiveConnectionManagementTestConfig withProactiveConnectionRegionsCount(int proactiveConnectionRegionsCount) {
this.proactiveConnectionRegionsCount = proactiveConnectionRegionsCount;
return this;
}

public ProactiveConnectionManagementTestConfig withMinConnectionPoolSizePerEndpoint(int minConnectionPoolSizePerEndpoint) {
this.minConnectionPoolSizePerEndpoint = minConnectionPoolSizePerEndpoint;
return this;
}

public ProactiveConnectionManagementTestConfig withContainerCount(int containerCount) {
this.containerCount = containerCount;
return this;
}

public ProactiveConnectionManagementTestConfig withAggressiveWarmupDuration(Duration aggressiveWarmupDuration) {
this.aggressiveWarmupDuration = aggressiveWarmupDuration;
return this;
}

public ProactiveConnectionManagementTestConfig withIsSystemPropertySetBeforeDirectConnectionConfig(
boolean isSystemPropertySetBeforeDirectConnectionConfig) {
this.isSystemPropertySetBeforeDirectConnectionConfig = isSystemPropertySetBeforeDirectConnectionConfig;
return this;
}

public List<String> getPreferredRegions() {
return this.preferredRegions;
}

public int getProactiveConnectionRegionsCount() {
return this.proactiveConnectionRegionsCount;
}

public int getMinConnectionPoolSizePerEndpoint() {
return this.minConnectionPoolSizePerEndpoint;
}

public int getContainerCount() {
return this.containerCount;
}

public Duration getAggressiveWarmupDuration() {
return this.aggressiveWarmupDuration;
}

public boolean isSystemPropertySetBeforeDirectConnectionConfig() {
return this.isSystemPropertySetBeforeDirectConnectionConfig;
}
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599).

#### Other Changes
* Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,6 @@ CosmosAsyncClient buildAsyncClient(boolean logStartupInfo) {
buildConnectionPolicy();
CosmosAsyncClient cosmosAsyncClient = new CosmosAsyncClient(this);
if (proactiveContainerInitConfig != null) {

cosmosAsyncClient.recordOpenConnectionsAndInitCachesStarted(proactiveContainerInitConfig.getCosmosContainerIdentities());

Duration aggressiveWarmupDuration = proactiveContainerInitConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ private ConnectionPolicy(
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.isHealthCheckTimeoutDetectionEnabled(directConnectionConfig);

// NOTE: should be compared with COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT
// read during client initialization before connections are created for the container
this.minConnectionPoolSizePerEndpoint =
ImplementationBridgeHelpers
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.getMinConnectionPoolSizePerEndpoint(directConnectionConfig);
Math.max(ImplementationBridgeHelpers
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.getMinConnectionPoolSizePerEndpoint(directConnectionConfig), Configs.getMinConnectionPoolSizePerEndpoint());
}

private ConnectionPolicy() {
Expand Down
Loading

0 comments on commit 0fbfd34

Please sign in to comment.