Skip to content

Commit

Permalink
createMetadataContainerIfNotExistsForMasterKeyAuth (#39973)
Browse files Browse the repository at this point in the history
* create metadata container if not exists

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and annie-mac committed May 1, 2024
1 parent b36fe32 commit c0a12f6
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 1 deletion.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Added support to create metadata container if not exists for MasterKey auth - See [PR 39973](https://github.com/Azure/azure-sdk-for-java/pull/39973)

### 1.0.0-beta.1 (2024-04-26)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.CosmosMasterKeyAuthConfig;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosExceptionsHelper;
import com.azure.cosmos.kafka.connect.implementation.source.CosmosMetadataStorageType;
Expand All @@ -25,12 +26,15 @@
import com.azure.cosmos.kafka.connect.implementation.source.MetadataMonitorThread;
import com.azure.cosmos.kafka.connect.implementation.source.MetadataTaskUnit;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.ThroughputProperties;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -167,13 +171,37 @@ private IMetadataReader getMetadataReader() {
throw new IllegalStateException("Cosmos Metadata container need to be partitioned by /id");
}
})
.onErrorResume(throwable -> {
if (KafkaCosmosExceptionsHelper.isNotFoundException(throwable)
&& shouldCreateMetadataContainerIfNotExists()) {
return createMetadataContainer();
}

return Mono.error(new ConnectException(throwable));
})
.block();
return new MetadataCosmosStorageManager(metadataContainer);
default:
throw new IllegalArgumentException("Metadata storage type " + this.config.getMetadataConfig().getStorageType() + " is not supported");
}
}

private boolean shouldCreateMetadataContainerIfNotExists() {
// If customer does not create the metadata container ahead of time,
// then SDK will create one with default autoScale config only if using masterKey auth.
return this.config.getMetadataConfig().getStorageType() == CosmosMetadataStorageType.COSMOS
&& (this.config.getAccountConfig().getCosmosAuthConfig() instanceof CosmosMasterKeyAuthConfig);
}

    /**
     * Creates the metadata container in the configured database, partitioned by
     * {@code /id} (the partition key path the connector requires for its metadata
     * records) with a default autoscale max throughput of 4000 RU/s.
     *
     * <p>Invoked lazily when the configured metadata container is not found and
     * auto-creation is permitted for the current auth type.
     *
     * @return a {@link Mono} emitting the container-creation response.
     */
    private Mono<CosmosContainerResponse> createMetadataContainer() {
        return this.cosmosClient
            .getDatabase(this.config.getContainersConfig().getDatabaseName())
            .createContainer(
                this.config.getMetadataConfig().getStorageName(),
                "/id",
                ThroughputProperties.createAutoscaledThroughput(4000));
    }

private void updateMetadataRecordsInCosmos(MetadataTaskUnit metadataTaskUnit) {
if (metadataTaskUnit.getStorageType() != CosmosMetadataStorageType.COSMOS) {
throw new IllegalStateException("updateMetadataRecordsInCosmos should not be called when metadata storage type is not cosmos");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -71,7 +73,17 @@ public static Object[][] sourceAuthParameterProvider() {
};
}

// TODO[public preview]: add more integration tests
@DataProvider(name = "metadataCosmosStorageParameterProvider")
public static Object[][] metadataCosmosStorageParameterProvider() {
return new Object[][]{
// use masterKey auth, pre-create the metadata container, should connector start successfully
{ true, true, true },
{ true, false, true },
{ false, true, true },
{ false, false, false}
};
}

@Test(groups = { "kafka-integration" }, dataProvider = "sourceAuthParameterProvider", timeOut = 2 * TIMEOUT)
public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageType metadataStorageType) {
logger.info("read from single container " + useMasterKey);
Expand Down Expand Up @@ -211,4 +223,88 @@ public void readFromSingleContainer(boolean useMasterKey, CosmosMetadataStorageT
}
}
}

