Skip to content

Commit

Permalink
使用SpringBoot1.5.1自动配置Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
QDPeng committed Apr 20, 2017
1 parent f75f7e6 commit db9fe40
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 0 deletions.
3 changes: 3 additions & 0 deletions 29-SpringbootKafka/.gitignore
@@ -0,0 +1,3 @@
target/
.idea
*.iml
13 changes: 13 additions & 0 deletions 29-SpringbootKafka/README.md
@@ -0,0 +1,13 @@
#SpringBoot1.5.1版本集成spring-kafka自动配置

## 1、在官网下载安装 kafka_2.11-0.10.2.0

## 2、进入conf/server.properties 开启监听:listeners=PLAINTEXT://localhost:9092

## 3、通过cmd进入bin/windows目录,开启zookeeper server:

### zookeeper-server-start.bat D:\kafka_2.11-0.10.2.0\config\zookeeper.properties

## 3、通过cmd进入bin/windows目录,开启kafka server:

### kafka-server-start.bat D:\kafka_2.11-0.10.2.0\config\server.properties
37 changes: 37 additions & 0 deletions 29-SpringbootKafka/pom.xml
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.penck</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<!-- Inherit defaults from Spring Boot -->
<parent>
<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.1.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,14 @@
package com.penck.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Created by peng on 2017/4/20.
*/
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
@@ -0,0 +1,22 @@
package com.penck.kafka.consumer;

import com.penck.kafka.domain.Message;
import com.penck.kafka.util.JSONUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* Created by peng on 2017/4/20.
*/
@Component
public class Consumer {
@KafkaListener(topics = "test1")
public void processMessage(String content) {
try {
Message m = JSONUtils.json2pojo(content, Message.class);
System.out.println(m.getId() + ",msg=" + m.getMsg());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,36 @@
package com.penck.kafka.domain;

import java.util.Date;

/**
* Created by peng on 2017/4/20.
*/
public class Message {
private Long id;
private String msg;
private Date sendTime;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

public Date getSendTime() {
return sendTime;
}

public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}
@@ -0,0 +1,31 @@
package com.penck.kafka.producer;

import com.penck.kafka.domain.Message;
import com.penck.kafka.util.JSONUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

/**
* Created by peng on 2017/4/20.
*/
@Component
public class Producer {
@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage() {
Message m = new Message();
m.setId(System.currentTimeMillis());
m.setMsg(UUID.randomUUID().toString());
m.setSendTime(new Date());
try {
kafkaTemplate.send("test1", JSONUtils.obj2json(m));
} catch (Exception e) {
e.printStackTrace();
}
}
}
@@ -0,0 +1,86 @@
package com.penck.kafka.util;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by peng on 2017/4/20.
*/
public class JSONUtils {

private final static ObjectMapper objectMapper = new ObjectMapper();

private JSONUtils() {

}

public static ObjectMapper getInstance() {

return objectMapper;
}

/**
* javaBean,list,array convert to json string
*/
public static String obj2json(Object obj) throws Exception {
return objectMapper.writeValueAsString(obj);
}

/**
* json string convert to javaBean
*/
public static <T> T json2pojo(String jsonStr, Class<T> clazz)
throws Exception {
return objectMapper.readValue(jsonStr, clazz);
}

/**
* json string convert to map
*/
public static <T> Map<String, Object> json2map(String jsonStr)
throws Exception {
return objectMapper.readValue(jsonStr, Map.class);
}

/**
* json string convert to map with javaBean
*/
public static <T> Map<String, T> json2map(String jsonStr, Class<T> clazz)
throws Exception {
Map<String, Map<String, Object>> map = objectMapper.readValue(jsonStr,
new TypeReference<Map<String, T>>() {
});
Map<String, T> result = new HashMap<String, T>();
for (Map.Entry<String, Map<String, Object>> entry : map.entrySet()) {
result.put(entry.getKey(), map2pojo(entry.getValue(), clazz));
}
return result;
}

/**
* json array string convert to list with javaBean
*/
public static <T> List<T> json2list(String jsonArrayStr, Class<T> clazz)
throws Exception {
List<Map<String, Object>> list = objectMapper.readValue(jsonArrayStr,
new TypeReference<List<T>>() {
});
List<T> result = new ArrayList<T>();
for (Map<String, Object> map : list) {
result.add(map2pojo(map, clazz));
}
return result;
}

/**
* map convert to javaBean
*/
public static <T> T map2pojo(Map map, Class<T> clazz) {
return objectMapper.convertValue(map, clazz);
}
}
@@ -0,0 +1,21 @@
package com.penck.kafka.web;

import com.penck.kafka.producer.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* Created by peng on 2017/4/20.
*/
@RestController
public class TestController {

@Autowired
private Producer producer;

@RequestMapping("/produce")
public void produce() {
producer.sendMessage();
}
}
14 changes: 14 additions & 0 deletions 29-SpringbootKafka/src/main/resources/config/application-dev.yml
@@ -0,0 +1,14 @@
spring:
thymeleaf:
cache: false


kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: "penck"
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
@@ -0,0 +1,5 @@
spring:
thymeleaf:
cache: true


8 changes: 8 additions & 0 deletions 29-SpringbootKafka/src/main/resources/config/application.yml
@@ -0,0 +1,8 @@
spring:
application:
name: kafka
profiles:
active: dev

server:
port: 8200

0 comments on commit db9fe40

Please sign in to comment.