
DBZ-4783: Support for multiple databases and tasks in the SQL Server connector #3261

Merged
merged 5 commits into debezium:main on Mar 1, 2022

Conversation

morozov
Contributor

@morozov morozov commented Feb 23, 2022

Change summary:

  1. Manage change tables for each partition individually.
  2. Expose the actual task id to the metrics in order to be able to run multiple tasks.
  3. Run the connector with multiple databases and multiple tasks.

Note that running multiple tasks is not necessary for capturing multiple databases. Furthermore, running multiple tasks cannot be tested in the current test suite. The only reason it's contributed here is that extracting multi-task support into a separate patch would require extra work (it was originally implemented like this). I can remove it from the patch if necessary.
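To illustrate the relationship between databases and tasks described above, here is a hypothetical Python sketch (not Debezium's actual code; the function name `distribute_databases` is made up) of splitting configured database names across connector tasks, assuming a simple round-robin assignment:

```python
# Hypothetical sketch only: how a list of database names from "database.names"
# could be split across at most "tasks.max" tasks. A task may own several
# databases; running multiple tasks is optional for capturing multiple databases.

def distribute_databases(database_names, max_tasks):
    """Assign each database to a task, round-robin; returns one list per task."""
    num_tasks = min(max_tasks, len(database_names))
    assignments = [[] for _ in range(num_tasks)]
    for i, name in enumerate(database_names):
        assignments[i % num_tasks].append(name)
    return assignments

# With "database.names=testDB1,testDB2" and "tasks.max=2",
# each task captures exactly one database:
print(distribute_databases(["testDB1", "testDB2"], 2))
# → [['testDB1'], ['testDB2']]
```

With `tasks.max=1` the same two databases would both be handled by a single task, which is why multi-task mode is independent of multi-database support.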

TODO:

  • Provide an integration test for the multi-partition scenario.

Manual testing of the multi-database and multi-task configuration

  1. Modify docker-compose-sqlserver.yaml from the tutorial to use the Docker image built from this PR.

  2. Start the services:

    export DEBEZIUM_VERSION=1.9
    docker-compose -f docker-compose-sqlserver.yaml up
  3. Initialize test databases:

    docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD' << 'EOF'
    CREATE DATABASE testDB1;
    GO
    USE testDB1;
    EXEC sys.sp_cdc_enable_db;
    
    CREATE TABLE products (
      id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512),
      weight FLOAT
    );
    INSERT INTO products(name,description,weight)
      VALUES ('scooter','Small 2-wheel scooter',3.14);
    
    EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;
    GO
    
    CREATE DATABASE testDB2;
    GO
    USE testDB2;
    EXEC sys.sp_cdc_enable_db;
    
    CREATE TABLE customers (
      id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
      first_name VARCHAR(255) NOT NULL,
      last_name VARCHAR(255) NOT NULL,
      email VARCHAR(255) NOT NULL UNIQUE
    );
    INSERT INTO customers(first_name,last_name,email)
      VALUES ('Sally','Thomas','sally.thomas@acme.com');
    
    EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
    GO
    EOF
  4. Start the connector:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d@- << 'EOF'
    {
        "name": "inventory-connector",
        "config": {
            "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max" : "2",
            "database.server.name" : "server1",
            "database.hostname" : "sqlserver",
            "database.port" : "1433",
            "database.user" : "sa",
            "database.password" : "Password!",
            "database.names" : "testDB1,testDB2",
            "database.history.kafka.bootstrap.servers" : "kafka:9092",
            "database.history.kafka.topic": "schema-changes.inventory"
        }
    }
    EOF
  5. Check that the connector and both tasks are running:

    curl -s http://localhost:8083/connectors/inventory-connector/status
  6. Check that each task exposes metrics for its subset of databases:
    (screenshot: multi-partition and multi-task metrics; image not included)

  7. Run a consumer and confirm that both databases were snapshotted:

    docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --from-beginning \
      --property print.key=true \
      --whitelist 'server1\.(testDB\d+)\..*'
  8. Make changes in both databases and confirm that the changes from both databases are captured:

    docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD' << 'EOF'
    UPDATE testDB1.dbo.products SET weight = 3.15 WHERE id = 101;
    UPDATE testDB2.dbo.customers SET first_name = 'Molly' WHERE id = 1001;
    EOF
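The status check in step 5 can also be verified programmatically. The following sketch parses a response of the shape returned by Kafka Connect's `/connectors/<name>/status` endpoint; the JSON below is illustrative sample data (worker IDs and values are made up), not captured output:

```python
import json

# Sample data mimicking the shape of Kafka Connect's connector status
# response; the worker_id values are invented for illustration.
status_json = '''
{
  "name": "inventory-connector",
  "connector": {"state": "RUNNING", "worker_id": "172.17.0.7:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "172.17.0.7:8083"},
    {"id": 1, "state": "RUNNING", "worker_id": "172.17.0.7:8083"}
  ],
  "type": "source"
}
'''

status = json.loads(status_json)
# Both tasks should be RUNNING when tasks.max=2 and two databases are configured.
running_tasks = [t["id"] for t in status["tasks"] if t["state"] == "RUNNING"]
print(f"connector: {status['connector']['state']}, running tasks: {running_tasks}")
```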

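The `--whitelist` pattern used by the consumer in step 7 selects only the change topics, which follow the `server1.<database>.<schema>.<table>` naming scheme used in this setup. A small Python check of the same regex (topic names below are examples consistent with the tutorial, not captured output):

```python
import re

# The same pattern passed to kafka-console-consumer via --whitelist.
pattern = re.compile(r'server1\.(testDB\d+)\..*')

topics = [
    "server1.testDB1.dbo.products",
    "server1.testDB2.dbo.customers",
    "schema-changes.inventory",   # history topic, intentionally not matched
]
matched = [t for t in topics if pattern.fullmatch(t)]
print(matched)
# → ['server1.testDB1.dbo.products', 'server1.testDB2.dbo.customers']
```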
@github-actions

Welcome as a new contributor to Debezium, @morozov. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively.

@morozov morozov marked this pull request as ready for review February 24, 2022 00:52
@gunnarmorling
Member

Thanks a lot, @morozov! Will take a look ASAP.

@jpechane, as per Sergei's mail, could you suggest a few key tests which should be adapted (well, copied, I suppose) to ensure the multi-partition mode works?

Member

@gunnarmorling gunnarmorling left a comment


Thanks a lot, @morozov! A few stylistic comments inline. I still need to do some testing using Compose (thanks for sharing the instructions for that!).

@@ -90,8 +128,7 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configuration config)
     final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(config);

     if (Strings.isNullOrEmpty(sqlServerConfig.getDatabaseName())) {
-        throw new IllegalArgumentException("Either '" + SqlServerConnectorConfig.DATABASE_NAME
-                + "' or '" + SqlServerConnectorConfig.DATABASE_NAMES
+        throw new IllegalArgumentException("Either '" + DATABASE_NAME + "' or '" + DATABASE_NAMES
Member


Rather a configuration error should be added (see connection validation failure in this method below).

Contributor Author


There is actually already validation of both parameters:

  1. For DATABASE_NAME:
     .withValidation(SqlServerConnectorConfig::validateDatabaseName)
  2. And for DATABASE_NAMES:
     .withValidation(SqlServerConnectorConfig::validateDatabaseNames)

So we just need to return early if either of these fields is invalid, as was done prior to #2604:

    final ConfigValue databaseValue = configValues.get(RelationalDatabaseConnectorConfig.DATABASE_NAME.name());
    if (!databaseValue.errorMessages().isEmpty()) {
        return;
    }


List<String> databaseNames;

if (multiPartitionMode) {
Member


See above.

@morozov
Contributor Author

morozov commented Feb 28, 2022

@gunnarmorling I addressed your review comments and added a basic snapshotting/streaming test. Please take a look.

Member

@gunnarmorling gunnarmorling left a comment


LGTM, merging.

@morozov, is there anything else missing before we can declare the multi-partition mode functional? I.e., could we announce it as a new feature in the Beta1 release with this PR in?

@gunnarmorling gunnarmorling merged commit 2952b7a into debezium:main Mar 1, 2022
@morozov
Contributor Author

morozov commented Mar 1, 2022

There are a couple of minor things left:

  1. Add task id and partition to the logging context. Originally implemented in DBZ-2975 (Add task id and partition to logging context, sugarcrm/debezium#80); the current state is sugarcrm@e3774be. It may need some minor tweaking to make it work with DBZ-2224 (Test logging based on logback, #3103).

  2. The partition-scoped CapturedTables metric will likely expose all tables captured by the task, not only the ones that belong to the corresponding partition:

    @Override
    public String[] getCapturedTables() {
        return streamingMeter.getCapturedTables();
    }

    @Override
    public String[] getCapturedTables() {
        return taskContext.capturedDataCollections();
    }

    We haven't attempted to address or even reproduce this issue since we're not using this metric. Some of our MySQL connectors capture a couple of hundred thousand tables.

And that's it; there are no more changes related to the multi-partition mode in our fork. I will file Jira cases for each of these, but I believe they shouldn't block the announcement.

UPD: DBZ-4808, DBZ-4809.

@morozov morozov deleted the DBZ-4783 branch March 1, 2022 17:38
@gunnarmorling
Member

Thanks a lot for logging these follow-up issues, @morozov! Also a big thank-you for the awesome PR description; it just came in super handy for creating a screenshot of the metrics in JMC.