@Test(groups = { "kafka-integration" }, dataProvider = "metadataCosmosStorageParameterProvider", timeOut = 2 * TIMEOUT)
public void connectorStart_metadata_cosmosStorageType(
boolean useMasterKey,
boolean preCreateMetadataContainer,
boolean canConnectorStart) {

String topicName = singlePartitionContainerName + "-" + UUID.randomUUID();
String metadataStorageName = "Metadata-" + UUID.randomUUID();

Map<String, String> sourceConnectorConfig = new HashMap<>();
sourceConnectorConfig.put("connector.class", "com.azure.cosmos.kafka.connect.CosmosSourceConnector");
sourceConnectorConfig.put("azure.cosmos.account.endpoint", KafkaCosmosTestConfigurations.HOST);
sourceConnectorConfig.put("azure.cosmos.application.name", "Test");
sourceConnectorConfig.put("azure.cosmos.source.database.name", databaseName);
sourceConnectorConfig.put("azure.cosmos.source.containers.includeAll", "false");
sourceConnectorConfig.put("azure.cosmos.source.containers.includedList", singlePartitionContainerName);
sourceConnectorConfig.put("azure.cosmos.source.containers.topicMap", topicName + "#" + singlePartitionContainerName);
sourceConnectorConfig.put("azure.cosmos.source.metadata.storage.name", metadataStorageName);
sourceConnectorConfig.put("azure.cosmos.source.metadata.storage.type", CosmosMetadataStorageType.COSMOS.getName());

if (useMasterKey) {
sourceConnectorConfig.put("azure.cosmos.account.key", KafkaCosmosTestConfigurations.MASTER_KEY);
} else {
sourceConnectorConfig.put("azure.cosmos.auth.type", CosmosAuthType.SERVICE_PRINCIPAL.getName());
sourceConnectorConfig.put("azure.cosmos.account.tenantId", KafkaCosmosTestConfigurations.ACCOUNT_TENANT_ID);
sourceConnectorConfig.put("azure.cosmos.auth.aad.clientId", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_ID);
sourceConnectorConfig.put("azure.cosmos.auth.aad.clientSecret", KafkaCosmosTestConfigurations.ACCOUNT_AAD_CLIENT_SECRET);
}

// Create topic ahead of time
kafkaCosmosConnectContainer.createTopic(topicName, 1);

String connectorName = "simpleTest-" + UUID.randomUUID();

try {
// if using cosmos container to persiste the metadata, pre-create it
if (preCreateMetadataContainer) {
logger.info("Creating metadata container");
client.getDatabase(databaseName)
.createContainerIfNotExists(metadataStorageName, "/id")
.block();
} else {
logger.info("Skip creating metadata container");
}

kafkaCosmosConnectContainer.registerConnector(connectorName, sourceConnectorConfig);

Thread.sleep(10000); // give some time for the connector to start up
// verify connector tasks
ConnectorStatus connectorStatus = kafkaCosmosConnectContainer.getConnectorStatus(connectorName);
if (canConnectorStart) {
assertThat(connectorStatus.getConnector().get("state").equals("RUNNING")).isTrue();
} else {
assertThat(connectorStatus.getConnector().get("state").equals("FAILED")).isTrue();
}

} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (client != null) {
logger.info("cleaning container {}", singlePartitionContainerName);
cleanUpContainer(client, databaseName, singlePartitionContainerName);

// delete the metadata container if created
if (preCreateMetadataContainer || canConnectorStart) {
client
.getDatabase(databaseName)
.getContainer(metadataStorageName)
.delete()
.onErrorResume(throwable -> {
logger.error("Deleting metadata container failed ", throwable);
return Mono.empty();
})
.block();
}
}

// IMPORTANT: remove the connector after use
if (kafkaCosmosConnectContainer != null) {
kafkaCosmosConnectContainer.deleteConnector(connectorName);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
Expand Down Expand Up @@ -181,6 +182,11 @@ public void deleteConnector(String name) {
}
}

public ConnectorStatus getConnectorStatus(String name) {
KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
return kafkaConnectClient.getConnectorStatus(name);
}

public String getTarget() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(KAFKA_CONNECT_PORT);
}
Expand Down

0 comments on commit c0a12f6

Please sign in to comment.