Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions LNX-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ services:
- ADD_minio_ENDPOINT=minio:9000
- ADD_minio_PORT=80 # allow unencrypted connections
- ADD_minio_PREFIX=/datajoint
# ports:
# - "80:80"
# - "443:443"
# - "3306:3306"
ports:
- "3306:3306"
# - "80:80"
# - "443:443"
app:
<<: *net
image: datajoint/djtest:py${PY_VER:-3.8}-${DISTRO:-alpine}
Expand Down Expand Up @@ -87,17 +87,54 @@ services:
- -c
- |
set -e
pip install --user nose nose-cov
pip install -e .
pip install -q -e .
pip list --format=freeze | grep datajoint
pytest -sv --cov-report term-missing --cov=datajoint tests
nosetests -vsw tests_old --with-coverage --cover-package=datajoint
pytest -sv tests/test_university.py::test_indefinite
# ports:
# - "8888:8888"
user: ${HOST_UID:-1000}:anaconda
volumes:
- .:/src
- /tmp/.X11-unix:/tmp/.X11-unix:rw
# - ./notebooks:/home/dja/notebooks
rabbitmq:
<<: *net
image: rabbitmq:3-management
hostname: rabbitmq-host
ports:
- 15672:15672 # prometheus metrics
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
pre-deb-setup:
<<: *net
build:
context: ./debezium-demo/pre-deb-setup/
dockerfile: Dockerfile
depends_on:
fakeservices.datajoint.io:
condition: service_started
app:
condition: service_started
rabbitmq:
condition: service_healthy
command: bash -c /tmp/init.sh
debezium:
<<: *net
image: quay.io/debezium/server:${DEBEZIUM_VERSION:-2.3}
ports:
- "8080:8080"
volumes:
- ./debezium-demo/debezium:/debezium/conf
depends_on:
pre-deb-setup:
condition: service_completed_successfully


networks:
main:
48 changes: 48 additions & 0 deletions debezium-demo/debezium/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=rabbitmq-host
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.exchange=my_stream
debezium.sink.rabbitmq.routingKey=my_key
debezium.sink.rabbitmq.autoCreateRoutingKey=true
debezium.sink.rabbitmq.routingKeyDurable=true
# debezium.sink.rabbitmq.routingKeyFromTopicName=false

# Source connector
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=schema_history.dat
debezium.source.database.history=io.debezium.storage.file.history.FileSchemaHistory
database.history.file.filename=history.dat
debezium.source.offset.storage.file.filename=offsets.dat
debezium.source.offset.flush.interval.ms=0

# Stream properties
debezium.source.schema.include.list=djtest_university
debezium.source.topic.prefix=my_stream
# debezium.source.schema.include.list=djtest_university
# debezium.source.table.include.list="inventory.customers,inventory.addresses"
debezium.source.table.include.list=djtest_university.department
# debezium.source.table.exclude.list="djtest_university.#letter_grade"
# debezium.source.plugin.path=

# MySQL properties
debezium.source.database.hostname=fakeservices.datajoint.io
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=password
debezium.source.database.dbname=djtest_university
debezium.source.database.server.name=fakeservices.datajoint.io
# Check uniqueness of slave ID: mysql -hfakeservices.datajoint.io -uroot -ppassword -e 'SHOW SLAVE HOSTS;'
debezium.source.database.server.id=184054
# debezium.source.database.encrypt=false
debezium.source.database.allowPublicKeyRetrieval=true

# Snapshot properties: use schema_only_recovery when indicated in ERROR logs
debezium.source.snapshot.mode=when_needed
# debezium.source.snapshot.mode=schema_only_recovery

# Use minimal_percona for Percona cluster
debezium.source.snapshot.locking.mode=minimal
# debezium.source.snapshot.locking.mode=minimal_percona
7 changes: 7 additions & 0 deletions debezium-demo/pre-deb-setup/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM mambaorg/micromamba:1.5.6
COPY --chown=$MAMBA_USER:$MAMBA_USER env.yaml /tmp/env.yaml
RUN micromamba install --yes --file /tmp/env.yaml && \
micromamba clean --all --yes
ARG MAMBA_DOCKERFILE_ACTIVATE=1
COPY --chown=$MAMBA_USER:$MAMBA_USER init.sh /tmp/init.sh
RUN chmod +x /tmp/init.sh
6 changes: 6 additions & 0 deletions debezium-demo/pre-deb-setup/env.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
name: base
channels:
- conda-forge
dependencies:
- mysql-client
- curl
14 changes: 14 additions & 0 deletions debezium-demo/pre-deb-setup/init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

set -e
sleep 20
mysql -hfakeservices.datajoint.io -uroot -ppassword -e 'SHOW DATABASES;' | grep 'djtest_university'
declare -a endpoints=("my_stream" "schema_changes" "schema_changes.djtest_university.department")
for endpoint in "${endpoints[@]}"; do
curl -sf -u 'guest:guest' \
-H 'content-type:application/json' \
-XPUT -d'{"type":"topic","durable":true}' \
"http://rabbitmq:15672/api/exchanges/%2f/${endpoint}"
done
echo 'Finished pre Debezium setup'
exit 0
24 changes: 23 additions & 1 deletion tests/test_university.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def schema_uni(db_creds_test, schema_uni_inactive, connection_test, prefix):
path = test_data_dir / Path(table.__name__ + ".csv")
assert path.is_file(), f"File {path} is not a file"
assert path.exists(), f"File {path} does not exist"
table().insert(path)
table().insert(path, skip_duplicates=True)
return schema_uni_inactive


Expand Down Expand Up @@ -167,3 +167,25 @@ def test_aggr(schema_uni):
assert len(set(section.fetch("dept"))) == 1
assert len(section) == 168
assert bool(section)


def test_indefinite(schema_uni):
"""
Continually adds rows to the Department table.
"""
from time import sleep

i = 0
while True:
Department().insert1(
dict(
dept=str(i),
dept_name="foobar",
dept_address="my address",
dept_phone=str(i),
),
skip_duplicates=True,
)
i += 1
print(f"Inserting row with ID {i}")
sleep(1)