Skip to content

Conversation

@ethho
Copy link
Owner

@ethho ethho commented Dec 27, 2023

This is a working example of Debezium, MySQL, and RabbitMQ in a Docker Compose environment.

References

Steps to Reproduce

This example was tested on commit 6454ca3.

  1. Create the shell alias for dkc: alias dkc='docker compose'

  2. Start the Debezium stack

    dkc up -d app pre-deb-setup debezium && dkc logs -f app pre-deb-setup debezium
    # ...
    ethho-datajoint-python-debezium-1       | {"timestamp":"2023-12-28T14:27:32.316Z","sequence":202,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.common.BaseSourceTask","level":"INFO","message":"39 records sent during previous 00:00:40.118, last recorded offset of {server=schema_changes} partition is {transaction_id=null, ts_sec=1703773652, file=binlog.000002, pos=209581, row=1, server_id=1, event=2}","threadName":"pool-7-thread-1","threadId":25,"mdc":{},"ndc":"","hostName":"66a5b998674a","processName":"io.debezium.server.Main","processId":1}
  3. Ctrl-C to stop tailing logs.

  4. In RabbitMQ admin console, check the message rates. Should see about 1 message per second. In addition, Debezium logs should contain INFO level messages about messages posting to the queue. Alternatively, check the rate field using curl:

    curl -u guest:guest http://localhost:15672/api/exchanges | jq '.[] | {name: .name, message_stats: .message_stats}'
    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                    Dload  Upload   Total   Spent    Left  Speed
    100  1747  100  1747    0     0   831k      0 --:--:-- --:--:-- --:--:--  853k
    {
    "name": "",
    "message_stats": null
    }
    {
    "name": "amq.direct",
    "message_stats": null
    }
    {
    "name": "amq.fanout",
    "message_stats": null
    }
    {
    "name": "amq.headers",
    "message_stats": null
    }
    {
    "name": "amq.match",
    "message_stats": null
    }
    {
    "name": "amq.rabbitmq.trace",
    "message_stats": null
    }
    {
    "name": "amq.topic",
    "message_stats": null
    }
    {
    "name": "my_stream",
    "message_stats": null
    }
    {
    "name": "schema_changes",
    "message_stats": {
        "publish_in": 52,
        "publish_in_details": {
        "rate": 0
        }
    }
    }
    {
    "name": "schema_changes.djtest_university.department",
    "message_stats": {
        "publish_in": 372,
        "publish_in_details": {
        "rate": 1
        }
    }
    }
  5. Clean up: dkc down to stop the stack.

Steps to Reproduce (DEPRECATED, SEE UPDATED INSTRUCTIONS ABOVE)

This example was tested on commit 092c294.

  1. Create the shell alias for dkc: alias dkc='docker compose'

  2. Start the dependent services: dkc run -it app python3 -m pytest -sv tests/test_university.py::test_indefinite

    • Should see rows populating every second in the department table
  3. Check that table djtest_university.department exists

    mysql -hfakeservices.datajoint.io -uroot -ppassword -e 'SHOW TABLES IN djtest_university;'
    mysql: [Warning] Using a password on the command line interface can be insecure.
    +-----------------------------+
    | Tables_in_djtest_university |
    +-----------------------------+
    | #letter_grade               |
    | course                      |
    | current_term                |
    | department                  |
    | enroll                      |
    | grade                       |
    | section                     |
    | student                     |
    | student_major               |
    | term                        |
    | ~log                        |
    +-----------------------------+
  4. Start rabbitmq service

    dkc up --build -d rabbitmq && dkc logs -f rabbitmq
    # ...
    # ethho-datajoint-python-rabbitmq-1  | 2023-12-27 23:29:57.600808+00:00 [info] <0.9.0> Time to start RabbitMQ: 4524317 us
  5. Create exchanges schema_changes, my_stream, and schema_changes.djtest_university.department of type topic in the RabbitMQ console or using the following CLI commands:

    dkc exec rabbitmq rabbitmqadmin declare exchange name=schema_changes type=topic
    dkc exec rabbitmq rabbitmqadmin declare exchange name=my_stream type=topic
    dkc exec rabbitmq rabbitmqadmin declare exchange name=schema_changes.djtest_university.department type=topic
  6. Start debezium service: dkc up --build debezium

    • Might get the following error: io.debezium.DebeziumException: Could not find existing binlog information while attempting schema only recovery snapshot
      • If so, comment the last line of debezium-config/application.properties and restart the debezium service.
      • Last line is the setting debezium.source.snapshot.mode=schema_only_recovery
    • If it is commented and you get the following error: io.debezium.DebeziumException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY
      • Uncomment this line to fix, setting debezium.source.snapshot.mode=schema_only_recovery
    • On success, you will see the message: Keepalive thread is running
  7. In RabbitMQ admin console, check the message rates. Should see about 1 message per second. In addition, Debezium logs should contain INFO level messages about messages posting to the queue. Alternatively, check the rate field using curl:

    curl -u guest:guest http://localhost:15672/api/exchanges | jq '.[] | {name: .name, message_stats: .message_stats}'
    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                    Dload  Upload   Total   Spent    Left  Speed
    100  1747  100  1747    0     0   831k      0 --:--:-- --:--:-- --:--:--  853k
    {
    "name": "",
    "message_stats": null
    }
    {
    "name": "amq.direct",
    "message_stats": null
    }
    {
    "name": "amq.fanout",
    "message_stats": null
    }
    {
    "name": "amq.headers",
    "message_stats": null
    }
    {
    "name": "amq.match",
    "message_stats": null
    }
    {
    "name": "amq.rabbitmq.trace",
    "message_stats": null
    }
    {
    "name": "amq.topic",
    "message_stats": null
    }
    {
    "name": "my_stream",
    "message_stats": null
    }
    {
    "name": "schema_changes",
    "message_stats": {
        "publish_in": 52,
        "publish_in_details": {
        "rate": 0
        }
    }
    }
    {
    "name": "schema_changes.djtest_university.department",
    "message_stats": {
        "publish_in": 372,
        "publish_in_details": {
        "rate": 1
        }
    }
    }

@ethho ethho changed the base branch from master to dev-tests December 27, 2023 23:55
@ethho ethho changed the base branch from dev-tests to dev-tests-cleanup December 27, 2023 23:56
@ethho ethho self-assigned this Feb 29, 2024
@ethho ethho added the enhancement New feature or request label Feb 29, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants