Skip to content

Commit

Permalink
Initial commit 🌈
Browse files Browse the repository at this point in the history
  • Loading branch information
7le committed Jun 23, 2018
0 parents commit d8b42fc
Show file tree
Hide file tree
Showing 16 changed files with 842 additions and 0 deletions.
26 changes: 26 additions & 0 deletions .gitignore
@@ -0,0 +1,26 @@
target/
!.mvn/wrapper/maven-wrapper.jar

*.properties

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
nbproject/private/
build/
nbbuild/
dist/
nbdist/
.nb-gradle/
148 changes: 148 additions & 0 deletions pom.xml
@@ -0,0 +1,148 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>top.arkstack</groupId>
<artifactId>shine-mq</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>shine-mq</name>
<url>http://maven.apache.org</url>

<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
<version>7</version>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<guava.version>21.0</guava.version>
<fastJson.version>1.2.40</fastJson.version>
<junit.version>4.12</junit.version>
<spring-amqp.version>2.0.4.RELEASE</spring-amqp.version>
<lombok.version>1.16.22</lombok.version>
<rabbit-amqp-client.version>5.1.2</rabbit-amqp-client.version>
<spring.boot>2.0.3.RELEASE</spring.boot>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<version>${spring.boot}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>${spring.boot}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbit-amqp-client.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>${spring-amqp.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-amqp.version}</version>
<exclusions>
<exclusion>
<artifactId>http-client</artifactId>
<groupId>com.rabbitmq</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastJson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.4</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<encoding>UTF-8</encoding>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>

<developers>
<developer>
<name>7le</name>
<email>silk.heqian@gmail.com</email>
<organization>ArkStack</organization>
<organizationUrl>7le.top</organizationUrl>
</developer>
</developers>

</project>
41 changes: 41 additions & 0 deletions src/main/java/top/arkstack/shine/mq/Factory.java
@@ -0,0 +1,41 @@
package top.arkstack.shine.mq;

import org.springframework.amqp.support.converter.MessageConverter;
import top.arkstack.shine.mq.processor.Processor;

/**
* @author 7le
* @version 1.0.0
*/
public interface Factory {

/**
* 启动方法
*/
void start();

/**
* 添加exchange和queue
* 生产者创建队列不需要增加processor,消费者需要添加processor
*
* @param queueName 队列
* @param exchangeName 交换器
* @param routingKey 路由密钥
* @param processor 处理器
* @return
*/
Factory add(String queueName, String exchangeName, String routingKey, Processor processor);

/**
* 添加exchange和queue
* 生产者创建队列不需要增加processor,消费者需要添加processor
*
* @param queueName 队列
* @param exchangeName 交换器
* @param routingKey 路由密钥
* @param processor 处理器
* @param messageConverter 序列化处理器
* @return
*/
Factory add(String queueName, String exchangeName, String routingKey, Processor processor, MessageConverter messageConverter);
}
73 changes: 73 additions & 0 deletions src/main/java/top/arkstack/shine/mq/MessageAdapterHandler.java
@@ -0,0 +1,73 @@
package top.arkstack.shine.mq;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import top.arkstack.shine.mq.bean.EventMessage;
import top.arkstack.shine.mq.processor.Processor;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* 消息适配处理器
*
* @author 7le
* @version 1.0.0
*/
public class MessageAdapterHandler {

private static final Logger logger = LoggerFactory.getLogger(MessageAdapterHandler.class);

private ConcurrentMap<String, ProcessorWrap> map;

protected MessageAdapterHandler() {
this.map = new ConcurrentHashMap<>();
}

protected void add(String queueName, String exchangeName, String routingKey,
Processor processor, MessageConverter messageConverter) {

Objects.requireNonNull(queueName, "The queueName is empty.");
Objects.requireNonNull(exchangeName, "The exchangeName is empty.");
Objects.requireNonNull(messageConverter, "The messageConverter is empty.");
Objects.requireNonNull(routingKey, "The routingKey is empty.");

ProcessorWrap pw = new ProcessorWrap(messageConverter, processor);
ProcessorWrap oldProcessorWrap = map.putIfAbsent(queueName + "_" + exchangeName + "_" + routingKey, pw);
if (oldProcessorWrap != null) {
logger.warn("The processor of this queue and exchange exists");
}
}

public void receive(byte[] msg) {
EventMessage message = JSON.parseObject(new String(msg), EventMessage.class);
ProcessorWrap wrap = map.get(message.getQueueName() + "_" + message.getExchangeName() + "_" + message.getRoutingKey());
wrap.process(message.getData());
}


protected Set<String> getAllBinding() {
Set<String> keySet = map.keySet();
return keySet;
}

protected static class ProcessorWrap {

private MessageConverter messageConverter;

private Processor processor;

protected ProcessorWrap(MessageConverter messageConverter, Processor processor) {
this.messageConverter = messageConverter;
this.processor = processor;
}

public Object process(Object msg) {
return processor.process(msg);
}
}
}
22 changes: 22 additions & 0 deletions src/main/java/top/arkstack/shine/mq/MessageErrorHandler.java
@@ -0,0 +1,22 @@
package top.arkstack.shine.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ErrorHandler;

/**
* 异常处理
*
* @author 7le
* @version 1.0.0
*/
public class MessageErrorHandler implements ErrorHandler {

private static final Logger logger = LoggerFactory.getLogger(MessageErrorHandler.class);

@Override
public void handleError(Throwable t) {
logger.error("MQ happen a error:" + t.getMessage(), t);
}

}
27 changes: 27 additions & 0 deletions src/main/java/top/arkstack/shine/mq/MqAutoConfiguration.java
@@ -0,0 +1,27 @@
package top.arkstack.shine.mq;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author 7le
* @version 1.0.0
*/
@Configuration
@EnableConfigurationProperties
public class MqAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public RabbitmqProperties mqProperties() {
return new RabbitmqProperties();
}

@Bean
@ConditionalOnMissingBean
public RabbitmqFactory rabbitmqFactory(RabbitmqProperties mqProperties) {
return RabbitmqFactory.getInstance(mqProperties);
}
}

0 comments on commit d8b42fc

Please sign in to comment.