Skip to content

Commit

Permalink
Add shared-kafka-db sample
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Syer committed Feb 5, 2020
1 parent afc0f73 commit 88bc07b
Show file tree
Hide file tree
Showing 15 changed files with 628 additions and 1 deletion.
Expand Up @@ -17,7 +17,7 @@ public class FailureSimulator {
private KafkaTemplate<Object, String> kafkaTemplate;

@Autowired
public void setJmsTemplate(KafkaTemplate<Object, String> kafkaTemplate) {
public void setKafkaTemplate(KafkaTemplate<Object, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

Expand Down
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -16,6 +16,7 @@

<modules>
<module>shared-jms-db</module>
<module>shared-kafka-db</module>
<module>atomikos-db</module>
<module>best-jms-db</module>
<module>best-kafka-db</module>
Expand Down
36 changes: 36 additions & 0 deletions shared-kafka-db/docker-compose.yml
@@ -0,0 +1,36 @@
# Local single-node Kafka stack for the shared-kafka-db sample.
# (Indentation restored: the flattened form is not valid YAML.)
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # 29092: internal listener (other containers); 9092: host listener.
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Replication factors forced to 1 for a single-broker dev setup.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
62 changes: 62 additions & 0 deletions shared-kafka-db/pom.xml
@@ -0,0 +1,62 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.springsource.open</groupId>
	<artifactId>spring-shared-kafka-db</artifactId>
	<version>2.0.0.CI-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.2.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<packaging>jar</packaging>
	<!-- NOTE(review): name/description were copy-pasted from the best-jms-db
		module ("Spring Best Efforts JMS-DB"); corrected to match this artifact. -->
	<name>Spring Shared Kafka-DB</name>
	<description><![CDATA[Sample project showing message driven transaction
processing with Spring with a Kafka consumer and a shared database transaction.
]]> </description>
	<properties>
		<maven.test.failure.ignore>true</maven.test.failure.ignore>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<scope>test</scope>
		</dependency>
		<!-- Spring Dependencies -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-aop</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-commons</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.awaitility</groupId>
			<artifactId>awaitility</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

</project>
@@ -0,0 +1,63 @@
package com.springsource.open.foo;

import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class FooHandler implements Handler {

	private static final Log log = LogFactory.getLog(FooHandler.class);

	private final JdbcTemplate jdbcTemplate;

	// Number of messages handled since the last reset; also used as the foo ID.
	private final AtomicInteger count = new AtomicInteger(0);

	public FooHandler(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	/**
	 * Consumes a message from the "async" topic and, in one database
	 * transaction, inserts a foo row and records the consumed Kafka offset in
	 * T_OFFSETS. Storing the offset in the same transaction is what ties the
	 * Kafka consumer position to the database state.
	 */
	@Override
	@KafkaListener(id = "group", topics = "async", autoStartup = "false")
	@Transactional
	public void handle(@Payload String msg, @Header(KafkaHeaders.OFFSET) long offset) {

		log.debug("Received message: [" + msg + "]");

		Date now = new Date();
		int id = count.getAndIncrement();
		jdbcTemplate.update("INSERT INTO T_FOOS (ID, name, foo_date) values (?, ?,?)", id, msg, now);

		storeOffset(offset);

		log.debug(String.format("Inserted foo with name=%s, date=%s, offset=%d", msg, now, offset));

	}

	// Upsert the last processed offset: try the UPDATE first, fall back to an
	// INSERT when the seed row does not exist yet.
	private void storeOffset(long offset) {
		int rowsAffected = jdbcTemplate.update("UPDATE T_OFFSETS set topic=?, part=0, offset=? where ID=0", "async",
				offset);
		if (rowsAffected < 1) {
			jdbcTemplate.update("INSERT into T_OFFSETS (ID, topic, part, offset) values (?,?,?,?)", 0, "async", 0,
					offset);
		}
	}

	/** Resets the in-memory message counter to zero. */
	@Override
	public void resetItemCount() {
		count.set(0);
	}

	/** @return the number of messages handled since the last reset */
	@Override
	public int getItemCount() {
		return count.get();
	}

}
@@ -0,0 +1,11 @@
package com.springsource.open.foo;

/**
 * Contract for a message handler that processes payloads from Kafka and keeps
 * a running count of handled items.
 */
public interface Handler {

	/**
	 * Processes a single message.
	 *
	 * @param msg the message payload
	 * @param offset the Kafka offset the message was consumed at
	 */
	void handle(String msg, long offset);

	/** Resets the handled-item counter to zero. */
	void resetItemCount();

	/** @return the number of items handled since the last reset */
	int getItemCount();

}
@@ -0,0 +1,99 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.springsource.open.foo;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;

@SpringBootApplication
public class ListenerApplication {

	/** Topic the sample consumes: one partition, replication factor 1. */
	@Bean
	public NewTopic asyncTopic() {
		return new NewTopic("async", 1, (short) 1);
	}

	/**
	 * On partition assignment, seeks each assigned partition to the offset
	 * stored in the database plus one, so the database — not Kafka's committed
	 * offsets — is the source of truth for the consumer position.
	 *
	 * <p>Renamed from {@code listenerContanerCustomizer} to fix the typo; the
	 * bean is consumed by type, so the name change is safe within this sample.
	 */
	@Bean
	public ConsumerAwareRebalanceListener listenerContainerCustomizer(ConsumerConfiguration config) {
		return new ConsumerAwareRebalanceListener() {
			@Override
			public void onPartitionsAssigned(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
					Collection<TopicPartition> partitions) {
				for (TopicPartition partition : partitions) {
					// +1 because the stored value is the last offset already processed
					long offset = config.getOffset(partition.topic(), partition.partition()) + 1;
					System.err.println("Seeking: " + partition + " to offset=" + offset + " from position="
							+ consumer.position(partition));
					consumer.seek(partition, offset);
				}
			}
		};
	}

	@Bean
	ConsumerConfiguration config(DataSource dataSource) {
		return new ConsumerConfiguration(dataSource);
	}

	public static void main(String[] args) throws Exception {
		SpringApplication.run(ListenerApplication.class, args);
	}

}

/**
 * Looks up the last processed offset for a topic from the T_OFFSETS table,
 * caching it in memory after the first read. If no row exists yet, a seed row
 * with offset -1 is inserted so later UPDATEs always hit a row.
 *
 * <p>NOTE(review): the {@code partition} argument of {@link #getOffset} is
 * ignored — cache and query are keyed by topic only, which holds while the
 * sample topic has a single partition. Not thread-safe; assumed to be called
 * only from the consumer rebalance callback.
 */
class ConsumerConfiguration {

	private final Map<String, Long> cache = new HashMap<>();

	private final JdbcTemplate jdbcTemplate;

	public ConsumerConfiguration(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public long getOffset(String topic, int partition) {
		init(topic);
		return this.cache.get(topic);
	}

	private void init(String topic) {
		Long initialized = this.cache.get(topic);
		if (initialized != null) {
			return;
		}
		try {
			Map<String, Object> offset = jdbcTemplate.queryForMap("SELECT * FROM T_OFFSETS WHERE ID=0 and TOPIC=?",
					topic);
			// Go through Number: drivers may map the column to Integer/BigDecimal
			// etc., so a direct (Long) cast could throw ClassCastException.
			this.cache.put(topic, ((Number) offset.get("offset")).longValue());
		}
		catch (EmptyResultDataAccessException e) {
			// First run for this topic: -1 means "nothing processed yet", so the
			// rebalance listener seeks to -1 + 1 = 0 (the beginning).
			Long offset = -1L;
			jdbcTemplate.update("INSERT into T_OFFSETS (ID, topic, part, offset) values (?, ?, ?, ?)", 0, topic, 0,
					offset);
			this.cache.put(topic, offset);
		}
	}

}
5 changes: 5 additions & 0 deletions shared-kafka-db/src/main/resources/application.properties
@@ -0,0 +1,5 @@
# Keep starting even if a datasource initialization script statement fails
# (e.g. tables already exist from a previous run).
spring.datasource.continue-on-error=true
# Offsets are tracked in the database (T_OFFSETS), not committed by Kafka.
spring.kafka.consumer.enable-auto-commit=false

logging.level.com.springsource=debug
logging.level.org.apache.kafka=warn
@@ -0,0 +1,27 @@
package com.springsource.open.foo;

import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class FailureSimulator {

	/**
	 * Just throws a {@link DataIntegrityViolationException}.
	 */
	public void simulateBusinessProcessingFailure() {
		throw new DataIntegrityViolationException("Simulated failure.");
	}

	/**
	 * After any {@code Handler.handle(String, ...)} returns, fails the
	 * transaction on purpose when the message payload contains "fail". This
	 * lets tests exercise rollback behaviour.
	 *
	 * @param msg the message payload bound from the advised method
	 */
	@AfterReturning("execution(* *..*Handler+.handle(String,..)) && args(msg, ..)")
	public void maybeFail(String msg) {
		if (!msg.contains("fail")) {
			return;
		}
		System.err.println("Failing...");
		simulateBusinessProcessingFailure();
	}

}

0 comments on commit 88bc07b

Please sign in to comment.