Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed stale minConnectionPoolSizePerEndpoint config. #36599

Merged
merged 4 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,12 @@ public void openConnectionsAndInitCachesWithInvalidCosmosClientConfig(List<Strin
}

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegions, int numProactiveConnectionRegions, int ignoredNoOfContainers, int ignoredMinConnectionPoolSize, Duration ignoredAggressiveConnectionEstablishmentDuration) {
public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {
CosmosAsyncClient asyncClient = null;

List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();
int proactiveConnectionRegionCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

try {

asyncClient = new CosmosClientBuilder()
Expand All @@ -158,8 +161,8 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi
cosmosContainerIdentities.add(new CosmosContainerIdentity(cosmosAsyncDatabase.getId(), containerId));

CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions)
.build();
.setProactiveConnectionRegionsCount(proactiveConnectionRegionCount)
.build();

RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(asyncClient);
AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(asyncClient);
Expand All @@ -172,7 +175,7 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();

cosmosAsyncContainer.openConnectionsAndInitCaches(numProactiveConnectionRegions).block();
cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();

UnmodifiableList<URI> readEndpoints =
globalEndpointManager.getReadEndpoints();
Expand Down Expand Up @@ -229,15 +232,24 @@ public void openConnectionsAndInitCachesWithContainer(List<String> preferredRegi

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughSystemConfig(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();

// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

boolean isSystemPropertySetBeforeDirectConnectionConfig = proactiveConnectionManagementTestConfig.isSystemPropertySetBeforeDirectConnectionConfig();

try {
List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 1; i <= numContainers; i++) {
for (int i = 1; i <= containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
Expand All @@ -246,19 +258,34 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

CosmosContainerProactiveInitConfig proactiveContainerInitConfig = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions)
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount)
.build();

System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));
if (isSystemPropertySetBeforeDirectConnectionConfig) {
System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));

clientWithOpenConnections = new CosmosClientBuilder()
clientWithOpenConnections = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.endpointDiscoveryEnabled(true)
.preferredRegions(preferredRegions)
.openConnectionsAndInitCaches(proactiveContainerInitConfig)
.directMode()
.directMode(DirectConnectionConfig.getDefaultConfig())
.buildAsyncClient();
} else {
DirectConnectionConfig directConnectionConfig = DirectConnectionConfig.getDefaultConfig();

System.setProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT", String.valueOf(minConnectionPoolSizePerEndpoint));

clientWithOpenConnections = new CosmosClientBuilder()
.endpoint(TestConfigurations.HOST)
.key(TestConfigurations.MASTER_KEY)
.endpointDiscoveryEnabled(true)
.preferredRegions(preferredRegions)
.openConnectionsAndInitCaches(proactiveContainerInitConfig)
.directMode(directConnectionConfig)
.buildAsyncClient();
}

RntbdTransportClient rntbdTransportClient = (RntbdTransportClient) ReflectionUtils.getTransportClient(clientWithOpenConnections);
AsyncDocumentClient asyncDocumentClient = ReflectionUtils.getAsyncDocumentClient(clientWithOpenConnections);
Expand Down Expand Up @@ -329,23 +356,30 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
for (CosmosAsyncContainer asyncContainer : asyncContainers) {
asyncContainer.delete().block();
}

System.clearProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT");
safeClose(clientWithOpenConnections);
}
}

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration ignoredAggressiveConnectionEstablishmentDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();

// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

try {

List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 1; i <= numContainers; i++) {
for (int i = 1; i <= containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
Expand All @@ -354,7 +388,7 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions);
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount);

