diff --git a/LNX-docker-compose.yml b/LNX-docker-compose.yml index eaf3a48cd..82e301ecd 100644 --- a/LNX-docker-compose.yml +++ b/LNX-docker-compose.yml @@ -53,10 +53,10 @@ services: - ADD_minio_ENDPOINT=minio:9000 - ADD_minio_PORT=80 # allow unencrypted connections - ADD_minio_PREFIX=/datajoint - # ports: - # - "80:80" - # - "443:443" - # - "3306:3306" + ports: + - "3306:3306" + # - "80:80" + # - "443:443" app: <<: *net image: datajoint/djtest:py${PY_VER:-3.8}-${DISTRO:-alpine} @@ -87,11 +87,9 @@ services: - -c - | set -e - pip install --user nose nose-cov - pip install -e . + pip install -q -e . pip list --format=freeze | grep datajoint - pytest -sv --cov-report term-missing --cov=datajoint tests - nosetests -vsw tests_old --with-coverage --cover-package=datajoint + pytest -sv tests/test_university.py::test_indefinite # ports: # - "8888:8888" user: ${HOST_UID:-1000}:anaconda @@ -99,5 +97,44 @@ services: - .:/src - /tmp/.X11-unix:/tmp/.X11-unix:rw # - ./notebooks:/home/dja/notebooks + rabbitmq: + <<: *net + image: rabbitmq:3-management + hostname: rabbitmq-host + ports: + - 15672:15672 # prometheus metrics + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 30s + timeout: 30s + retries: 3 + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + pre-deb-setup: + <<: *net + build: + context: ./debezium-demo/pre-deb-setup/ + dockerfile: Dockerfile + depends_on: + fakeservices.datajoint.io: + condition: service_started + app: + condition: service_started + rabbitmq: + condition: service_healthy + command: bash -c /tmp/init.sh + debezium: + <<: *net + image: quay.io/debezium/server:${DEBEZIUM_VERSION:-2.3} + ports: + - "8080:8080" + volumes: + - ./debezium-demo/debezium:/debezium/conf + depends_on: + pre-deb-setup: + condition: service_completed_successfully + + networks: main: diff --git a/debezium-demo/debezium/application.properties b/debezium-demo/debezium/application.properties new file mode 100644 index 000000000..9a6c00e2a --- /dev/null +++ b/debezium-demo/debezium/application.properties @@ -0,0 +1,48 @@ +debezium.sink.type=rabbitmq +debezium.sink.rabbitmq.connection.host=rabbitmq-host +debezium.sink.rabbitmq.connection.port=5672 +debezium.sink.rabbitmq.connection.username=guest +debezium.sink.rabbitmq.connection.password=guest +debezium.sink.rabbitmq.exchange=my_stream +debezium.sink.rabbitmq.routingKey=my_key +debezium.sink.rabbitmq.autoCreateRoutingKey=true +debezium.sink.rabbitmq.routingKeyDurable=true +# debezium.sink.rabbitmq.routingKeyFromTopicName=false + +# Source connector +debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector +debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory +debezium.source.schema.history.internal.file.filename=schema_history.dat +debezium.source.database.history=io.debezium.storage.file.history.FileSchemaHistory +database.history.file.filename=history.dat +debezium.source.offset.storage.file.filename=offsets.dat +debezium.source.offset.flush.interval.ms=0 + +# Stream properties +debezium.source.schema.include.list=djtest_university +debezium.source.topic.prefix=my_stream +# debezium.source.schema.include.list=djtest_university +# debezium.source.table.include.list="inventory.customers,inventory.addresses" +debezium.source.table.include.list=djtest_university.department +# debezium.source.table.exclude.list="djtest_university.#letter_grade" +# debezium.source.plugin.path= + +# MySQL properties +debezium.source.database.hostname=fakeservices.datajoint.io +debezium.source.database.port=3306 +debezium.source.database.user=root +debezium.source.database.password=password +debezium.source.database.dbname=djtest_university +debezium.source.database.server.name=fakeservices.datajoint.io +# Check uniqueness of slave ID: mysql -hfakeservices.datajoint.io -uroot -ppassword -e 'SHOW SLAVE HOSTS;' +debezium.source.database.server.id=184054 +# debezium.source.database.encrypt=false +debezium.source.database.allowPublicKeyRetrieval=true + +# Snapshot properties: use schema_only_recovery when indicated in ERROR logs +debezium.source.snapshot.mode=when_needed +# debezium.source.snapshot.mode=schema_only_recovery + +# Use minimal_percona for Percona cluster +debezium.source.snapshot.locking.mode=minimal +# debezium.source.snapshot.locking.mode=minimal_percona \ No newline at end of file diff --git a/debezium-demo/pre-deb-setup/Dockerfile b/debezium-demo/pre-deb-setup/Dockerfile new file mode 100644 index 000000000..9effc7aa3 --- /dev/null +++ b/debezium-demo/pre-deb-setup/Dockerfile @@ -0,0 +1,7 @@ +FROM mambaorg/micromamba:1.5.6 +COPY --chown=$MAMBA_USER:$MAMBA_USER env.yaml /tmp/env.yaml +RUN micromamba install --yes --file /tmp/env.yaml && \ + micromamba clean --all --yes +ARG MAMBA_DOCKERFILE_ACTIVATE=1 +COPY --chown=$MAMBA_USER:$MAMBA_USER init.sh /tmp/init.sh +RUN chmod +x /tmp/init.sh \ No newline at end of file diff --git a/debezium-demo/pre-deb-setup/env.yaml b/debezium-demo/pre-deb-setup/env.yaml new file mode 100644 index 000000000..f2cd19a74 --- /dev/null +++ b/debezium-demo/pre-deb-setup/env.yaml @@ -0,0 +1,6 @@ +name: base +channels: + - conda-forge +dependencies: + - mysql-client + - curl diff --git a/debezium-demo/pre-deb-setup/init.sh b/debezium-demo/pre-deb-setup/init.sh new file mode 100644 index 000000000..0a77ddd08 --- /dev/null +++ b/debezium-demo/pre-deb-setup/init.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +set -e +sleep 20 +mysql -hfakeservices.datajoint.io -uroot -ppassword -e 'SHOW DATABASES;' | grep 'djtest_university' +declare -a endpoints=("my_stream" "schema_changes" "schema_changes.djtest_university.department") +for endpoint in "${endpoints[@]}"; do + curl -sf -u 'guest:guest' \ + -H 'content-type:application/json' \ + -XPUT -d'{"type":"topic","durable":true}' \ + "http://rabbitmq:15672/api/exchanges/%2f/${endpoint}" +done +echo 'Finished pre Debezium setup' +exit 0 diff --git a/tests/test_university.py b/tests/test_university.py index 800ee7cdf..eaadd64d7 100644 --- a/tests/test_university.py +++ b/tests/test_university.py @@ -53,7 +53,7 @@ def schema_uni(db_creds_test, schema_uni_inactive, connection_test, prefix): path = test_data_dir / Path(table.__name__ + ".csv") assert path.is_file(), f"File {path} is not a file" assert path.exists(), f"File {path} does not exist" - table().insert(path) + table().insert(path, skip_duplicates=True) return schema_uni_inactive @@ -167,3 +167,25 @@ def test_aggr(schema_uni): assert len(set(section.fetch("dept"))) == 1 assert len(section) == 168 assert bool(section) + + +def test_indefinite(schema_uni): + """ + Continually adds rows to the Department table. + """ + from time import sleep + + i = 0 + while True: + Department().insert1( + dict( + dept=str(i), + dept_name="foobar", + dept_address="my address", + dept_phone=str(i), + ), + skip_duplicates=True, + ) + i += 1 + print(f"Inserting row with ID {i}") + sleep(1)