User Guide

Cai E edited this page Aug 26, 2016 · 7 revisions

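Koper User Guide

1. Programming with the Koper Listener Model

This section shows how to integrate Koper into your own project.

Before you begin, install the Koper libraries into your local Maven repository by running the following command from the Koper source tree: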

mvn install

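1.1 Use case

We will create two projects:

Project "MemberSignup" is the message producer.
Project "MemberSignupListener" is the message listener (consumer).

1.2 Developing the message producer

First create the project "MemberSignup", then follow the steps below.

1) Add the dependencies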

<dependency>
    <groupId>atterlab</groupId>
    <artifactId>koper-core</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>atterlab</groupId>
    <artifactId>koper-kafka</artifactId>
    <version>1.2.0</version>
</dependency>

2) Configure the producer

context-data-producer.xml

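<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="koper.*"/>

    <!-- Load the properties file -->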
    <context:property-placeholder ignore-unresolvable="true" location="classpath:kafka/kafka-data-producer.properties"/>

    <bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="metadata.broker.list">${metadata.broker.list}</prop>
                <prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
                <prop key="message.send.max.retries">${message.send.max.retries}</prop>
                <prop key="serializer.class">${serializer.class}</prop>
                <prop key="request.required.acks">${request.required.acks}</prop>
                <prop key="producer.type">${producer.type}</prop>
            </props>
        </property>
    </bean>

    <bean id="messageSender" class="koper.kafka.KafkaSender">
        <property name="properties" ref="producerProperties"/>
    </bean>
</beans>

The ${} placeholders are resolved from the file kafka/kafka-data-producer.properties.

Tip

We provide a public Kafka server (120.26.225.169:9092) that you can use without changing anything. To use your own server, change the metadata.broker.list property.

metadata.broker.list=120.26.225.169:9092
topic.metadata.refresh.interval.ms=3600000
message.send.max.retries=5
request.required.acks=1
serializer.class=kafka.serializer.StringEncoder
# sync: send synchronously; async: send asynchronously
producer.type=async
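
The settings above are ordinary Java properties, so they can be sanity-checked before they are wired into Spring. The following is a minimal sketch that loads such a file with java.util.Properties and reports which of the keys referenced by the ${} placeholders are missing. The class ProducerConfigCheck and its method are hypothetical helpers written for this illustration; they are not part of Koper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Koper: checks producer properties for required keys.
final class ProducerConfigCheck {

    // Keys referenced by the ${...} placeholders in context-data-producer.xml.
    static final String[] REQUIRED_KEYS = {
        "metadata.broker.list",
        "topic.metadata.refresh.interval.ms",
        "message.send.max.retries",
        "serializer.class",
        "request.required.acks",
        "producer.type"
    };

    // Parses the properties text and returns the required keys that are absent or blank.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("Reading from a string should not fail", e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

In a real project the text would be read from kafka/kafka-data-producer.properties on the classpath; a string is used here only to keep the sketch self-contained.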

3) Create the business service

public interface MemberService {
    void signup(Member member);
}

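4) Send the message

After a user signs up, we send a message to the MQ. Because this is a decoupled design, the producer does not care who receives or handles the message.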

@Service
public class MemberServiceImpl implements MemberService {

    @Autowired
    private MemberMapper memberMapper;
    private static final String MEMBER_SIGNUP_TOPIC = "koper.demo.message.notifyMemberAfterSignup";

    @Autowired
    private MessageSender messageSender;

    public void signup(Member member) {
        memberMapper.createMember(member);
        messageSender.send(MEMBER_SIGNUP_TOPIC, "Signed up successfully! " + member.getPhoneNo());
    }
}
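
The guide does not show the Member class used by signup. A minimal sketch, assuming it is a plain Java bean with only the two properties that appear in these examples (name and phoneNo), could look like this; the real demo class may carry more fields.

```java
// Hypothetical Member bean: only the properties used in this guide are modelled.
class Member {

    private String name;
    private String phoneNo;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPhoneNo() {
        return phoneNo;
    }

    public void setPhoneNo(String phoneNo) {
        this.phoneNo = phoneNo;
    }
}
```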

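1.3 Developing the listener (consumer)

The requirement is that the system notifies the user by SMS after signup, so the listener subscribes to the signup message.

Create the project "MemberSignupListener" and follow the steps below:

1) Add the dependencies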

<dependency>
    <groupId>atterlab</groupId>
    <artifactId>koper-core</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>atterlab</groupId>
    <artifactId>koper-kafka</artifactId>
    <version>1.2.0</version>
</dependency>

2) Configure the consumer

context-data-consumer.xml

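<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="koper.*"/>

    <!-- Load the properties file -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:kafka/kafka-data-consumer.properties"/>

    <!-- Collects the message listeners -->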
    <bean id="messageListenerBeanPostProcessor" class="koper.MessageListenerBeanPostProcessor"/>

    <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="zookeeper.connect">${zookeeper.connect}</prop>
                <prop key="group.id">${group.id}</prop>
                <prop key="zookeeper.session.timeout.ms">${zookeeper.session.timeout.ms}</prop>
                <prop key="zookeeper.sync.time.ms">${zookeeper.sync.time.ms}</prop>
                <prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
            </props>
        </property>
    </bean>

    <bean id="router" class="koper.router.RandomRouter"/>

    <bean id="messageCenter" class="koper.DefaultMessageCenter">
        <constructor-arg index="0" value="1"/>
        <constructor-arg index="1" ref="router"/>
    </bean>

    <bean id="consumerLauncher" class="koper.client.DefaultConsumerLauncher">
        <property name="properties" ref="consumerProperties"/>
        <property name="autoStart" value="true"/>
        <property name="partitions" value="${num.partitions}"/>
        <property name="dispatcherThreads" value="${dispatcherThreads}"/>
        <property name="messageCenter" ref="messageCenter"/>
        <property name="messageReceiverClass" value="koper.kafka.KafkaReceiver"/>
        <property name="messageDispatcherClass" value="koper.DefaultMessageDispatcher"/>
    </bean>
</beans>

The ${} placeholders are resolved from the file kafka/kafka-data-consumer.properties.

If you use a local ZooKeeper, change the zookeeper.connect parameter.

zookeeper.connect=120.26.225.169:2181
group.id=zm_message_framework_test
zookeeper.session.timeout.ms=400000
zookeeper.sync.time.ms=500
auto.commit.interval.ms=1000
num.partitions=16
dispatcherThreads=20

3) Create the Listener

@Component
public class MemberSignupListener extends AbstractMessageListener {

    @Autowired
    private SmsService smsService;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Listen(topic = "koper.demo.message.notifyMemberAfterSignup")
    @Override
    public void onMessage(String msg) {
        logger.info("Got msg, msg: {}", msg);
        smsService.sendSms(msg);
    }
}
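
To make the annotation-driven subscription less opaque, here is a small framework-free sketch of how a topic-to-method registry could be built with reflection. It is not Koper's implementation: the annotation ListenSketch stands in for @Listen, and ListenerRegistrySketch and SignupListenerSketch are hypothetical names introduced only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Koper's @Listen, defined here so the sketch compiles on its own.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ListenSketch {
    String topic();
}

// Illustrative registry: maps a topic to the listener methods subscribed to it.
final class ListenerRegistrySketch {

    private static final class Handler {
        final Object target;
        final Method method;

        Handler(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final Map<String, List<Handler>> handlersByTopic = new HashMap<>();

    // Scans the listener for methods annotated with @ListenSketch and records them by topic.
    void register(Object listener) {
        for (Method method : listener.getClass().getDeclaredMethods()) {
            ListenSketch listen = method.getAnnotation(ListenSketch.class);
            if (listen != null) {
                method.setAccessible(true);
                handlersByTopic
                        .computeIfAbsent(listen.topic(), t -> new ArrayList<>())
                        .add(new Handler(listener, method));
            }
        }
    }

    // Delivers the message to every handler registered for the topic; returns the handler count.
    int dispatch(String topic, String message) {
        List<Handler> handlers = handlersByTopic.get(topic);
        if (handlers == null) {
            return 0;
        }
        for (Handler handler : handlers) {
            try {
                handler.method.invoke(handler.target, message);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Listener failed for topic " + topic, e);
            }
        }
        return handlers.size();
    }
}

// Example listener used only to exercise the sketch; it stores messages instead of sending SMS.
final class SignupListenerSketch {

    final List<String> received = new ArrayList<>();

    @ListenSketch(topic = "koper.demo.message.notifyMemberAfterSignup")
    public void onMessage(String msg) {
        received.add(msg);
    }
}
```

Registering a SignupListenerSketch and dispatching a message on its topic invokes onMessage once; a topic with no subscribers is simply ignored.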

Tip

The listener sends the SMS through the injected SmsService; you can substitute your own service.

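1.4 Running the Listener program

To make the result easy to observe, start the Listener program first, so that it receives the message as soon as the "MemberSignup" program starts and sends it.

Create a launcher class to start the Listener: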

public class MemberSignUpListenerDemo {

    public static void main(String[] args) {
        final ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:kafka/context-data-consumer.xml");

        final ConsumerLauncher consumerLauncher = context.getBean(ConsumerLauncher.class);
        // Start the consumer explicitly (required when autoStart is set to false in context-data-consumer.xml)
        consumerLauncher.start();

    }
}

Then run the MemberSignUpListenerDemo class in your IDE.

1.5 Running the message producer

1) Create the producer launcher class MemberSignUpDemo

public class MemberSignUpDemo {

    public static void main(String[] args) {
        final ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:kafka/context-data-producer.xml");
        final MemberService memberService = context.getBean(MemberService.class);
        Member member = new Member();
        member.setName("LeBron James");
        member.setPhoneNo("15097300863");
        memberService.signup(member);
    }
}
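
2) Run MemberSignUpDemo in your IDE. It calls memberService.signup(member), which sends a message to the MQ. Switch back to the console of the consumer process; if everything works, it prints:

Got msg, msg: Signed up successfully! 15097300863

Got it? We have now built a complete application with a message producer and a consumer.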

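2. Programming with the Koper Data Event Model

The data event model is another major feature of Koper; it offers a more natural and friendly programming model.

It is typically used for data access object (DAO) operations.

Using AOP, Koper intercepts method calls on an object (such as a DAO), wraps each call in a message, and sends it to Kafka. Each interception point maps to its own Kafka topic: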

| Method | Kafka topic |
| --- | --- |
| updateOrder (Before) | com.xxx.xxx.OrderMapperImpl.updateOrder_B |
| updateOrder (After) | com.xxx.xxx.OrderMapperImpl.updateOrder |
| updateOrder (AfterThrowing) | com.xxx.xxx.OrderMapperImpl.updateOrder_X |
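
The naming convention in the table can be written down as a small function. The sketch below only mirrors the convention shown above (class name, method name, and a suffix of _B, nothing, or _X); DataEventTopicSketch and its Phase enum are hypothetical names, and Koper's own code may build the topic differently.

```java
// Illustrative only: mirrors the topic naming convention from the table; not Koper's own code.
final class DataEventTopicSketch {

    // The three interception points and the suffix each one appends to the topic.
    enum Phase {
        BEFORE("_B"),
        AFTER(""),
        AFTER_THROWING("_X");

        final String suffix;

        Phase(String suffix) {
            this.suffix = suffix;
        }
    }

    // Builds "<fully qualified class name>.<method name><suffix>".
    static String topicFor(String className, String methodName, Phase phase) {
        return className + "." + methodName + phase.suffix;
    }
}
```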

To handle the updateOrder (Before) event, a DataEventListener can declare a method like this:

public void onUpdateOrder_B(Order order) {
}

To handle the updateOrder (After) event:

public void onUpdateOrder(Order order) {
}

To handle the updateOrder (AfterThrowing) event:

public void onUpdateOrder_X(Order order, DataEvent event) {
}
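
The handler names above follow a visible pattern: "on", then the last segment of the topic with its first letter capitalized. As a hedged illustration of that pattern only, the hypothetical helper below derives the handler name from a topic; it is not taken from Koper's source.

```java
// Illustrative only: derives the listener method name from a data event topic.
final class HandlerNameSketch {

    // "com.xxx.OrderMapperImpl.updateOrder_B" becomes "onUpdateOrder_B".
    static String handlerNameFor(String topic) {
        String eventName = topic.substring(topic.lastIndexOf('.') + 1);
        if (eventName.isEmpty()) {
            throw new IllegalArgumentException("Topic has no method segment: " + topic);
        }
        return "on" + Character.toUpperCase(eventName.charAt(0)) + eventName.substring(1);
    }
}
```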

2.2 Developing the message producer

A typical call chain is OrderController -> OrderService -> OrderDao. Here we use data events on OrderDao as the example:

1) Add the dependencies

<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-core</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-dataevent</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-kafka</artifactId>
   <version>1.2.0</version>
</dependency>

2) Configure the producer

context-data-producer.xml

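<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
   <context:component-scan base-package="koper.*"/>
   <!-- Load the properties file -->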
   <context:property-placeholder ignore-unresolvable="true" location="classpath:kafka/kafka-data-producer.properties"/>

   <bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
       <property name="properties">
           <props>
               <prop key="metadata.broker.list">${metadata.broker.list}</prop>
               <prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
               <prop key="message.send.max.retries">${message.send.max.retries}</prop>
               <prop key="serializer.class">${serializer.class}</prop>
               <prop key="request.required.acks">${request.required.acks}</prop>
               <prop key="producer.type">${producer.type}</prop>
           </props>
       </property>
   </bean>

   <bean id="messageSender" class="koper.kafka.KafkaSender">
       <property name="properties" ref="producerProperties"/>
   </bean>
</beans>

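context-data-message.xml

AOP is used to trigger a data event when a matching method is called:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd">
    <!-- Enable AspectJ support -->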
    <aop:aspectj-autoproxy/>

    <bean id="beforeSendMessageAdvice" class="koper.aop.SendMsgBeforeInvokeAdvice"/>
    <bean id="afterSendMessageAdvice" class="koper.aop.SendMsgAfterInvokeAdvice"/>
    <bean id="exceptionSendMessageAdvice" class="koper.aop.SendMsgAfterExceptionAdvice"/>

    <aop:config>
        <!-- Send a message after the method call -->
        <aop:pointcut id="insertAfterSendMsgPointCut"
                      expression="execution(* koper..mapper.impl.*MapperImpl.insert*(..))"/>

        <!-- The "after" aspect -->
        <aop:aspect id="insertAfterSendMsgPointCutAspect" ref="afterSendMessageAdvice">
            <aop:around pointcut-ref="insertAfterSendMsgPointCut" method="around"/>
        </aop:aspect>

    </aop:config>
</beans>
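
The aspect above wraps every matching insert* call. As a rough, framework-free illustration of the same idea, the sketch below uses a JDK dynamic proxy to record a topic after a call returns successfully. Everything in it is hypothetical (OrderMapperSketch, SendAfterInvokeSketch, and the in-memory list that stands in for a message sender); Koper's own advice classes may work differently.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical mapper interface used only in this sketch.
interface OrderMapperSketch {
    Integer insertOrder(String orderNo);
}

// Trivial target object: pretends one row was inserted.
final class InMemoryOrderMapperSketch implements OrderMapperSketch {
    @Override
    public Integer insertOrder(String orderNo) {
        return 1;
    }
}

// Intercepts calls on the target and records a topic after each successful insert* call.
final class SendAfterInvokeSketch implements InvocationHandler {

    private final Object target;
    private final List<String> sentTopics; // stands in for a real message sender

    SendAfterInvokeSketch(Object target, List<String> sentTopics) {
        this.target = target;
        this.sentTopics = sentTopics;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result;
        try {
            result = method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow the target's own exception; nothing is recorded
        }
        // Reached only when the call succeeded: topic = target class name + "." + method name.
        if (method.getName().startsWith("insert")) {
            sentTopics.add(target.getClass().getName() + "." + method.getName());
        }
        return result;
    }

    // Wraps the target in a proxy that implements the given interface.
    static <T> T wrap(T target, Class<T> iface, List<String> sentTopics) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                new SendAfterInvokeSketch(target, sentTopics));
        return iface.cast(proxy);
    }
}
```

Calling insertOrder on the wrapped mapper returns the target's result and adds one topic to the list, which is the point where a real advice would hand the message to a sender.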

The ${} placeholders are resolved from the file kafka/kafka-data-producer.properties.

If you use a local Kafka server, change the metadata.broker.list parameter.

metadata.broker.list=120.26.225.169:9092
topic.metadata.refresh.interval.ms=3600000
message.send.max.retries=5
request.required.acks=1
serializer.class=kafka.serializer.StringEncoder
# sync: send synchronously; async: send asynchronously
producer.type=async

3) Create the data access interface

public interface OrderMapper {
    Integer insertOrder(Order order);
}

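4) Send the message

When OrderMapperImpl.insertOrder is called, Koper wraps the call in a message and sends it to the Kafka server.

  • The topic format is package name + class name + method name.
  • The message is the method argument (in this example, the order).

@Repository
public class OrderMapperImpl implements OrderMapper {

    // MyBatis-Spring template; assumed to be defined as a bean in the application context
    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    @Override
    public Integer insertOrder(Order order) {
        sqlSessionTemplate.insert("OrderMapper.insert", order);
        return 1;
    }
}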

2.3 Developing the Listener (consumer)

Next, we handle this event on the consumer side.

1) Add the dependencies

<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-core</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-dataevent</artifactId>
   <version>1.2.0</version>
</dependency>
<dependency>
   <groupId>atterlab</groupId>
   <artifactId>koper-kafka</artifactId>
   <version>1.2.0</version>
</dependency>

2) Configure the consumer

context-data-consumer.xml

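<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
    <context:component-scan base-package="koper.*">
        <!-- Exclude this class: this context is for the consumer only -->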
        <context:exclude-filter type="assignable" expression="koper.demo.message.service.MemberService"/>
    </context:component-scan>

    <!-- Load the properties file -->
    <context:property-placeholder ignore-unresolvable="true" location="classpath:kafka/kafka-data-consumer.properties"/>

    <!-- Collects the message listeners -->
    <bean id="messageListenerBeanPostProcessor" class="koper.event.DataEventListenerPostProcessor"/>

    <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="zookeeper.connect">${zookeeper.connect}</prop>
                <prop key="group.id">${group.id}</prop>
                <prop key="zookeeper.session.timeout.ms">${zookeeper.session.timeout.ms}</prop>
                <prop key="zookeeper.sync.time.ms">${zookeeper.sync.time.ms}</prop>
                <prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
            </props>
        </property>
    </bean>

    <bean id="router" class="koper.router.RandomRouter"/>

    <bean id="messageCenter" class="koper.DefaultMessageCenter">
        <constructor-arg index="0" value="1"/>
        <constructor-arg index="1" ref="router"/>
    </bean>

    <bean id="consumerLauncher" class="koper.client.DefaultConsumerLauncher">
        <property name="properties" ref="consumerProperties"/>
        <property name="autoStart" value="true"/>
        <property name="partitions" value="${num.partitions}"/>
        <property name="dispatcherThreads" value="${dispatcherThreads}"/>
        <property name="messageCenter" ref="messageCenter"/>
        <property name="messageReceiverClass" value="koper.kafka.KafkaReceiver"/>
        <property name="messageDispatcherClass" value="koper.event.DataEventMessageDispatcher"/>
    </bean>
</beans>

The ${} placeholders are resolved from the file kafka/kafka-data-consumer.properties.

If you use a local ZooKeeper server, change the zookeeper.connect parameter.

zookeeper.connect=120.26.225.169:2181
group.id=zm_message_framework_test
zookeeper.session.timeout.ms=400000
zookeeper.sync.time.ms=500
auto.commit.interval.ms=1000
num.partitions=16
dispatcherThreads=20

3) Create the Listener

@Component
@DataListener(dataObject = "koper.demo.dataevent.mapper.impl.OrderMapperImpl")
public class OrderListener {
    public void onInsertOrder(Order order) {
        System.out.println("orderNo : " + order.getOrderNo());
        System.out.println("create time : " + order.getCreatedTime());
    }
}

2.4 Running the Listener program

1) Create the Listener launcher class

public class OrderMapperDataEventDemo {

  public static void main(String[] args) {
      final ApplicationContext context =
              new ClassPathXmlApplicationContext("classpath:kafka/context-data-consumer.xml");

      final ConsumerLauncher consumerLauncher = context.getBean(ConsumerLauncher.class);
      // Start the consumer explicitly (required when autoStart is set to false in context-data-consumer.xml)
      consumerLauncher.start();

  }
}

2) Run the OrderMapperDataEventDemo.main method. This starts the consumer, which then waits for messages.

2.5 Running the message producer

1) Create the producer launcher class

public class OrderDataEventDemo {

 public static void main(String[] args) {
     final ApplicationContext context =
             new ClassPathXmlApplicationContext(
                     "classpath:kafka/context-data-message.xml",
                     "classpath:kafka/context-data-producer.xml");
     final OrderService orderService = context.getBean(OrderService.class);

     final String orderNo = "201608161919222";
     final String createdTime = LocalDateTime.now().toString();
     Order order = new Order();
     order.setId(100);
     order.setOrderNo(orderNo);
     order.setCreatedTime(createdTime);
     // Call the data operation; this triggers a data event
     orderService.insertOrder(order);
 }
}
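
The launcher class obtains an OrderService and an Order, neither of which is shown in this guide. A minimal sketch, assuming OrderService simply delegates to OrderMapper (following the OrderService -> OrderDao chain mentioned earlier) and Order carries only the three properties set by the launcher, could look like this. All of it is hypothetical; in the Spring project the implementation would also be registered as a bean, for example with @Service.

```java
// Hypothetical Order bean: only the properties set in the launcher class are modelled.
class Order {

    private Integer id;
    private String orderNo;
    private String createdTime;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getOrderNo() { return orderNo; }
    public void setOrderNo(String orderNo) { this.orderNo = orderNo; }

    public String getCreatedTime() { return createdTime; }
    public void setCreatedTime(String createdTime) { this.createdTime = createdTime; }
}

// Same shape as the OrderMapper interface shown in section 2.2.
interface OrderMapper {
    Integer insertOrder(Order order);
}

// Assumed service interface; the guide only shows how it is called.
interface OrderService {
    Integer insertOrder(Order order);
}

// Delegates to the mapper, so the data event fires on the intercepted mapper call.
class OrderServiceImpl implements OrderService {

    private final OrderMapper orderMapper;

    OrderServiceImpl(OrderMapper orderMapper) {
        this.orderMapper = orderMapper;
    }

    @Override
    public Integer insertOrder(Order order) {
        return orderMapper.insertOrder(order);
    }
}
```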

2) Run OrderDataEventDemo

  • A message is sent to Kafka after the method call returns.
  • Topic: koper.demo.dataevent.mapper.impl.OrderMapperImpl.insertOrder

The Listener console prints:

orderNo : 201608161919222
create time : 2016-08-17T14:10:13.971

Got it!
That completes the data-event-driven demo.

3. Advanced programming with both models

TODO