for (int i = 0; i < cosmosContainerIdentities.size(); i++) {
proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder
Expand Down Expand Up @@ -385,8 +419,8 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -451,27 +485,37 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect

@Test(groups = {"multi-master"}, dataProvider = "proactiveContainerInitConfigs")
public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnectionPoolSize_ThroughProactiveContainerInitConfig_WithTimeout(
List<String> preferredRegions, int numProactiveConnectionRegions, int numContainers, int minConnectionPoolSizePerEndpoint, Duration aggressiveWarmupDuration) {
ProactiveConnectionManagementTestConfig proactiveConnectionManagementTestConfig) {

CosmosAsyncClient clientWithOpenConnections = null;
List<CosmosAsyncContainer> asyncContainers = new ArrayList<>();


// test config parameters
List<String> preferredRegions = proactiveConnectionManagementTestConfig.getPreferredRegions();

int containerCount = proactiveConnectionManagementTestConfig.getContainerCount();
int minConnectionPoolSizePerEndpoint = proactiveConnectionManagementTestConfig.getMinConnectionPoolSizePerEndpoint();
int proactiveConnectionRegionsCount = proactiveConnectionManagementTestConfig.getProactiveConnectionRegionsCount();

Duration aggressiveWarmupDuration = proactiveConnectionManagementTestConfig.getAggressiveWarmupDuration();

try {

List<CosmosContainerIdentity> cosmosContainerIdentities = new ArrayList<>();

for (int i = 0; i < numContainers; i++) {
for (int i = 0; i < containerCount; i++) {
String containerId = String.format("id%d", i);
cosmosAsyncDatabase.createContainerIfNotExists(containerId, "/mypk").block();
asyncContainers.add(cosmosAsyncDatabase.getContainer(containerId));
cosmosContainerIdentities.add(new CosmosContainerIdentity(cosmosAsyncDatabase.getId(), containerId));
}

CosmosContainerProactiveInitConfigBuilder proactiveContainerInitConfigBuilder = new
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(numProactiveConnectionRegions);
CosmosContainerProactiveInitConfigBuilder(cosmosContainerIdentities)
.setProactiveConnectionRegionsCount(proactiveConnectionRegionsCount);

for (int i = 0; i < numContainers; i++) {
for (int i = 0; i < containerCount; i++) {
proactiveContainerInitConfigBuilder = proactiveContainerInitConfigBuilder
.setMinConnectionPoolSizePerEndpointForContainer(cosmosContainerIdentities.get(i), minConnectionPoolSizePerEndpoint);
}
Expand Down Expand Up @@ -568,24 +612,35 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
asyncContainer.delete().block();
}

safeClose(clientWithOpenConnections);
safeClose(clientWithOpenConnections);
}
}

@DataProvider(name = "proactiveContainerInitConfigs")
private Object[][] proactiveContainerInitConfigs() {
private Object[] proactiveContainerInitConfigs() {
Iterator<DatabaseAccountLocation> locationIterator = this.databaseAccount.getReadableLocations().iterator();
List<String> preferredLocations = new ArrayList<>();
List<String> preferredRegions = new ArrayList<>();

while (locationIterator.hasNext()) {
DatabaseAccountLocation accountLocation = locationIterator.next();
preferredLocations.add(accountLocation.getName());
preferredRegions.add(accountLocation.getName());
}

// configure list of preferredLocation, no of proactive connection regions, no of containers, min connection pool size per endpoint, connection warm up timeout
return new Object[][]{
new Object[]{preferredLocations, 2, 3, 4, Duration.ofMillis(250)},
new Object[]{preferredLocations, 2, 3, 5, Duration.ofMillis(1000)}
return new Object[] {
new ProactiveConnectionManagementTestConfig()
.withPreferredRegions(preferredRegions)
.withProactiveConnectionRegionsCount(2)
.withContainerCount(3)
.withMinConnectionPoolSizePerEndpoint(4)
.withAggressiveWarmupDuration(Duration.ofMillis(250))
.withIsSystemPropertySetBeforeDirectConnectionConfig(true),
new ProactiveConnectionManagementTestConfig()
.withPreferredRegions(preferredRegions)
.withProactiveConnectionRegionsCount(2)
.withContainerCount(3)
.withMinConnectionPoolSizePerEndpoint(5)
.withAggressiveWarmupDuration(Duration.ofMillis(1000))
.withIsSystemPropertySetBeforeDirectConnectionConfig(false)
};
}

Expand Down Expand Up @@ -660,4 +715,77 @@ private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> buildPartitionKeyRangeR
false,
null));
}

private class ProactiveConnectionManagementTestConfig {
private List<String> preferredRegions;
private int proactiveConnectionRegionsCount;
private int minConnectionPoolSizePerEndpoint;
private int containerCount;
private Duration aggressiveWarmupDuration;
private boolean isSystemPropertySetBeforeDirectConnectionConfig;

public ProactiveConnectionManagementTestConfig() {
this.preferredRegions = new ArrayList<>();
this.proactiveConnectionRegionsCount = 0;
this.minConnectionPoolSizePerEndpoint = 0;
this.containerCount = 0;
this.aggressiveWarmupDuration = Duration.ofHours(24);
this.isSystemPropertySetBeforeDirectConnectionConfig = false;
}

public ProactiveConnectionManagementTestConfig withPreferredRegions(List<String> preferredRegions) {
this.preferredRegions = preferredRegions;
return this;
}

public ProactiveConnectionManagementTestConfig withProactiveConnectionRegionsCount(int proactiveConnectionRegionsCount) {
this.proactiveConnectionRegionsCount = proactiveConnectionRegionsCount;
return this;
}

public ProactiveConnectionManagementTestConfig withMinConnectionPoolSizePerEndpoint(int minConnectionPoolSizePerEndpoint) {
this.minConnectionPoolSizePerEndpoint = minConnectionPoolSizePerEndpoint;
return this;
}

public ProactiveConnectionManagementTestConfig withContainerCount(int containerCount) {
this.containerCount = containerCount;
return this;
}

public ProactiveConnectionManagementTestConfig withAggressiveWarmupDuration(Duration aggressiveWarmupDuration) {
this.aggressiveWarmupDuration = aggressiveWarmupDuration;
return this;
}

public ProactiveConnectionManagementTestConfig withIsSystemPropertySetBeforeDirectConnectionConfig(
boolean isSystemPropertySetBeforeDirectConnectionConfig) {
this.isSystemPropertySetBeforeDirectConnectionConfig = isSystemPropertySetBeforeDirectConnectionConfig;
return this;
}

public List<String> getPreferredRegions() {
return this.preferredRegions;
}

public int getProactiveConnectionRegionsCount() {
return this.proactiveConnectionRegionsCount;
}

public int getMinConnectionPoolSizePerEndpoint() {
return this.minConnectionPoolSizePerEndpoint;
}

public int getContainerCount() {
return this.containerCount;
}

public Duration getAggressiveWarmupDuration() {
return this.aggressiveWarmupDuration;
}

public boolean isSystemPropertySetBeforeDirectConnectionConfig() {
return this.isSystemPropertySetBeforeDirectConnectionConfig;
}
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599).

#### Other Changes
* Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,6 @@ CosmosAsyncClient buildAsyncClient(boolean logStartupInfo) {
buildConnectionPolicy();
CosmosAsyncClient cosmosAsyncClient = new CosmosAsyncClient(this);
if (proactiveContainerInitConfig != null) {

cosmosAsyncClient.recordOpenConnectionsAndInitCachesStarted(proactiveContainerInitConfig.getCosmosContainerIdentities());

Duration aggressiveWarmupDuration = proactiveContainerInitConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ private ConnectionPolicy(
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.isHealthCheckTimeoutDetectionEnabled(directConnectionConfig);

// NOTE: should be compared with COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT
// read during client initialization before connections are created for the container
this.minConnectionPoolSizePerEndpoint =
ImplementationBridgeHelpers
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.getMinConnectionPoolSizePerEndpoint(directConnectionConfig);
Math.max(ImplementationBridgeHelpers
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
.DirectConnectionConfigHelper
.getDirectConnectionConfigAccessor()
.getMinConnectionPoolSizePerEndpoint(directConnectionConfig), Configs.getMinConnectionPoolSizePerEndpoint());
}

private ConnectionPolicy() {
Expand Down
Loading
Loading