Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support Springboot style demo #39

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 66 additions & 49 deletions kafka-spring-demo/vpc-ssl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,18 @@
<!-- springframe 版本控制 -->
<spring.version>4.3.2.RELEASE</spring.version>
</properties>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.3.RELEASE</version>
<exclusions>
<!-- 建议这里排除掉Kafka依赖,避免版本问题带来困扰-->
<exclusion>
Expand All @@ -25,66 +32,71 @@
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<!-- springframe start -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<!--&lt;!&ndash; springframe start &ndash;&gt;-->
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-core</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-web</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-oxm</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-tx</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-jdbc</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-webmvc</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-aop</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-context-support</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.springframework</groupId>-->
<!--<artifactId>spring-test</artifactId>-->
<!--<version>${spring.version}</version>-->
<!--</dependency>-->
<!-- springframe end -->
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -104,8 +116,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
Expand Down Expand Up @@ -136,6 +148,11 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.aliyun.openservices.kafka.ons;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.Optional;

@SpringBootApplication
public class KafkaSpringbootDemo {

public static void main(String[] args) throws Exception {
System.setProperty("java.security.auth.login.config", KafkaSpringbootDemo.class.getClassLoader().getResource("kafka_client_jaas.conf").getPath());
ConfigurableApplicationContext context = SpringApplication.run(KafkaSpringbootDemo.class, args);
//给Consumer初始化一些时间,以便从latest开始消费
Thread.sleep(6000);

context.getBean(MessageSendService.class).sendOneMessage();

Thread.sleep(3000);
System.exit(0);
}

@Component
public static class KafkaReceiveService {

@KafkaListener(topics = {"XXXX"})
public void listen(ConsumerRecord<?, ?> record) {

Optional<?> kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {

Object message = kafkaMessage.get();

System.out.println("----------------- record =" + record);
System.out.println("------------------ message =" + message);
}

}
}

@Component
public static class MessageSendService {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendOneMessage() throws Exception {
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("XXXXX", "hello aliKafka");
kafkaTemplate.flush();
System.out.println("producer send ok " + result.get().getProducerRecord());
}
}
}
30 changes: 30 additions & 0 deletions kafka-spring-demo/vpc-ssl/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
spring:
kafka:
bootstrap-servers: XXXXX

properties:
ssl.truststore.location: XXX/kafka.client.truststore.jks
ssl.truststore.password: KafkaOnsClient
security.protocol: SASL_SSL
sasl.mechanism: PLAIN

producer:
retries: 10
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger.ms: 1

consumer:
group-id: XXXX
enable-auto-commit: false
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 15000