diff --git a/.circleci/config.yml b/.circleci/config.yml index 51e180a1..5004432e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -64,24 +64,6 @@ jobs: path: ~/junit - store_artifacts: path: ~/junit - build-cdc-mysql: - machine: true - working_directory: ~/eventuate-local-java - steps: - - checkout - - restore_cache: - key: eventuate-local-java-{{ checksum "build.gradle" }} - - run: TERM=dumb ./scripts/build-and-test-all-cdc-mysql.sh - - run: - name: Save test results - command: | - mkdir -p ~/junit/ - find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} ~/junit/ \; - when: always - - store_test_results: - path: ~/junit - - store_artifacts: - path: ~/junit build-new-cdc-mysql: machine: true working_directory: ~/eventuate-local-java @@ -180,9 +162,6 @@ workflows: - build-postgres-wal: requires: - build - - build-cdc-mysql: - requires: - - build - build-new-cdc-mysql: requires: - build @@ -198,7 +177,6 @@ workflows: - publish: requires: - build-postgres-wal - - build-cdc-mysql - build-new-cdc-mysql - build-new-cdc-mariadb - build-new-cdc-postgres-wal diff --git a/eventuate-local-java-cdc-service/Dockerfile b/eventuate-local-java-cdc-service/Dockerfile deleted file mode 100644 index 1a23dc30..00000000 --- a/eventuate-local-java-cdc-service/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM java:openjdk-8u91-jdk -CMD java -jar *.jar -EXPOSE 8080 -COPY build/libs/*-SNAPSHOT.jar . - diff --git a/eventuate-local-java-cdc-service/build-docker.sh b/eventuate-local-java-cdc-service/build-docker.sh deleted file mode 100755 index 34e8430e..00000000 --- a/eventuate-local-java-cdc-service/build-docker.sh +++ /dev/null @@ -1,8 +0,0 @@ -#! /bin/bash -e - -if [ $(ls build/libs/*SNAPSHOT.jar | wc -l) != "1" ] ; then - echo not exactly one jar in build/libs/ - exit 99 -fi - -docker build -t test-eventuateio-local-cdc-service . diff --git a/eventuate-local-java-cdc-service/build.gradle b/eventuate-local-java-cdc-service/build.gradle deleted file mode 100644 index 3b726d29..00000000 --- a/eventuate-local-java-cdc-service/build.gradle +++ /dev/null @@ -1,17 +0,0 @@ -apply plugin: PrivateModulePlugin -apply plugin: 'org.springframework.boot' - -dependencies { - compile "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion" - compile "org.springframework.boot:spring-boot-starter-web:$springBootVersion" - compile project(":eventuate-local-java-embedded-cdc-autoconfigure") - - compile ('org.postgresql:postgresql:42.1.4') { - exclude group: "org.slf4j", module: "slf4j-simple" - } - - compile('log4j:log4j:1.2.17') -} - -uploadArchives.dependsOn(tasks.findByName('bootRepackage') == null ? tasks['bootJar'] : tasks['bootRepackage']) -bintrayUpload.dependsOn(tasks.findByName('bootRepackage') == null ? tasks['bootJar'] : tasks['bootRepackage']) \ No newline at end of file diff --git a/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/EventuateLocalCdcMain.java b/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/EventuateLocalCdcMain.java deleted file mode 100644 index cf0a9953..00000000 --- a/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/EventuateLocalCdcMain.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.eventuate.local.cdc.main; - -import io.eventuate.local.cdc.debezium.EventTableChangesToAggregateTopicRelay; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; - -@SpringBootApplication -public class EventuateLocalCdcMain { - - @Bean - public HealthIndicator relayHealthIndicator(EventTableChangesToAggregateTopicRelay relay) { - return new RelayHealthIndicator(relay); - } - - public static void main(String[] args) { - SpringApplication.run(EventuateLocalCdcMain.class, args); - } -} diff --git a/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/RelayHealthIndicator.java b/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/RelayHealthIndicator.java deleted file mode 100644 index c38c76b4..00000000 --- a/eventuate-local-java-cdc-service/src/main/java/io/eventuate/local/cdc/main/RelayHealthIndicator.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.eventuate.local.cdc.main; - -import io.eventuate.local.cdc.debezium.EventTableChangesToAggregateTopicRelay; -import io.eventuate.local.cdc.debezium.RelayStatus; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; - -public class RelayHealthIndicator implements HealthIndicator { - private EventTableChangesToAggregateTopicRelay relay; - - public RelayHealthIndicator(EventTableChangesToAggregateTopicRelay relay) { - - this.relay = relay; - } - - @Override - public Health health() { - RelayStatus status = relay.getStatus(); - switch (status) { - case FAILED: - return Health.down().build(); - default: - return Health.up().withDetail("status", status.name()).build(); - } - } -} diff --git a/eventuate-local-java-cdc-service/src/main/resources/application.properties b/eventuate-local-java-cdc-service/src/main/resources/application.properties deleted file mode 100644 index 0b89635e..00000000 --- a/eventuate-local-java-cdc-service/src/main/resources/application.properties +++ /dev/null @@ -1,2 +0,0 @@ -spring.datasource.test.on.borrow=true -spring.datasource.validation.query=SELECT 1 \ No newline at end of file diff --git a/eventuate-local-java-cdc-service/src/main/resources/banner.txt b/eventuate-local-java-cdc-service/src/main/resources/banner.txt deleted file mode 100644 index e39d18b4..00000000 --- a/eventuate-local-java-cdc-service/src/main/resources/banner.txt +++ /dev/null @@ -1,7 +0,0 @@ -####### -# # # ###### # # ##### # # ## ##### ###### -# # # # ## # # # # # # # # -##### # # ##### # # # # # # # # # ##### -# # # # # # # # # # ###### # # -# # # # # ## # # # # # # # -####### ## ###### # # # #### # # # ###### diff --git a/eventuate-local-java-embedded-cdc-autoconfigure/build.gradle b/eventuate-local-java-embedded-cdc-autoconfigure/build.gradle deleted file mode 100644 index cfef519b..00000000 --- a/eventuate-local-java-embedded-cdc-autoconfigure/build.gradle +++ /dev/null @@ -1,5 +0,0 @@ -apply plugin: PrivateModulePlugin - -dependencies { - compile project(":eventuate-local-java-embedded-cdc") -} \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc-autoconfigure/src/main/resources/META-INF/spring.factories b/eventuate-local-java-embedded-cdc-autoconfigure/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 76b90378..00000000 --- a/eventuate-local-java-embedded-cdc-autoconfigure/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -io.eventuate.local.cdc.debezium.EventTableChangesToAggregateTopicRelayConfiguration \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc/build.gradle b/eventuate-local-java-embedded-cdc/build.gradle deleted file mode 100644 index b30793bf..00000000 --- a/eventuate-local-java-embedded-cdc/build.gradle +++ /dev/null @@ -1,44 +0,0 @@ -//configurations { -// // Kafka Connect has some un-needed dependencies that introduce problems: -// // "Unable to create a Configuration, because no Bean Validation provider" -// -// all*.exclude group: "org.glassfish.jersey.containers", module: "jersey-container-servlet" -// all*.exclude group: "org.eclipse.jetty", module: "jetty-servlets" -// all*.exclude group: "org.eclipse.jetty", module: "jetty-server" -// all*.exclude module: "slf4j-log4j12" -//} -apply plugin: PrivateModulePlugin - -dependencies { - compile 'commons-lang:commons-lang:2.6' - - compile ("io.debezium:debezium-embedded:$debeziumVersion") { - exclude group: "org.glassfish.jersey.containers", module: "jersey-container-servlet" - exclude group: "org.eclipse.jetty", module: "jetty-servlets" - exclude group: "org.eclipse.jetty", module: "jetty-server" - exclude group: "org.slf4j", module: "slf4j-log4j12" - } - - compile "io.debezium:debezium-connector-mysql:$debeziumVersion" - compile project(":eventuate-local-java-common") - compile 'org.apache.curator:curator-recipes:2.11.0' - - compile "ch.qos.logback:logback-classic:1.1.5" - compile "io.eventuate.client.java:eventuate-client-java-common-impl:$eventuateClientVersion" - compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion" - - compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion" - compile ('org.postgresql:postgresql:42.1.4') { - exclude group: "org.slf4j", module: "slf4j-simple" - } - - testCompile project(":eventuate-local-java-test-util") - testCompile project(":eventuate-local-java-jdbc") - testCompile "io.eventuate.client.java:eventuate-client-java-test-util:$eventuateClientVersion" - testCompile "junit:junit:4.11" - testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion" -} - -test { - forkEvery 1 -} \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidator.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidator.java deleted file mode 100644 index 6c632e3a..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidator.java +++ /dev/null @@ -1,85 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import io.debezium.config.Configuration; -import io.debezium.connector.mysql.MySqlConnectorConfig; -import io.debezium.connector.mysql.MySqlJdbcContext; -import io.debezium.jdbc.JdbcConnection; -import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumerConfigurationProperties; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -public class CdcStartupValidator { - - private long kafkaValidationTimeoutMillis; - private int kafkaValidationMaxAttempts; - - private String bootstrapServers; - - private Logger logger = LoggerFactory.getLogger(this.getClass()); - private EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties; - - public CdcStartupValidator(String bootstrapServers, - EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) { - this.bootstrapServers = bootstrapServers; - this.eventuateKafkaConsumerConfigurationProperties = eventuateKafkaConsumerConfigurationProperties; - } - - public void validateEnvironment() { - validateKafkaConnection(); - } - - private void validateKafkaConnection() { - KafkaConsumer consumer = getTestConsumer(); - - int i = kafkaValidationMaxAttempts; - KafkaException lastException = null; - while (i > 0) { - try { - consumer.listTopics(); - logger.info("Successfully tested Kafka connection"); - return; - } catch (KafkaException e) { - logger.info("Failed to connection to Kafka"); - lastException = e; - i--; - try { - Thread.sleep(kafkaValidationTimeoutMillis); - } catch (InterruptedException ie) { - throw new RuntimeException("Kafka validation had been interrupted!", ie); - } - } - } - throw lastException; - } - - private KafkaConsumer getTestConsumer() { - Properties consumerProperties = new Properties(); - consumerProperties.put("bootstrap.servers", bootstrapServers); - consumerProperties.put("group.id", "stratup-test-subscriber"); - consumerProperties.put("request.timeout.ms", String.valueOf(kafkaValidationTimeoutMillis + 1)); - consumerProperties.put("session.timeout.ms", String.valueOf(kafkaValidationTimeoutMillis)); - consumerProperties.put("heartbeat.interval.ms", String.valueOf(kafkaValidationTimeoutMillis - 1)); - consumerProperties.put("fetch.max.wait.ms", String.valueOf(kafkaValidationTimeoutMillis)); - consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - consumerProperties.putAll(eventuateKafkaConsumerConfigurationProperties.getProperties()); - - return new KafkaConsumer<>(consumerProperties); - } - - public void setKafkaValidationTimeoutMillis(long kafkaValidationTimeoutMillis) { - this.kafkaValidationTimeoutMillis = kafkaValidationTimeoutMillis; - } - - public void setKafkaValidationMaxAttempts(int kafkaValidationMaxAttempts) { - this.kafkaValidationMaxAttempts = kafkaValidationMaxAttempts; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidatorConfigurationProperties.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidatorConfigurationProperties.java deleted file mode 100644 index 7aaeea55..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/CdcStartupValidatorConfigurationProperties.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.springframework.beans.factory.annotation.Value; - -public class CdcStartupValidatorConfigurationProperties { - - @Value("${eventuatelocal.startup.validator.mysql.validation.timeout.millis:#{1000}}") - private long mySqlValidationTimeoutMillis = 1000; - - @Value("${eventuatelocal.startup.validator.mysql.validation.max.attempts:#{100}}") - private int mySqlValidationMaxAttempts = 100; - - @Value("${eventuatelocal.startup.validator.kafka.validation.timeout.millis:#{1000}}") - private long kafkaValidationTimeoutMillis = 1000; - - @Value("${eventuatelocal.startup.validator.kafka.validation.max.attempts:#{100}}") - private int kafkaValidationMaxAttempts = 100; - - public long getMySqlValidationTimeoutMillis() { - return mySqlValidationTimeoutMillis; - } - - public int getMySqlValidationMaxAttempts() { - return mySqlValidationMaxAttempts; - } - - public long getKafkaValidationTimeoutMillis() { - return kafkaValidationTimeoutMillis; - } - - public int getKafkaValidationMaxAttempts() { - return kafkaValidationMaxAttempts; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/DebeziumCdcStartupValidator.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/DebeziumCdcStartupValidator.java deleted file mode 100644 index e6b3b36e..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/DebeziumCdcStartupValidator.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import io.debezium.config.Configuration; -import io.debezium.connector.mysql.MySqlConnectorConfig; -import io.debezium.connector.mysql.MySqlJdbcContext; -import io.debezium.jdbc.JdbcConnection; -import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumerConfigurationProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; - -public class DebeziumCdcStartupValidator extends CdcStartupValidator { - - private JdbcUrl jdbcUrl; - private String dbUser; - private String dbPassword; - private long mySqlValidationTimeoutMillis; - private int mySqlValidationMaxAttempts; - - private Logger logger = LoggerFactory.getLogger(this.getClass()); - - public DebeziumCdcStartupValidator(JdbcUrl jdbcUrl, - String dbUser, - String dbPassword, - String bootstrapServers, - EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) { - super(bootstrapServers, eventuateKafkaConsumerConfigurationProperties); - this.jdbcUrl = jdbcUrl; - this.dbUser = dbUser; - this.dbPassword = dbPassword; - } - - @Override - public void validateEnvironment() { - super.validateEnvironment(); - validateDatasourceConnection(); - } - - private void validateDatasourceConnection() { - logger.info("About to validate DataSource connection"); - Map connectorConfig = new HashMap<>(); - connectorConfig.put(MySqlConnectorConfig.HOSTNAME.name(), jdbcUrl.getHost()); - connectorConfig.put(MySqlConnectorConfig.PORT.name(), String.valueOf(jdbcUrl.getPort())); - connectorConfig.put(MySqlConnectorConfig.USER.name(), dbUser); - connectorConfig.put(MySqlConnectorConfig.PASSWORD.name(), dbPassword); - - Configuration config = Configuration.from(connectorConfig); - try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(config)) { - jdbcContext.start(); - JdbcConnection mysql = jdbcContext.jdbc(); - int i = mySqlValidationMaxAttempts; - SQLException lastException = null; - while (i > 0) { - try { - mysql.execute("SELECT version()"); - logger.info("Successfully tested connection for {}:{} with user '{}'", jdbcContext.hostname(), jdbcContext.port(), mysql.username()); - return; - } catch (SQLException e) { - lastException = e; - logger.warn(String.format("Failed testing connection for %s:%s with user '%s'", jdbcContext.hostname(), jdbcContext.port(), mysql.username()), e); - i--; - try { - Thread.sleep(mySqlValidationTimeoutMillis); - } catch (InterruptedException ie) { - throw new RuntimeException("MySql validation had been interrupted!", ie); - } - } - } - logger.error(String.format("Could not connect to database for %s:%s with user '%s'", jdbcContext.hostname(), jdbcContext.port(), mysql.username()), lastException); - throw new RuntimeException(lastException); - } - } - - public void setMySqlValidationTimeoutMillis(long mySqlValidationTimeoutMillis) { - this.mySqlValidationTimeoutMillis = mySqlValidationTimeoutMillis; - } - - public void setMySqlValidationMaxAttempts(int mySqlValidationMaxAttempts) { - this.mySqlValidationMaxAttempts = mySqlValidationMaxAttempts; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventPollingDao.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventPollingDao.java deleted file mode 100644 index 18341c23..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventPollingDao.java +++ /dev/null @@ -1,87 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import com.google.common.collect.ImmutableMap; -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.dao.DataAccessResourceFailureException; -import org.springframework.jdbc.core.BeanPropertyRowMapper; -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; - -import javax.sql.DataSource; -import java.util.List; -import java.util.concurrent.Callable; - -public class EventPollingDao { - private Logger logger = LoggerFactory.getLogger(getClass()); - - private DataSource dataSource; - private NamedParameterJdbcTemplate namedParameterJdbcTemplate; - private int maxEventsPerPolling; - private int maxAttemptsForPolling; - private int pollingRetryIntervalInMilliseconds; - private String eventTable; - - public EventPollingDao(DataSource dataSource, - int maxEventsPerPolling, - int maxAttemptsForPolling, - int pollingRetryIntervalInMilliseconds, - EventuateSchema eventuateSchema) { - - if (maxEventsPerPolling <= 0) { - throw new IllegalArgumentException("Max events per polling parameter should be greater than 0."); - } - - this.dataSource = dataSource; - this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource); - this.maxEventsPerPolling = maxEventsPerPolling; - this.maxAttemptsForPolling = maxAttemptsForPolling; - this.pollingRetryIntervalInMilliseconds = pollingRetryIntervalInMilliseconds; - - eventTable = eventuateSchema.qualifyTable("events"); - } - - public int getMaxEventsPerPolling() { - return maxEventsPerPolling; - } - - public void setMaxEventsPerPolling(int maxEventsPerPolling) { - this.maxEventsPerPolling = maxEventsPerPolling; - } - - public List findEventsToPublish() { - return handleConnectionLost(() -> - namedParameterJdbcTemplate.query(String.format("SELECT * FROM %s WHERE published = 0 ORDER BY event_id ASC limit :limit", eventTable), - ImmutableMap.of("limit", maxEventsPerPolling), new BeanPropertyRowMapper(EventToPublish.class))); - } - - public void markEventsAsPublished(List ids) { - handleConnectionLost(() -> namedParameterJdbcTemplate.update(String.format("UPDATE %s SET published = 1 WHERE event_id in (:ids)", eventTable), - ImmutableMap.of("ids", ids))); - } - - private T handleConnectionLost(Callable query) { - int attempt = 0; - - while(true) { - try { - return query.call(); - } catch (DataAccessResourceFailureException e) { - - logger.error(e.getMessage(), e); - - if (attempt++ >= maxAttemptsForPolling) { - throw e; - } - - try { - Thread.sleep(pollingRetryIntervalInMilliseconds); - } catch (InterruptedException ie) { - logger.error(ie.getMessage(), ie); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelay.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelay.java deleted file mode 100644 index 3007f3be..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelay.java +++ /dev/null @@ -1,191 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.javaclient.commonimpl.JSonMapper; -import io.eventuate.local.common.AggregateTopicMapping; -import io.eventuate.local.common.PublishedEvent; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.state.ConnectionState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Subscribes to changes made to EVENTS table and publishes them to aggregate topics - */ -public abstract class EventTableChangesToAggregateTopicRelay { - - private Logger logger = LoggerFactory.getLogger(getClass()); - - protected EventuateKafkaProducer producer; - protected String kafkaBootstrapServers; - protected CdcStartupValidator cdcStartupValidator; - - private final LeaderSelector leaderSelector; - - private AtomicReference status = new AtomicReference<>(RelayStatus.IDLE); - - public RelayStatus getStatus() { - return status.get(); - } - - private final TakeLeadershipAttemptTracker takeLeadershipAttemptTracker; - - protected EventTableChangesToAggregateTopicRelay(String kafkaBootstrapServers, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - TakeLeadershipAttemptTracker takeLeadershipAttemptTracker, String leadershipLockPath) { - - this.kafkaBootstrapServers = kafkaBootstrapServers; - - this.cdcStartupValidator = cdcStartupValidator; - this.takeLeadershipAttemptTracker = takeLeadershipAttemptTracker; - - leaderSelector = new LeaderSelector(client, leadershipLockPath, new LeaderSelectorListener() { - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - takeLeadership(); - } - - private void takeLeadership() throws InterruptedException { - - RuntimeException laste = null; - do { - takeLeadershipAttemptTracker.attempting(); - status.set(RelayStatus.RUNNING); - logger.info("Taking leadership"); - try { - CompletableFuture completion = startCapturingChanges(); - try { - completion.get(); - } catch (InterruptedException e) { - logger.error("Interrupted while taking leadership"); - } - return; - } catch (Throwable t) { - switch (status.get()) { - case RUNNING: - status.set(RelayStatus.FAILED); - } - logger.error("In takeLeadership", t); - laste = t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t); - doResign(); - } finally { - logger.debug("TakeLeadership returning"); - } - } while (status.get() == RelayStatus.FAILED && takeLeadershipAttemptTracker.shouldAttempt()); - throw laste; - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - - logger.debug("StateChanged: {}", newState); - - switch (newState) { - case SUSPENDED: - resignLeadership(); - break; - - case RECONNECTED: - try { - takeLeadership(); - } catch (InterruptedException e) { - logger.error("While handling RECONNECTED", e); - } - break; - - case LOST: - resignLeadership(); - break; - } - } - - private void resignLeadership() { - status.set(RelayStatus.STOPPING); - doResign(); - status.set(RelayStatus.IDLE); - } - - private void doResign() { - logger.info("Resigning leadership"); - try { - stopCapturingChanges(); - } catch (InterruptedException e) { - logger.error("While handling SUSPEND", e); - } - } - }); - } - - @PostConstruct - public void start() { - logger.info("CDC initialized. Ready to become leader"); - leaderSelector.start(); - } - - public abstract CompletableFuture startCapturingChanges() throws InterruptedException; - - @PreDestroy - public void stop() throws InterruptedException { - //stopCapturingChanges(); - leaderSelector.close(); - } - - public abstract void stopCapturingChanges() throws InterruptedException; - - - public static String toJson(PublishedEvent eventInfo) { - return JSonMapper.toJson(eventInfo); - } - - protected void handleEvent(EventToPublish eventToPublish) { - - PublishedEvent pe = new PublishedEvent(eventToPublish.getEventId(), - eventToPublish.getEntityId(), - eventToPublish.getEntityType(), - eventToPublish.getEventData(), - eventToPublish.getEventType(), - null, - eventToPublish.getMetadataOptional()); - - - String aggregateTopic = AggregateTopicMapping.aggregateTypeToTopic(pe.getEntityType()); - String json = toJson(pe); - - if (logger.isInfoEnabled()) - logger.debug("Publishing triggeringEvent={}, event={}", eventToPublish.getTriggeringEvent(), json); - - try { - producer.send(aggregateTopic, - eventToPublish.getEntityId(), - json).get(10, TimeUnit.SECONDS); - } catch (RuntimeException e) { - logger.error("error publishing to " + aggregateTopic, e); - throw e; - } catch (Throwable e) { - logger.error("error publishing to " + aggregateTopic, e); - throw new RuntimeException(e); - } - } - -// public static class MyKafkaOffsetBackingStore extends KafkaOffsetBackingStore { -// -// @Override -// public void configure(WorkerConfig configs) { -// Map updatedConfig = new HashMap<>(configs.originals()); -// updatedConfig.put("bootstrap.servers", kafkaBootstrapServers); -// super.configure(new WorkerConfig(configs.)); -// } -// } - -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfiguration.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfiguration.java deleted file mode 100644 index 66d9411a..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfiguration.java +++ /dev/null @@ -1,164 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.common.MySqlBinlogCondition; -import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties; -import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumerConfigurationProperties; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducerConfigurationProperties; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; - -import javax.sql.DataSource; - -@Configuration -@EnableConfigurationProperties({EventuateKafkaProducerConfigurationProperties.class, - EventuateKafkaConsumerConfigurationProperties.class}) -public class EventTableChangesToAggregateTopicRelayConfiguration { - - @Bean - public EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties() { - return new EventuateKafkaConfigurationProperties(); - } - - @Bean - public EventuateLocalZookeperConfigurationProperties eventuateLocalZookeperConfigurationProperties() { - return new EventuateLocalZookeperConfigurationProperties(); - } - - @Bean - public CdcStartupValidatorConfigurationProperties cdcStartupValidatorConfigurationProperties() { - return new CdcStartupValidatorConfigurationProperties(); - } - - @Bean - public EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties() { - return new EventTableChangesToAggregateTopicRelayConfigurationProperties(); - } - - @Bean - public EventuateSchema eventuateSchema(@Value("${eventuate.database.schema:#{null}}") String eventuateDatabaseSchema) { - return new EventuateSchema(eventuateDatabaseSchema); - } - - @Bean - @Conditional(MySqlBinlogCondition.class) - public EventTableChangesToAggregateTopicRelay embeddedDebeziumCDC(EventuateSchema eventuateSchema, - @Value("${spring.datasource.url}") String dataSourceURL, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceURL); - - return new MySqlBinLogBasedEventTableChangesToAggregateTopicRelay( - eventuateKafkaConfigurationProperties.getBootstrapServers(), - jdbcUrl, - eventTableChangesToAggregateTopicRelayConfigurationProperties.getDbUserName(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getDbPassword(), - client, - cdcStartupValidator, - new TakeLeadershipAttemptTracker(eventTableChangesToAggregateTopicRelayConfigurationProperties.getMaxRetries(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getRetryPeriodInMilliseconds()), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getLeadershipLockPath(), - eventuateSchema, - eventuateKafkaProducerConfigurationProperties); - } - - @Bean - @Profile("EventuatePolling") - public EventTableChangesToAggregateTopicRelay pollingCDC(EventPollingDao eventPollingDao, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - return new PollingBasedEventTableChangesToAggregateTopicRelay(eventPollingDao, - eventTableChangesToAggregateTopicRelayConfigurationProperties.getPollingIntervalInMilliseconds(), - eventuateKafkaConfigurationProperties.getBootstrapServers(), - client, - cdcStartupValidator, - new TakeLeadershipAttemptTracker(eventTableChangesToAggregateTopicRelayConfigurationProperties.getMaxRetries(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getRetryPeriodInMilliseconds()), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getLeadershipLockPath(), - eventuateKafkaProducerConfigurationProperties); - } - - @Bean - @Conditional(MySqlBinlogCondition.class) - public CdcStartupValidator debeziumCdcStartupValidator(@Value("${spring.datasource.url}") String dataSourceURL, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CdcStartupValidatorConfigurationProperties cdcStartupValidatorConfigurationProperties, - EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) { - - JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceURL); - - DebeziumCdcStartupValidator cdcStartupValidator = new DebeziumCdcStartupValidator(jdbcUrl, - eventTableChangesToAggregateTopicRelayConfigurationProperties.getDbUserName(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getDbPassword(), - eventuateKafkaConfigurationProperties.getBootstrapServers(), - eventuateKafkaConsumerConfigurationProperties); - - cdcStartupValidator.setMySqlValidationMaxAttempts(cdcStartupValidatorConfigurationProperties.getMySqlValidationMaxAttempts()); - cdcStartupValidator.setMySqlValidationTimeoutMillis(cdcStartupValidatorConfigurationProperties.getMySqlValidationTimeoutMillis()); - cdcStartupValidator.setKafkaValidationMaxAttempts(cdcStartupValidatorConfigurationProperties.getKafkaValidationMaxAttempts()); - cdcStartupValidator.setKafkaValidationTimeoutMillis(cdcStartupValidatorConfigurationProperties.getKafkaValidationTimeoutMillis()); - - return cdcStartupValidator; - } - - @Bean - @Profile("EventuatePolling") - public CdcStartupValidator basicCdcStartupValidator(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CdcStartupValidatorConfigurationProperties cdcStartupValidatorConfigurationProperties, - EventuateKafkaConsumerConfigurationProperties eventuateKafkaConsumerConfigurationProperties) { - - CdcStartupValidator cdcStartupValidator = new CdcStartupValidator(eventuateKafkaConfigurationProperties.getBootstrapServers(), - eventuateKafkaConsumerConfigurationProperties); - - cdcStartupValidator.setKafkaValidationMaxAttempts(cdcStartupValidatorConfigurationProperties.getKafkaValidationMaxAttempts()); - cdcStartupValidator.setKafkaValidationTimeoutMillis(cdcStartupValidatorConfigurationProperties.getKafkaValidationTimeoutMillis()); - - return cdcStartupValidator; - } - - @Bean - @Profile("EventuatePolling") - public EventPollingDao eventPollingDao(EventuateSchema eventuateSchema, - DataSource dataSource, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties) { - - return new EventPollingDao(dataSource, - eventTableChangesToAggregateTopicRelayConfigurationProperties.getMaxEventsPerPolling(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getMaxAttemptsForPolling(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getPollingRetryIntervalInMilliseconds(), - eventuateSchema); - } - - @Bean(destroyMethod = "close") - public CuratorFramework curatorFramework(EventuateLocalZookeperConfigurationProperties eventuateLocalZookeperConfigurationProperties) { - String connectionString = eventuateLocalZookeperConfigurationProperties.getConnectionString(); - return makeStartedCuratorClient(connectionString); - } - - static CuratorFramework makeStartedCuratorClient(String connectionString) { - RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - CuratorFramework client = CuratorFrameworkFactory. - builder().retryPolicy(retryPolicy) - .connectString(connectionString) - .build(); - client.start(); - return client; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfigurationProperties.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfigurationProperties.java deleted file mode 100644 index 182e3dcb..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventTableChangesToAggregateTopicRelayConfigurationProperties.java +++ /dev/null @@ -1,69 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.springframework.beans.factory.annotation.Value; - -public class EventTableChangesToAggregateTopicRelayConfigurationProperties { - - @Value("${eventuatelocal.cdc.db.user.name:#{null}}") - private String dbUserName; - - @Value("${eventuatelocal.cdc.db.password:#{null}}") - private String dbPassword; - - @Value("${eventuatelocal.cdc.polling.interval.in.milliseconds:#{500}}") - private int pollingIntervalInMilliseconds; - - @Value("${eventuatelocal.cdc.max.events.per.polling:#{1000}}") - private int maxEventsPerPolling; - - @Value("${eventuatelocal.cdc.max.attempts.for.polling:#{100}}") - private int maxAttemptsForPolling; - - @Value("${eventuatelocal.cdc.polling.retry.interval.in.milliseconds:#{500}}") - private int pollingRetryIntervalInMilliseconds; - - @Value("${eventuatelocal.cdc.max.retries:#{5}}") - private int maxRetries; - - @Value("${eventuatelocal.cdc.retry.period.in.milliseconds:#{500}}") - private long retryPeriodInMilliseconds; - - @Value("${eventuatelocal.cdc.leadership.lock.path:#{\"/eventuatelocal/cdc/leader\"}}") - private String leadershipLockPath; - - public String getDbUserName() { - return dbUserName; - } - - public String getDbPassword() { - return dbPassword; - } - - public int getPollingIntervalInMilliseconds() { - return pollingIntervalInMilliseconds; - } - - public int getMaxEventsPerPolling() { - return maxEventsPerPolling; - } - - public int getMaxAttemptsForPolling() { - return maxAttemptsForPolling; - } - - public int getPollingRetryIntervalInMilliseconds() { - return pollingRetryIntervalInMilliseconds; - } - - public int getMaxRetries() { - return maxRetries; - } - - public long getRetryPeriodInMilliseconds() { - return retryPeriodInMilliseconds; - } - - public String getLeadershipLockPath() { - return leadershipLockPath; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventToPublish.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventToPublish.java deleted file mode 100644 index b265476c..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventToPublish.java +++ /dev/null @@ -1,108 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import java.util.Optional; - -public class EventToPublish { - private String eventId; - private String eventType; - private String eventData; - private String entityType; - private String entityId; - private String triggeringEvent; - private String metadata; - - public EventToPublish() { - } - - public EventToPublish(String eventId, - String eventType, - String eventData, - String entityType, - String entityId, - String triggeringEvent, - String metadata - ) { - this.eventId = eventId; - this.eventType = eventType; - this.eventData = eventData; - this.entityType = entityType; - this.entityId = entityId; - this.triggeringEvent = triggeringEvent; - this.metadata = metadata; - } - - public EventToPublish(String eventId, - String eventType, - String eventData, - String entityType, - String entityId, - String triggeringEvent, - Optional metadata - ) { - this(eventId, eventType, eventData, entityType, entityId, triggeringEvent, metadata.orElse(null)); - } - - public String getEventId() { - return eventId; - } - - public void setEventId(String eventId) { - this.eventId = eventId; - } - - public String getEventType() { - return eventType; - } - - public void setEventType(String eventType) { - this.eventType = eventType; - } - - public String getEventData() { - return eventData; - } - - public void setEventData(String eventData) { - this.eventData = eventData; - } - - public String getEntityType() { - return entityType; - } - - public void setEntityType(String entityType) { - this.entityType = entityType; - } - - public String getEntityId() { - return entityId; - } - - public void setEntityId(String entityId) { - this.entityId = entityId; - } - - public String getTriggeringEvent() { - return triggeringEvent; - } - - public void setTriggeringEvent(String triggeringEvent) { - this.triggeringEvent = triggeringEvent; - } - - public String getMetadata() { - return metadata; - } - - public void setMetadata(String metadata) { - this.metadata = metadata; - } - - public Optional getMetadataOptional() { - return Optional.ofNullable(metadata); - } - - public void setMetadataOptional(Optional metadata) { - this.metadata = metadata.orElse(null); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventuateLocalZookeperConfigurationProperties.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventuateLocalZookeperConfigurationProperties.java deleted file mode 100644 index 5a3a9e61..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/EventuateLocalZookeperConfigurationProperties.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.springframework.beans.factory.annotation.Value; - -public class EventuateLocalZookeperConfigurationProperties { - - @Value("${eventuatelocal.zookeeper.connection.string}") - private String connectionString; - - public String getConnectionString() { - return connectionString; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrl.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrl.java deleted file mode 100644 index c688cc42..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrl.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -public class JdbcUrl { - - String host; - int port; - String database; - - public JdbcUrl(String host, int port, String database) { - this.host = host; - this.port = port; - this.database = database; - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public String getDatabase() { - return database; - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrlParser.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrlParser.java deleted file mode 100644 index c1844c2f..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/JdbcUrlParser.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public class JdbcUrlParser { - public static JdbcUrl parse(String dataSourceURL) { - Pattern p = Pattern.compile("jdbc:[a-zA-Z0-9]+://([^:/]+)(:[0-9]+)?/([^?]+)(\\?.*)?$"); - Matcher m = p.matcher(dataSourceURL); - - if (!m.matches()) - throw new RuntimeException(dataSourceURL); - - String host = m.group(1); - String port = m.group(2); - String database = m.group(3); - return new JdbcUrl(host, port == null ? 3306 : Integer.parseInt(port.substring(1)), database); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelay.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelay.java deleted file mode 100644 index 2cf442d7..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelay.java +++ /dev/null @@ -1,167 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.debezium.config.Configuration; -import io.debezium.embedded.EmbeddedEngine; -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducerConfigurationProperties; -import org.apache.curator.framework.CuratorFramework; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -public class MySqlBinLogBasedEventTableChangesToAggregateTopicRelay extends EventTableChangesToAggregateTopicRelay { - - private Logger logger = LoggerFactory.getLogger(getClass()); - - private final JdbcUrl jdbcUrl; - private final String dbUser; - private final String dbPassword; - - private EmbeddedEngine engine; - - private EventuateSchema eventuateSchema; - private EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties; - - public MySqlBinLogBasedEventTableChangesToAggregateTopicRelay(String kafkaBootstrapServers, - JdbcUrl jdbcUrl, - String dbUser, - String dbPassword, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - TakeLeadershipAttemptTracker takeLeadershipAttemptTracker, - String leadershipLockPath, - EventuateSchema eventuateSchema, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - super(kafkaBootstrapServers, client, cdcStartupValidator, takeLeadershipAttemptTracker, leadershipLockPath); - - this.jdbcUrl = jdbcUrl; - this.dbUser = dbUser; - this.dbPassword = dbPassword; - - this.eventuateSchema = eventuateSchema; - this.eventuateKafkaProducerConfigurationProperties = eventuateKafkaProducerConfigurationProperties; - } - - public CompletableFuture startCapturingChanges() throws InterruptedException { - logger.debug("Starting to capture changes"); - - cdcStartupValidator.validateEnvironment(); - - producer = new EventuateKafkaProducer(kafkaBootstrapServers, eventuateKafkaProducerConfigurationProperties); - - String connectorName = "my-sql-connector"; - Configuration config = Configuration.create() - /* begin engine properties */ - .with("connector.class", - "io.debezium.connector.mysql.MySqlConnector") - - .with("offset.storage", KafkaOffsetBackingStore.class.getName()) - .with("bootstrap.servers", kafkaBootstrapServers) - .with("offset.storage.topic", "eventuate.local.cdc." + connectorName + ".offset.storage") - - .with("poll.interval.ms", 50) - .with("offset.flush.interval.ms", 6000) - /* begin connector properties */ - .with("name", connectorName) - .with("database.hostname", jdbcUrl.getHost()) - .with("database.port", jdbcUrl.getPort()) - .with("database.user", dbUser) - .with("database.password", dbPassword) - .with("database.server.id", 85744) - .with("database.server.name", "my-app-connector") - // Unnecessary.with("database.whitelist", jdbcUrl.getDatabase()) - .with("table.whitelist", eventuateSchema.isEmpty() ? jdbcUrl.getDatabase() + ".events" : eventuateSchema.qualifyTable("events")) - .with("database.history", - io.debezium.relational.history.KafkaDatabaseHistory.class.getName()) - .with("database.history.kafka.topic", - "eventuate.local.cdc." + connectorName + ".history.kafka.topic") - .with("database.history.kafka.bootstrap.servers", - kafkaBootstrapServers) - .build(); - - CompletableFuture completion = new CompletableFuture<>(); - engine = EmbeddedEngine.create() - .using((success, message, throwable) -> { - if (success) { - completion.complete(null); - } - else - completion.completeExceptionally(new RuntimeException("Engine through exception" + message, throwable)); - }) - .using(config) - .notifying(this::receiveEvent) - .build(); - - Executor executor = Executors.newCachedThreadPool(); - executor.execute(() -> { - try { - engine.run(); - } catch (Throwable t) { - t.printStackTrace(); - } - }); - - logger.debug("Started engine"); - return completion; - } - - @Override - public void stopCapturingChanges() throws InterruptedException { - - logger.debug("Stopping to capture changes"); - - if (producer != null) - producer.close(); - - - if (engine != null) { - - logger.debug("Stopping Debezium engine"); - engine.stop(); - - try { - while (!engine.await(30, TimeUnit.SECONDS)) { - logger.debug("Waiting another 30 seconds for the embedded engine to shut down"); - } - } catch (InterruptedException e) { - Thread.interrupted(); - } - } - } - - private void receiveEvent(SourceRecord sourceRecord) { - logger.trace("Got record"); - String topic = sourceRecord.topic(); - if (String.format("my-app-connector.%s.events", eventuateSchema.isEmpty() ? jdbcUrl.getDatabase() : eventuateSchema.getEventuateDatabaseSchema()).equals(topic)) { - Struct value = (Struct) sourceRecord.value(); - - //event operation must be 'create' - if (value == null || !"c".equals(value.get("op"))) { - return; - } - - Struct after = value.getStruct("after"); - - String eventId = after.getString("event_id"); - String eventType = after.getString("event_type"); - String eventData = after.getString("event_data"); - String entityType = after.getString("entity_type"); - String entityId = after.getString("entity_id"); - String triggeringEvent = after.getString("triggering_event"); - Optional metadata = Optional.ofNullable(after.getString("metadata")); - - handleEvent(new EventToPublish(eventId, eventType, eventData, entityType, entityId, triggeringEvent, metadata)); - } - } -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelay.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelay.java deleted file mode 100644 index 7e220719..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelay.java +++ /dev/null @@ -1,123 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducerConfigurationProperties; -import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -/** - * Monitors changes made to EVENTS table and publishes them to aggregate topics - */ -public class PollingBasedEventTableChangesToAggregateTopicRelay extends EventTableChangesToAggregateTopicRelay { - - private Logger logger = LoggerFactory.getLogger(getClass()); - private EventPollingDao eventPollingDao; - private int pollingIntervalInMilliseconds; - private final AtomicBoolean watcherRunning = new AtomicBoolean(); - private volatile CompletableFuture watcherFuture = new CompletableFuture<>(); - private EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties; - - public PollingBasedEventTableChangesToAggregateTopicRelay( - EventPollingDao eventPollingDao, - int pollingIntervalInMilliseconds, - String kafkaBootstrapServers, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - TakeLeadershipAttemptTracker takeLeadershipAttemptTracker, - String leadershipLockPath, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - super(kafkaBootstrapServers, client, cdcStartupValidator, takeLeadershipAttemptTracker, leadershipLockPath); - this.eventPollingDao = eventPollingDao; - this.pollingIntervalInMilliseconds = pollingIntervalInMilliseconds; - this.eventuateKafkaProducerConfigurationProperties = eventuateKafkaProducerConfigurationProperties; - } - - public CompletableFuture startCapturingChanges() throws InterruptedException { - logger.debug("Starting to capture changes"); - watcherRunning.set(true); - - cdcStartupValidator.validateEnvironment(); - producer = new EventuateKafkaProducer(kafkaBootstrapServers, eventuateKafkaProducerConfigurationProperties); - - CompletableFuture completableFuture = new CompletableFuture<>(); - - new Thread() { - @Override - public void run() { - - while (watcherRunning.get()) { - try { - - List eventToPublishes = eventPollingDao.findEventsToPublish(); - - if (!eventToPublishes.isEmpty()) - logger.debug("Found {} events to publish", eventToPublishes.size()); - - eventToPublishes.forEach(eventToPublish -> handleEvent(eventToPublish)); - - if (!eventToPublishes.isEmpty()) { - - logger.debug("Marking {} events as published", eventToPublishes.size()); - - eventPollingDao.markEventsAsPublished(eventToPublishes - .stream() - .map(EventToPublish::getEventId) - .collect(Collectors.toList())); - } - - completableFuture.complete(null); - - if (eventToPublishes.isEmpty()) - try { - logger.debug("No events. Sleeping for {} msecs", pollingIntervalInMilliseconds); - Thread.sleep(pollingIntervalInMilliseconds); - } catch (Exception e) { - logger.error("error while sleeping", e); - } - } catch (Exception e) { - logger.error("Exception in polling loop", e); - completableFuture.completeExceptionally(new RuntimeException("Polling exception" + e.getMessage(), e)); - } - } - watcherFuture.complete(null); - watcherFuture = new CompletableFuture<>(); - } - }.start(); - - return completableFuture; - } - - @Override - public void stopCapturingChanges() throws InterruptedException { - - logger.debug("Stopping to capture changes"); - - if (!watcherRunning.get()) { - return; - } - - watcherRunning.set(false); - - if (producer != null) - producer.close(); - - try { - watcherFuture.get(60, TimeUnit.SECONDS); - } catch (ExecutionException | TimeoutException e) { - logger.error(e.getMessage(), e); - throw new RuntimeException(e); - } - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/RelayStatus.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/RelayStatus.java deleted file mode 100644 index d38f57cc..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/RelayStatus.java +++ /dev/null @@ -1,5 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -public enum RelayStatus { - RUNNING, FAILED, STOPPING, IDLE -} diff --git a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTracker.java b/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTracker.java deleted file mode 100644 index d6e728ba..00000000 --- a/eventuate-local-java-embedded-cdc/src/main/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTracker.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.LinkedList; -import java.util.List; - -public class TakeLeadershipAttemptTracker { - private final int maxAttempts; - private final long retryPeriod; - private List lastAttemptTimes = new LinkedList<>(); - - public TakeLeadershipAttemptTracker(int maxAttempts, long retryPeriodInMilliseconds) { - this.maxAttempts = maxAttempts; - this.retryPeriod = retryPeriodInMilliseconds; - } - - public void attempting() { - lastAttemptTimes.add(now()); - lastAttemptTimes = lastAttemptTimes.subList(0, Math.min(lastAttemptTimes.size(), 30)); - } - - protected LocalDateTime now() { - return LocalDateTime.now(); - } - - boolean shouldAttempt() { - int attempts = 0; - LocalDateTime now = now(); - for (LocalDateTime attempt : lastAttemptTimes) { - long age = Duration.between(attempt, now).toMillis(); - if (age < retryPeriod) - attempts++; - else - break; - } - return attempts <= maxAttempts; - } - -} \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/AbstractTopicRelayTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/AbstractTopicRelayTest.java deleted file mode 100644 index 5fa25c23..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/AbstractTopicRelayTest.java +++ /dev/null @@ -1,136 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import io.eventuate.Int128; -import io.eventuate.SubscriberOptions; -import io.eventuate.javaclient.commonimpl.AggregateCrud; -import io.eventuate.javaclient.commonimpl.EventTypeAndData; -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.jdbckafkastore.EventuateKafkaAggregateSubscriptions; -import io.eventuate.testutil.AsyncUtil; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.jdbc.core.JdbcTemplate; - -import javax.sql.DataSource; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -public abstract class AbstractTopicRelayTest { - private Logger logger = LoggerFactory.getLogger(getClass()); - - @Autowired - private AggregateCrud eventuateJdbcEventStore; - - @Autowired - private EventuateKafkaAggregateSubscriptions eventuateKafkaAggregateSubscriptions; - - @Autowired - private EventTableChangesToAggregateTopicRelay eventTableChangesToAggregateTopicRelay; - - @Autowired - private DataSource dataSource; - - @Autowired - private EventuateSchema eventuateSchema; - - @Test - public void shouldCaptureAndPublishChange() throws ExecutionException, InterruptedException { - - String aggregateType = uniqueId("TestAggregate"); - String eventType = uniqueId("TestEvent"); - - List events = createTestEvent(eventType); - - long publishTime = System.currentTimeMillis(); - - Int128 expectedEventId = saveEvents(aggregateType, events); - - logger.debug("Looking for eventId {}", expectedEventId); - - subscribe(uniqueId("testSubscriber-" + getClass().getName()), aggregateType, eventType, expectedEventId) - .assertEventReceived(); - - long endTime = System.currentTimeMillis(); - - logger.debug("got the event I just published in msecs {}", endTime - publishTime); - } - - @Test - public void testEventTableCleanUp() throws ExecutionException, InterruptedException { - - String aggregateType = uniqueId("TestAggregate"); - String eventType = uniqueId("TestEvent"); - List event = createTestEvent(eventType); - - Int128 expectedEventId = saveEvents(aggregateType, event); - - subscribe(uniqueId("testSubscriber-" + getClass().getName()), aggregateType, eventType, expectedEventId) - .assertEventReceived(); - - deleteAllEvents(); - - expectedEventId = saveEvents(aggregateType, event); - - subscribe(uniqueId("testSubscriber-" + getClass().getName()), aggregateType, eventType, expectedEventId) - .assertEventReceived(); - } - - @Test - public void shouldStartup() throws InterruptedException { - TimeUnit.SECONDS.sleep(10); - } - - private List createTestEvent(String eventType) { - return Collections.singletonList(new EventTypeAndData(eventType, "{}", Optional.empty())); - } - - private Int128 saveEvents(String aggregateType, List events) { - return AsyncUtil.await(eventuateJdbcEventStore.save(aggregateType, events, Optional.empty())).getEntityVersion(); - } - - private String uniqueId(String prefix) { - return prefix + "-" + UUID.randomUUID(); - } - - private void deleteAllEvents() { - new JdbcTemplate(dataSource).update(String.format("delete from %s", eventuateSchema.qualifyTable("events"))); - } - - private TestSubscriber subscribe(String subscriber, String aggregateType, String eventType, Int128 expectedEventId) - throws InterruptedException, ExecutionException { - - TestSubscriber testSubscriber = new TestSubscriber(); - - eventuateKafkaAggregateSubscriptions.cleanUp(); - - eventuateKafkaAggregateSubscriptions.subscribe(subscriber, - Collections.singletonMap(aggregateType, Collections.singleton(eventType)), - SubscriberOptions.DEFAULTS, - se -> { - logger.debug("got se {}", se); - if (expectedEventId.equals(se.getId())) - testSubscriber.addReceivedEventId(se.getId()); - return CompletableFuture.completedFuture(null); - }).get(); - - return testSubscriber; - } - - private static class TestSubscriber { - private BlockingQueue eventIds = new LinkedBlockingDeque<>(); - - public void addReceivedEventId(Int128 id) { - eventIds.add(id); - } - - public void assertEventReceived() throws InterruptedException { - Assert.assertNotNull(eventIds.poll(30, TimeUnit.SECONDS)); - } - } -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/EventPollingDaoTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/EventPollingDaoTest.java deleted file mode 100644 index 2961543f..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/EventPollingDaoTest.java +++ /dev/null @@ -1,118 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; - -@ActiveProfiles("EventuatePolling") -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = EventPollingDaoTest.EventPollingTestConfiguration.class) -@DirtiesContext -public class EventPollingDaoTest { - - @Autowired - private EventuateSchema eventuateSchema; - - @Autowired - private JdbcTemplate jdbcTemplate; - - @Autowired - private EventPollingDao eventPollingDao; - - @org.springframework.context.annotation.Configuration - @Import({EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventPollingTestConfiguration { - - @Bean - public EventuateSchema eventuateSchema(@Value("${eventuate.database.schema:#{null}}") String eventuateDatabaseSchem) { - return new EventuateSchema(eventuateDatabaseSchem); - } - } -// -// @Before -// public void init() throws Exception { -// eventTableChangesToAggregateTopicRelay.stopCapturingChanges(); -// } - - @Test - public void testFindAndPublish() throws Exception { - String idPrefix = createEvents(); - - eventPollingDao.setMaxEventsPerPolling(1000); - - List eventsToTest = new ArrayList<>(); - - List accumulator; - while (!(accumulator = eventPollingDao.findEventsToPublish()).isEmpty()) { - eventsToTest.addAll(accumulator.stream().filter(eventToPublish -> eventToPublish.getEventId().startsWith(idPrefix)).collect(Collectors.toList())); - eventPollingDao.markEventsAsPublished(accumulator.stream().map(EventToPublish::getEventId).collect(Collectors.toList())); - } - - Assert.assertEquals(2, eventsToTest.size()); - - EventToPublish event1 = eventsToTest.get(0); - - Assert.assertEquals(idPrefix + "_1", event1.getEventId()); - Assert.assertEquals("type1", event1.getEventType()); - Assert.assertEquals("data1", event1.getEventData()); - Assert.assertEquals("entityType1", event1.getEntityType()); - Assert.assertEquals("entityId1", event1.getEntityId()); - Assert.assertEquals("triggeringEvent1", event1.getTriggeringEvent()); - Assert.assertEquals("meta1", event1.getMetadata()); - - - EventToPublish event2 = eventsToTest.get(1); - - Assert.assertEquals(idPrefix + "_2", event2.getEventId()); - Assert.assertEquals("type2", event2.getEventType()); - Assert.assertEquals("data2", event2.getEventData()); - Assert.assertEquals("entityType2", event2.getEntityType()); - Assert.assertEquals("entityId2", event2.getEntityId()); - Assert.assertEquals("triggeringEvent2", event2.getTriggeringEvent()); - Assert.assertNull(event2.getMetadata()); - } - - @Test - public void testLimit() throws Exception { - createEvents(); - - eventPollingDao.setMaxEventsPerPolling(1); - - List eventsToPublish = eventPollingDao.findEventsToPublish(); - - Assert.assertEquals(1, eventsToPublish.size()); - } - - private String createEvents() throws Exception { - String idPrefix = UUID.randomUUID().toString(); - - String eventTable = eventuateSchema.qualifyTable("events"); - - jdbcTemplate.update(String.format("INSERT INTO %s VALUES (?, 'type1', 'data1', 'entityType1', 'entityId1', 'triggeringEvent1', 'meta1', 0)", eventTable), idPrefix + "_1"); - jdbcTemplate.update(String.format("INSERT INTO %s VALUES (?, 'type2', 'data2', 'entityType2', 'entityId2', 'triggeringEvent2', NULL, 0)", eventTable), idPrefix + "_2"); - jdbcTemplate.update(String.format("INSERT INTO %s VALUES (?, 'type3', 'data3', 'entityType3', 'entityId3', 'triggeringEvent3', 'meta3', 1)", eventTable), idPrefix + "_3"); - - return idPrefix; - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/JdbcUrlParserTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/JdbcUrlParserTest.java deleted file mode 100644 index b6afcecb..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/JdbcUrlParserTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class JdbcUrlParserTest { - - @Test - public void shouldParseUrl() { - JdbcUrl jdbcUrl = JdbcUrlParser.parse("jdbc:mysql://192.168.99.101/eventuate"); - assertEquals("192.168.99.101", jdbcUrl.getHost()); - assertEquals(3306, jdbcUrl.getPort()); - assertEquals("eventuate", jdbcUrl.getDatabase()); - - } - - @Test - public void shouldParseUrlWithPort() { - JdbcUrl jdbcUrl = JdbcUrlParser.parse("jdbc:mysql://192.168.99.101:3306/eventuate"); - assertEquals("192.168.99.101", jdbcUrl.getHost()); - assertEquals(3306, jdbcUrl.getPort()); - assertEquals("eventuate", jdbcUrl.getDatabase()); - - } - - @Test - public void shouldParseUrlWithParameters() { - JdbcUrl jdbcUrl = JdbcUrlParser.parse("jdbc:mysql://192.168.99.101:3306/eventuate?useUnicode=true"); - assertEquals("192.168.99.101", jdbcUrl.getHost()); - assertEquals(3306, jdbcUrl.getPort()); - assertEquals("eventuate", jdbcUrl.getDatabase()); - } - -} \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/LeadershipElectionTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/LeadershipElectionTest.java deleted file mode 100644 index 4b2ca1e8..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/LeadershipElectionTest.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.state.ConnectionState; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import static org.junit.Assert.assertEquals; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = LeadershipElectionTest.LeadershipElectionTestConfiguration.class) -@DirtiesContext -public class LeadershipElectionTest { - - @Autowired - private EventuateLocalZookeperConfigurationProperties eventuateLocalZookeperConfigurationProperties; - - @Test - public void shouldElectLeader() throws InterruptedException { - CuratorFramework client1 = EventTableChangesToAggregateTopicRelayConfiguration.makeStartedCuratorClient(eventuateLocalZookeperConfigurationProperties.getConnectionString()); - CuratorFramework client2 = EventTableChangesToAggregateTopicRelayConfiguration.makeStartedCuratorClient(eventuateLocalZookeperConfigurationProperties.getConnectionString()); - - //String leaderPath = "/eventuatelocal/cdc/testleader"; - String leaderPath = "/foo"; - - MyLeaderSelectorListener myLeaderSelectorListener1 = new MyLeaderSelectorListener("1"); - LeaderSelector leaderSelector1 = new LeaderSelector(client1, leaderPath, myLeaderSelectorListener1); - leaderSelector1.start(); - - MyLeaderSelectorListener myLeaderSelectorListener2 = new MyLeaderSelectorListener("2"); - LeaderSelector leaderSelector2 = new LeaderSelector(client2, leaderPath, myLeaderSelectorListener2); - leaderSelector2.start(); - - TimeUnit.SECONDS.sleep(5); - assertEquals(1L, MyLeaderSelectorListener.leaderCounter.get()); - - } - - - @Configuration - static public class LeadershipElectionTestConfiguration { - @Bean - public EventuateLocalZookeperConfigurationProperties eventuateLocalZookeperConfigurationProperties() { - return new EventuateLocalZookeperConfigurationProperties(); - } - } -} - -class MyLeaderSelectorListener implements LeaderSelectorListener { - - AtomicBoolean leader = new AtomicBoolean(false); - static AtomicLong leaderCounter = new AtomicLong(0); - - private String label; - - public MyLeaderSelectorListener(String label) { - this.label = label; - } - - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - takeLeadership(); - } - - private void takeLeadership() { - System.out.println("take leadership: " + label); - leader.set(true); - leaderCounter.incrementAndGet(); - try { - TimeUnit.SECONDS.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private void releaseLeadership() { - System.out.println("false leadership: " + label); - leader.set(false); - leaderCounter.decrementAndGet(); - } - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - System.out.println("StateChanged: " + newState); - switch (newState) { - case SUSPENDED: - releaseLeadership(); - break; - - case RECONNECTED: - takeLeadership(); - break; - - case LOST: - releaseLeadership(); - break; - } - - } - -} - diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayCustomEventuateDBTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayCustomEventuateDBTest.java deleted file mode 100644 index 6290bcda..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayCustomEventuateDBTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.local.testutil.CustomDBCreator; -import io.eventuate.local.testutil.CustomDBTestConfiguration; -import io.eventuate.local.testutil.SqlScriptEditor; -import org.junit.Before; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = MySqlBinLogBasedEventTableChangesToAggregateTopicRelayCustomEventuateDBTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class MySqlBinLogBasedEventTableChangesToAggregateTopicRelayCustomEventuateDBTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({CustomDBTestConfiguration.class, EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - - @Autowired - private CustomDBCreator customDBCreator; - - @Autowired - private SqlScriptEditor eventuateLocalCustomDBSqlEditor; - - @Before - public void createCustomDB() { - customDBCreator.create(eventuateLocalCustomDBSqlEditor); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayEmptyEventuateDBTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayEmptyEventuateDBTest.java deleted file mode 100644 index f7759734..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayEmptyEventuateDBTest.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.local.testutil.EmptyDBTestConfiguration; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = MySqlBinLogBasedEventTableChangesToAggregateTopicRelayEmptyEventuateDBTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class MySqlBinLogBasedEventTableChangesToAggregateTopicRelayEmptyEventuateDBTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({EmptyDBTestConfiguration.class, EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - - @Value("${eventuate.database.schema}") - private String eventuateDatabaseSchema; - - @Test - public void testProperty() { - Assert.assertEquals(EventuateSchema.EMPTY_SCHEMA, eventuateDatabaseSchema); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayTest.java deleted file mode 100644 index 12bdaa91..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/MySqlBinLogBasedEventTableChangesToAggregateTopicRelayTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import org.junit.runner.RunWith; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = MySqlBinLogBasedEventTableChangesToAggregateTopicRelayTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class MySqlBinLogBasedEventTableChangesToAggregateTopicRelayTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayCustomDBTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayCustomDBTest.java deleted file mode 100644 index 5c76994b..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayCustomDBTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducerConfigurationProperties; -import io.eventuate.local.testutil.CustomDBCreator; -import io.eventuate.local.testutil.CustomDBTestConfiguration; -import io.eventuate.local.testutil.SqlScriptEditor; -import org.apache.curator.framework.CuratorFramework; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.Primary; -import org.springframework.context.annotation.Profile; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import java.util.Optional; - -@ActiveProfiles("EventuatePolling") -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = PollingBasedEventTableChangesToAggregateTopicRelayCustomDBTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class PollingBasedEventTableChangesToAggregateTopicRelayCustomDBTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({CustomDBTestConfiguration.class, EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - @EnableConfigurationProperties(EventuateKafkaProducerConfigurationProperties.class) - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - - @Autowired - private CustomDBCreator customDBCreator; - - @Autowired - private SqlScriptEditor eventuateLocalCustomDBSqlEditor; - - @Bean - @Primary - @Profile("EventuatePolling") - public EventTableChangesToAggregateTopicRelay pollingCDC(EventPollingDao eventPollingDao, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - customDBCreator.create(eventuateLocalCustomDBSqlEditor); - - return new PollingBasedEventTableChangesToAggregateTopicRelay(eventPollingDao, - eventTableChangesToAggregateTopicRelayConfigurationProperties.getPollingIntervalInMilliseconds(), - eventuateKafkaConfigurationProperties.getBootstrapServers(), - client, - cdcStartupValidator, - new TakeLeadershipAttemptTracker(eventTableChangesToAggregateTopicRelayConfigurationProperties.getMaxRetries(), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getRetryPeriodInMilliseconds()), - eventTableChangesToAggregateTopicRelayConfigurationProperties.getLeadershipLockPath(), - eventuateKafkaProducerConfigurationProperties - ); - } - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayEmptyDBTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayEmptyDBTest.java deleted file mode 100644 index 8cf6c27d..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayEmptyDBTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.local.testutil.EmptyDBTestConfiguration; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@ActiveProfiles("EventuatePolling") -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = PollingBasedEventTableChangesToAggregateTopicRelayEmptyDBTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class PollingBasedEventTableChangesToAggregateTopicRelayEmptyDBTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({EmptyDBTestConfiguration.class, EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - - - @Value("${eventuate.database.schema}") - private String eventuateDatabaseSchema; - - @Test - public void testProperty() { - Assert.assertEquals(EventuateSchema.EMPTY_SCHEMA, eventuateDatabaseSchema); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayTest.java deleted file mode 100644 index 11606f1e..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PollingBasedEventTableChangesToAggregateTopicRelayTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import org.junit.runner.RunWith; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -@ActiveProfiles("EventuatePolling") -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = PollingBasedEventTableChangesToAggregateTopicRelayTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class PollingBasedEventTableChangesToAggregateTopicRelayTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PrepareMigrationToNewCdcTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PrepareMigrationToNewCdcTest.java deleted file mode 100644 index 62e052ea..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/PrepareMigrationToNewCdcTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import io.eventuate.Int128; -import io.eventuate.SubscriberOptions; -import io.eventuate.javaclient.commonimpl.AggregateCrud; -import io.eventuate.javaclient.commonimpl.EntityIdVersionAndEventIds; -import io.eventuate.javaclient.commonimpl.EventTypeAndData; -import io.eventuate.local.java.jdbckafkastore.EventuateKafkaAggregateSubscriptions; -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.testutil.AsyncUtil; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.*; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = PrepareMigrationToNewCdcTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) -@DirtiesContext -public class PrepareMigrationToNewCdcTest { - - @org.springframework.context.annotation.Configuration - @Import({EventuateLocalConfiguration.class, EventTableChangesToAggregateTopicRelayConfiguration.class}) - @EnableAutoConfiguration - public static class EventTableChangesToAggregateTopicRelayTestConfiguration { - } - - private Logger logger = LoggerFactory.getLogger(getClass()); - - @Autowired - private AggregateCrud eventuateJdbcEventStore; - - @Autowired - private EventuateKafkaAggregateSubscriptions eventuateKafkaAggregateSubscriptions; - - @Test - public void shouldCaptureAndPublishChange() throws ExecutionException, InterruptedException { - - String aggregateType = "TestAggregate_MIGRATION"; - String eventType = "TestEvent_MIGRATION"; - - List myEvents = Collections.singletonList(new EventTypeAndData(eventType, "{}", Optional.empty())); - - EntityIdVersionAndEventIds ewidv = AsyncUtil.await(eventuateJdbcEventStore.save(aggregateType, myEvents, Optional.empty())); - - Int128 expectedEventId = ewidv.getEntityVersion(); - BlockingQueue result = new LinkedBlockingDeque<>(); - - eventuateKafkaAggregateSubscriptions.subscribe("testSubscriber", - Collections.singletonMap(aggregateType, Collections.singleton(eventType)), - SubscriberOptions.DEFAULTS, - se -> { - logger.debug("got se {}", se); - if (se.getId().equals(expectedEventId)) - result.add(se.getId()); - return CompletableFuture.completedFuture(null); - }).get(); - - Assert.assertNotNull(result.poll(30, TimeUnit.SECONDS)); - } -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TableWithDashInNameRelayTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TableWithDashInNameRelayTest.java deleted file mode 100644 index 6ed5fb13..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TableWithDashInNameRelayTest.java +++ /dev/null @@ -1,97 +0,0 @@ -package io.eventuate.local.cdc.debezium; - - -import io.eventuate.javaclient.spring.jdbc.EventuateSchema; -import io.eventuate.local.java.jdbckafkastore.EventuateLocalConfiguration; -import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties; -import io.eventuate.local.java.kafka.producer.EventuateKafkaProducerConfigurationProperties; -import org.apache.curator.framework.CuratorFramework; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; - -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.core.io.ClassPathResource; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.DriverManagerDataSource; -import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import javax.sql.DataSource; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = TableWithDashInNameRelayTest.TableWithDashInNameRelayTestConfiguration.class) -@DirtiesContext -public class TableWithDashInNameRelayTest extends AbstractTopicRelayTest { - - @org.springframework.context.annotation.Configuration - @Import({EventuateLocalConfiguration.class}) - @EnableAutoConfiguration - @EnableConfigurationProperties(EventuateKafkaProducerConfigurationProperties.class) - public static class TableWithDashInNameRelayTestConfiguration extends EventTableChangesToAggregateTopicRelayConfiguration { - - @Autowired - private JdbcTemplate jdbcTemplate; - - @Autowired - private EventTableChangesToAggregateTopicRelayConfigurationProperties relayConfigProps; - - @Value("${spring.datasource.url}") - private String dataSourceURL; - - private DataSource makeDataSource() { - JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceURL); - DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setDriverClassName(com.mysql.jdbc.Driver.class.getName()); - dataSource.setUrl(dataSourceURL); - dataSource.setUsername(relayConfigProps.getDbUserName()); - dataSource.setPassword(relayConfigProps.getDbPassword()); - return dataSource; - } - - @Override - public EventTableChangesToAggregateTopicRelay embeddedDebeziumCDC(EventuateSchema eventuateSchema, - @Value("${spring.datasource.url}") String dataSourceURL, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - ResourceDatabasePopulator rdp = new ResourceDatabasePopulator(new ClassPathResource("/cdc-test-schema.sql")); - rdp.execute(makeDataSource()); - - return super.embeddedDebeziumCDC(eventuateSchema, - dataSourceURL, - eventTableChangesToAggregateTopicRelayConfigurationProperties, - eventuateKafkaConfigurationProperties, - client, - cdcStartupValidator, - eventuateKafkaProducerConfigurationProperties); - } - - @Override - public EventTableChangesToAggregateTopicRelay pollingCDC(EventPollingDao eventPollingDao, - EventTableChangesToAggregateTopicRelayConfigurationProperties eventTableChangesToAggregateTopicRelayConfigurationProperties, - EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, - CuratorFramework client, - CdcStartupValidator cdcStartupValidator, - EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) { - - ResourceDatabasePopulator rdp = new ResourceDatabasePopulator(new ClassPathResource("/cdc-test-schema.sql")); - rdp.execute(makeDataSource()); - - return super.pollingCDC(eventPollingDao, - eventTableChangesToAggregateTopicRelayConfigurationProperties, - eventuateKafkaConfigurationProperties, - client, - cdcStartupValidator, - eventuateKafkaProducerConfigurationProperties); - } - } - -} diff --git a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTrackerTest.java b/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTrackerTest.java deleted file mode 100644 index ca559e14..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/java/io/eventuate/local/cdc/debezium/TakeLeadershipAttemptTrackerTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package io.eventuate.local.cdc.debezium; - -import org.junit.Test; - -import java.time.LocalDateTime; -import java.util.Arrays; -import java.util.List; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class TakeLeadershipAttemptTrackerTest { - - class MyTakeLeadershipAttemptTracker extends TakeLeadershipAttemptTracker { - - private List times; - - public MyTakeLeadershipAttemptTracker(int maxAttempts, long retryPeriodInMilliseconds, List times) { - super(maxAttempts, retryPeriodInMilliseconds); - this.times = times; - } - - @Override - protected LocalDateTime now() { - LocalDateTime x = times.get(0); - times = times.subList(1, times.size()); - return x; - } - } - - @Test - public void shouldAllowRetryAfterLongtime() { - List times = Arrays.asList(LocalDateTime.now(), LocalDateTime.now().plusSeconds(120)); - MyTakeLeadershipAttemptTracker t = new MyTakeLeadershipAttemptTracker(3, 60 * 1000, times); - - t.attempting(); - assertTrue(t.shouldAttempt()); - } - - @Test - public void shouldRejectTooManyRetries() { - LocalDateTime n = LocalDateTime.now(); - - List times = - Arrays.asList(n, - n.plusSeconds(1), - n.plusSeconds(2), - n.plusSeconds(3), - n.plusSeconds(4), - n.plusSeconds(5), - n.plusSeconds(6), - n.plusSeconds(7), - n.plusSeconds(8), - n.plusSeconds(9) - ); - - MyTakeLeadershipAttemptTracker t = new MyTakeLeadershipAttemptTracker(3, 60 * 1000, times); - - t.attempting(); - assertTrue(t.shouldAttempt()); - - t.attempting(); - assertTrue(t.shouldAttempt()); - - t.attempting(); - assertTrue(t.shouldAttempt()); - - t.attempting(); - assertFalse(t.shouldAttempt()); - - } - -} \ No newline at end of file diff --git a/eventuate-local-java-embedded-cdc/src/test/resources/cdc-test-schema.sql b/eventuate-local-java-embedded-cdc/src/test/resources/cdc-test-schema.sql deleted file mode 100644 index 3ec8c53f..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/resources/cdc-test-schema.sql +++ /dev/null @@ -1,10 +0,0 @@ -/** - * Created by cer on 6/21/17. - */ - -drop database if exists `eventuate-database-with-dash`; -create database `eventuate-database-with-dash`; -drop table if exists `eventuate-database-with-dash`.`foobar`; -create table `eventuate-database-with-dash`.`foobar` (bar tinyint primary key); -drop table if exists `eventuate`.`eventuate-test-table-with-dash`; -create table `eventuate`.`eventuate-test-table-with-dash` (bar tinyint primary key); diff --git a/eventuate-local-java-embedded-cdc/src/test/resources/logback.xml b/eventuate-local-java-embedded-cdc/src/test/resources/logback.xml deleted file mode 100644 index aca4ecc3..00000000 --- a/eventuate-local-java-embedded-cdc/src/test/resources/logback.xml +++ /dev/null @@ -1,24 +0,0 @@ - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n%xEx - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/eventuate-local-java-jdbc-tests/build.gradle b/eventuate-local-java-jdbc-tests/build.gradle index 69eb866e..4db286a7 100644 --- a/eventuate-local-java-jdbc-tests/build.gradle +++ b/eventuate-local-java-jdbc-tests/build.gradle @@ -9,7 +9,7 @@ dependencies { testCompile project(":eventuate-local-java-jdbc-autoconfigure") if (!project.hasProperty("excludeCdcLibs") || !excludeCdcLibs.toBoolean()) { - testCompile project(":eventuate-local-java-embedded-cdc-autoconfigure") + testCompile project(":new-cdc:eventuate-local-java-cdc-autoconfigure") } testCompile "junit:junit:4.11" diff --git a/eventuate-local-java-migration/build.gradle b/eventuate-local-java-migration/build.gradle new file mode 100644 index 00000000..f60d373e --- /dev/null +++ b/eventuate-local-java-migration/build.gradle @@ -0,0 +1,21 @@ +apply plugin: PrivateModulePlugin + +dependencies { + compile 'commons-lang:commons-lang:2.6' + + compile ("io.debezium:debezium-embedded:$debeziumVersion") { + exclude group: "org.glassfish.jersey.containers", module: "jersey-container-servlet" + exclude group: "org.eclipse.jetty", module: "jetty-servlets" + exclude group: "org.eclipse.jetty", module: "jetty-server" + exclude group: "org.slf4j", module: "slf4j-log4j12" + } + + compile project(":new-cdc:eventuate-local-java-cdc-connector-mysql-binlog") + + testCompile "junit:junit:4.11" + testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion" +} + +test { + forkEvery 1 +} \ No newline at end of file diff --git a/eventuate-local-java-migration/src/test/java/io/eventuate/local/cdc/debezium/migration/MigrationToNewCdcTest.java b/eventuate-local-java-migration/src/test/java/io/eventuate/local/cdc/debezium/migration/MigrationToNewCdcTest.java new file mode 100644 index 00000000..4834bd83 --- /dev/null +++ b/eventuate-local-java-migration/src/test/java/io/eventuate/local/cdc/debezium/migration/MigrationToNewCdcTest.java @@ -0,0 +1,142 @@ +package io.eventuate.local.cdc.debezium.migration; + +import com.google.common.collect.ImmutableMap; +import io.debezium.config.Configuration; +import io.debezium.embedded.EmbeddedEngine; +import io.eventuate.local.common.BinlogFileOffset; +import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties; +import io.eventuate.local.java.kafka.consumer.EventuateKafkaConsumerConfigurationProperties; +import io.eventuate.local.mysql.binlog.DebeziumBinlogOffsetKafkaStore; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; +import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.lang.reflect.Field; +import java.util.concurrent.ExecutionException; + +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = MigrationToNewCdcTest.EventTableChangesToAggregateTopicRelayTestConfiguration.class) +@DirtiesContext +public class MigrationToNewCdcTest { + + @org.springframework.context.annotation.Configuration + @EnableAutoConfiguration + public static class EventTableChangesToAggregateTopicRelayTestConfiguration { + + @Bean + public EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties() { + return new EventuateKafkaConfigurationProperties(); + } + } + + private final String connectorName = "my-sql-connector"; + private final String file = "binlog.000003"; + private final long offset = 10000; + + @Autowired + private EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties; + + @Test + public void test() throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException { + saveTestOffset(); + + BinlogFileOffset debeziumBinlogFileOffset = getDebeziumOffset(); + + Assert.assertEquals(file, debeziumBinlogFileOffset.getBinlogFilename()); + Assert.assertEquals(offset, debeziumBinlogFileOffset.getOffset()); + } + + private BinlogFileOffset getDebeziumOffset() { + DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore = new DebeziumBinlogOffsetKafkaStore("eventuate.local.cdc." + connectorName + ".offset.storage", + eventuateKafkaConfigurationProperties, + EventuateKafkaConsumerConfigurationProperties.empty()); + + return debeziumBinlogOffsetKafkaStore.getLastBinlogFileOffset().get(); + } + + private void saveTestOffset() throws NoSuchFieldException, IllegalAccessException, InterruptedException, ExecutionException { + + EmbeddedEngine embeddedEngine = createEmbeddedEngine(); + + WorkerConfig workerConfig = getWorkerConfig(embeddedEngine); + + KafkaOffsetBackingStore kafkaOffsetBackingStore = createKafkaOffsetBackingStore(workerConfig); + + Converter keyConverter = getKeyConverter(embeddedEngine); + Converter valueConverter = getValueConverter(embeddedEngine); + + OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(kafkaOffsetBackingStore, connectorName, keyConverter, valueConverter); + + offsetStorageWriter.offset(ImmutableMap.of("server", "my-app-connector"), ImmutableMap.of("file", file, "pos", offset)); + offsetStorageWriter.beginFlush(); + offsetStorageWriter.doFlush((error, result) -> {}).get(); + } + + private Converter getKeyConverter(EmbeddedEngine embeddedEngine) throws NoSuchFieldException, IllegalAccessException { + Field keyConverterField = embeddedEngine.getClass().getDeclaredField("keyConverter"); + keyConverterField.setAccessible(true); + return (Converter)keyConverterField.get(embeddedEngine); + } + + private Converter getValueConverter(EmbeddedEngine embeddedEngine) throws NoSuchFieldException, IllegalAccessException { + Field valueConverterField = embeddedEngine.getClass().getDeclaredField("valueConverter"); + valueConverterField.setAccessible(true); + return (Converter)valueConverterField.get(embeddedEngine); + } + + private KafkaOffsetBackingStore createKafkaOffsetBackingStore(WorkerConfig workerConfig) { + KafkaOffsetBackingStore kafkaOffsetBackingStore = new KafkaOffsetBackingStore(); + kafkaOffsetBackingStore.configure(workerConfig); + kafkaOffsetBackingStore.start(); + return kafkaOffsetBackingStore; + } + + private WorkerConfig getWorkerConfig(EmbeddedEngine embeddedEngine) throws NoSuchFieldException, IllegalAccessException { + Field workerConfigField = embeddedEngine.getClass().getDeclaredField("workerConfig"); + workerConfigField.setAccessible(true); + + return (WorkerConfig)workerConfigField.get(embeddedEngine); + } + + private EmbeddedEngine createEmbeddedEngine() { + Configuration configuration = createConfig(); + + return EmbeddedEngine + .create() + .using((success, message, throwable) -> {}) + .notifying(sourceRecord -> {}) + .using(configuration) + .build(); + } + + private Configuration createConfig() { + return Configuration.create() + .with("connector.class", + "io.debezium.connector.mysql.MySqlConnector") + .with("offset.storage", KafkaOffsetBackingStore.class.getName()) + .with("bootstrap.servers", eventuateKafkaConfigurationProperties.getBootstrapServers()) + .with("offset.storage.topic", "eventuate.local.cdc." + connectorName + ".offset.storage") + .with("poll.interval.ms", 50) + .with("offset.flush.interval.ms", 6000) + .with("name", connectorName) + .with("database.server.id", 85744) + .with("database.server.name", "my-app-connector") + .with("database.history", + io.debezium.relational.history.KafkaDatabaseHistory.class.getName()) + .with("database.history.kafka.topic", + "eventuate.local.cdc." + connectorName + ".history.kafka.topic") + .with("database.history.kafka.bootstrap.servers", + eventuateKafkaConfigurationProperties.getBootstrapServers()) + .build(); + } +} diff --git a/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/build.gradle b/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/build.gradle index f1ebf9f0..978941ca 100644 --- a/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/build.gradle +++ b/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/build.gradle @@ -10,12 +10,6 @@ dependencies { testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion" } -test { - if (!project.hasProperty("testCdcMigration")) { - exclude '**/MySQLMigrationTest**' - } -} - test { forkEvery 1 } \ No newline at end of file diff --git a/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/src/test/java/io/eventuate/local/mysql/binlog/MySQLMigrationTest.java b/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/src/test/java/io/eventuate/local/mysql/binlog/MySQLMigrationTest.java deleted file mode 100644 index 6453e292..00000000 --- a/new-cdc/eventuate-local-java-cdc-connector-mysql-binlog/src/test/java/io/eventuate/local/mysql/binlog/MySQLMigrationTest.java +++ /dev/null @@ -1,71 +0,0 @@ -package io.eventuate.local.mysql.binlog; - -import io.eventuate.javaclient.commonimpl.EntityIdVersionAndEventIds; -import io.eventuate.javaclient.commonimpl.EventTypeAndData; -import io.eventuate.local.common.CdcProcessor; -import io.eventuate.local.common.PublishedEvent; -import io.eventuate.local.db.log.common.OffsetStore; -import io.eventuate.local.test.util.AbstractCdcTest; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -@RunWith(SpringJUnit4ClassRunner.class) -@SpringBootTest(classes = MySqlBinlogCdcIntegrationTestConfiguration.class) -public class MySQLMigrationTest extends AbstractCdcTest { - - @Value("${spring.datasource.url}") - private String dataSourceURL; - - @Autowired - private OffsetStore offsetStore; - - @Autowired - private DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore; - - @Autowired - private MySqlBinaryLogClient mySqlBinaryLogClient; - - @Test - public void test() throws Exception { - - final String event = "TestEvent_MIGRATION"; - - BlockingQueue publishedEvents = new LinkedBlockingDeque<>(); - CdcProcessor cdcProcessor = createMySQLCdcProcessor(); - cdcProcessor.start(publishedEvent -> { - publishedEvents.add(publishedEvent); - offsetStore.save(publishedEvent.getBinlogFileOffset()); - }); - - List events = Collections.singletonList(new EventTypeAndData(event, "{}", Optional.empty())); - EntityIdVersionAndEventIds entityIdVersionAndEventIds = localAggregateCrud.save("TestAggregate_MIGRATION", events, Optional.empty()); - - PublishedEvent publishedEvent; - - while((publishedEvent = publishedEvents.poll(10, TimeUnit.SECONDS)) != null) { - if (event.equals(publishedEvent.getEventType())) { - break; - } - } - - Assert.assertNotNull(publishedEvent); - Assert.assertEquals(event, publishedEvent.getEventType()); - Assert.assertEquals(entityIdVersionAndEventIds.getEntityVersion().asString(), publishedEvent.getId()); - } - - private CdcProcessor createMySQLCdcProcessor() { - return new MySQLCdcProcessor<>(mySqlBinaryLogClient, offsetStore, debeziumBinlogOffsetKafkaStore); - } -} diff --git a/scripts/_build-and-test-all-new-cdc-mysql.sh b/scripts/_build-and-test-all-new-cdc-mysql.sh index e08eff75..d38d518a 100755 --- a/scripts/_build-and-test-all-new-cdc-mysql.sh +++ b/scripts/_build-and-test-all-new-cdc-mysql.sh @@ -33,8 +33,6 @@ $DOCKER_COMPOSE up -d echo waiting for MySQL ./scripts/wait-for-mysql.sh -./scripts/mysql-cli.sh -i < eventuate-local-java-embedded-cdc/src/test/resources/cdc-test-schema.sql - ./scripts/wait-for-services.sh $DOCKER_HOST_IP 8099 ./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:test diff --git a/scripts/build-and-test-all-cdc-mysql.sh b/scripts/build-and-test-all-cdc-mysql.sh deleted file mode 100755 index 24c91476..00000000 --- a/scripts/build-and-test-all-cdc-mysql.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash - -set -e - -if [ -z "$DOCKER_COMPOSE" ]; then - echo setting DOCKER_COMPOSE - export DOCKER_COMPOSE="docker-compose -f docker-compose-mysql.yml -f docker-compose-cdc-mysql.yml" -else - echo using existing DOCKER_COMPOSE = $DOCKER_COMPOSE -fi - -export GRADLE_OPTIONS="-P excludeCdcLibs=true" - -./gradlew $GRADLE_OPTIONS $* :eventuate-local-java-cdc-service:clean :eventuate-local-java-cdc-service:assemble - -. ./scripts/set-env-mysql.sh - -$DOCKER_COMPOSE stop -$DOCKER_COMPOSE rm --force -v - -$DOCKER_COMPOSE build -$DOCKER_COMPOSE up -d mysql -$DOCKER_COMPOSE up -d - - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:cleanTest - -# wait for MySQL - -echo waiting for MySQL - -./scripts/wait-for-mysql.sh - -./scripts/mysql-cli.sh -i < eventuate-local-java-embedded-cdc/src/test/resources/cdc-test-schema.sql - -./scripts/wait-for-services.sh $DOCKER_HOST_IP 8099 - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:test - -# Assert healthcheck good - -echo testing restart MySQL restart scenario $(date) - -$DOCKER_COMPOSE stop mysql - -sleep 10 - -$DOCKER_COMPOSE start mysql - -./scripts/wait-for-mysql.sh - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:cleanTest :eventuate-local-java-jdbc-tests:test - -$DOCKER_COMPOSE stop -$DOCKER_COMPOSE rm --force -v diff --git a/scripts/build-and-test-all-cdc-postgres-polling.sh b/scripts/build-and-test-all-cdc-postgres-polling.sh deleted file mode 100755 index ab7f30f8..00000000 --- a/scripts/build-and-test-all-cdc-postgres-polling.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/bash - -set -e - -if [ -z "$DOCKER_COMPOSE" ]; then - echo setting DOCKER_COMPOSE - export DOCKER_COMPOSE="docker-compose -f docker-compose-postgres-polling.yml -f docker-compose-cdc-postgres-polling.yml" -else - echo using existing DOCKER_COMPOSE = $DOCKER_COMPOSE -fi - -export GRADLE_OPTIONS="-P excludeCdcLibs=true" - -./gradlew $GRADLE_OPTIONS $* :eventuate-local-java-cdc-service:clean :eventuate-local-java-cdc-service:assemble - -. ./scripts/set-env-postgres-polling.sh - -$DOCKER_COMPOSE stop -$DOCKER_COMPOSE rm --force -v - -$DOCKER_COMPOSE build -$DOCKER_COMPOSE up -d postgres -$DOCKER_COMPOSE up -d - - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:cleanTest - -# wait for Postgres - -echo waiting for Postgres - -./scripts/wait-for-postgres.sh -./scripts/wait-for-services.sh $DOCKER_HOST_IP 8099 - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:test - -# Assert healthcheck good - -echo testing restart Postgres restart scenario $(date) - -$DOCKER_COMPOSE stop postgres - -sleep 10 - -$DOCKER_COMPOSE start postgres - -./scripts/wait-for-postgres.sh - -./gradlew $GRADLE_OPTIONS :eventuate-local-java-jdbc-tests:cleanTest :eventuate-local-java-jdbc-tests:test - -$DOCKER_COMPOSE stop -$DOCKER_COMPOSE rm --force -v diff --git a/scripts/build-and-test-all-new-cdc-mysql-migration.sh b/scripts/build-and-test-all-new-cdc-mysql-migration.sh deleted file mode 100755 index 391e1297..00000000 --- a/scripts/build-and-test-all-new-cdc-mysql-migration.sh +++ /dev/null @@ -1,24 +0,0 @@ -#! /bin/bash - -export TERM=dumb - -set -e - -. ./scripts/set-env-mysql.sh - -./gradlew clean - -docker-compose -f docker-compose-mysql.yml stop -docker-compose -f docker-compose-mysql.yml rm --force -v - -docker-compose -f docker-compose-mysql.yml build -docker-compose -f docker-compose-mysql.yml up -d - -./scripts/wait-for-mysql.sh - -./gradlew eventuate-local-java-embedded-cdc:test --tests "io.eventuate.local.cdc.debezium.PrepareMigrationToNewCdcTest" -./gradlew new-cdc:eventuate-local-java-cdc-connector-mysql-binlog:test --tests "io.eventuate.local.mysql.binlog.MySQLMigrationTest" - - -docker-compose -f docker-compose-mysql.yml stop -docker-compose -f docker-compose-mysql.yml rm --force -v diff --git a/scripts/build-and-test-everything.sh b/scripts/build-and-test-everything.sh index 783926e0..a25e1990 100755 --- a/scripts/build-and-test-everything.sh +++ b/scripts/build-and-test-everything.sh @@ -2,7 +2,7 @@ set -o pipefail -SCRIPTS="./scripts/build-and-test-mysql.sh ./scripts/build-and-test-mariadb.sh ./scripts/build-and-test-postgres-wal.sh ./scripts/build-and-test-all-cdc-mysql.sh ./scripts/build-and-test-all-new-cdc-mysql.sh ./scripts/build-and-test-all-new-cdc-mariadb.sh ./scripts/build-and-test-all-new-cdc-postgres-polling.sh ./scripts/build-and-test-all-new-cdc-postgres-wal.sh" +SCRIPTS="./scripts/build-and-test-mysql.sh ./scripts/build-and-test-mariadb.sh ./scripts/build-and-test-postgres-wal.sh ./scripts/build-and-test-all-new-cdc-mysql.sh ./scripts/build-and-test-all-new-cdc-mariadb.sh ./scripts/build-and-test-all-new-cdc-postgres-polling.sh ./scripts/build-and-test-all-new-cdc-postgres-wal.sh" date > build-and-test-everything.log diff --git a/scripts/build-and-test-mysql.sh b/scripts/build-and-test-mysql.sh index 32da81aa..efa702ce 100755 --- a/scripts/build-and-test-mysql.sh +++ b/scripts/build-and-test-mysql.sh @@ -2,4 +2,4 @@ export database=mysql -./scripts/_build-and-test-mysql.sh -P testCdcMigration=true :eventuate-local-java-embedded-cdc:cleanTest :eventuate-local-java-embedded-cdc:test build +./scripts/_build-and-test-mysql.sh build diff --git a/settings.gradle b/settings.gradle index 6b1d507f..07cce8e9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,11 +3,9 @@ include 'eventuate-local-java-test-util' include 'eventuate-local-java-jdbc' include 'eventuate-local-java-jdbc-autoconfigure' include 'eventuate-local-java-jdbc-tests' -include 'eventuate-local-java-embedded-cdc' -include 'eventuate-local-java-embedded-cdc-autoconfigure' -include 'eventuate-local-java-cdc-service' include 'eventuate-local-java-common-broker' include 'eventuate-local-java-kafka' +include 'eventuate-local-java-migration' include ':new-cdc:eventuate-local-java-cdc-autoconfigure' include ':new-cdc:eventuate-local-java-cdc-connector-common'