Skip to content
master
Go to file
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
biz
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
web
 
 
 
 
 
 
 
 

README.md

项目简介

随着网站业务不断发展,各业务对数据的实时性,数据库的可用性要求越来越高。

本系统可以实时获得数据库的变更并通过消息方式发布出来,供各业务线订阅。

同时,本系统还会实现数据库同步(同构和异构),以满足数据库冗余备份,数据迁移的需求。

 

项目依赖

<dependency>
    <groupId>com.dianping.puma</groupId>
    <artifactId>puma-client</artifactId>
    <version>${version}</version>
</dependency>

最新版本为2.0.0

 

接入申请

发邮件给xiaotian.li@dianping.com

未申请的 ClientName 无法正常使用。

邮件标题:PumaClient 申请

邮件内容:

  • ClientName
  • 需要监听的库和表
  • 上线时间

 

申请完成在本地开发时,可以用test结尾的ClientName来做开发,避免相互影响。

 

使用方法简介

PumaClient client = new PumaClientConfig()
	.setClientName("your-client-name")
	.setDatabase("database")
	.setTables(Lists.newArrayList("table0", "table1"))
	.buildClusterPumaClient();

while(!Thread.currentThread().isInterrupted()) {
	try {
		BinlogMessage binlogMessage = client.get(10, 1, TimeUnit.SECOND);
		//todo: 处理数据
		client.ack(binlogMessage.getLastBinlogInfo());
	} catch(Exception e) {
		//这里的异常主要是用来打点的,便于及时发现问题
	}
}

PumaClient所有操作都是同步操作,并且线程不安全。

如果是job项目可以直接写上述代码。但是如果是在service或者web项目中,一定要新启一个线程来跑上述代码。

 

PumaClientConfig API && PumaClient API

代码即是文档。

建议直接在项目中看源码,或者到这里看源码:

 

最佳实践

import com.dianping.cat.Cat;
import com.dianping.puma.api.PumaClient;
import com.dianping.puma.api.PumaClientConfig;
import com.dianping.puma.api.PumaClientException;
import com.dianping.puma.core.dto.BinlogMessage;
import com.dianping.puma.core.event.Event;
import com.dianping.puma.core.event.RowChangedEvent;
import com.google.common.collect.Lists;

import java.util.concurrent.TimeUnit;

public class Example1 {

    /**
     * 假设这是一个 web 或 service 项目
     * 可以尝试运行多个,并随机关掉其中正在运行的那个,来模拟 failover
     *
     * @param args
     */
    public static final void main(String... args) {

        //不要阻塞主线程,需要自己令起线程
        Thread pumaClientThread = new Thread(new Runnable() {
            @Override
            public void run() {

                PumaClient client = new PumaClientConfig()
                        .setClientName("puma-client-example-test")
                        .setDatabase("Puma")
                        .setTables(Lists.newArrayList("PumaServer"))
                        .buildClusterPumaClient();

                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        BinlogMessage message = client.get(100, 1, TimeUnit.SECONDS);

                        for (Event event : message.getBinlogEvents()) {
                            if (event instanceof RowChangedEvent) {

                                RowChangedEvent rowChangedEvent = (RowChangedEvent) event;
                                System.out.println(rowChangedEvent.toString());

                                //todo: 处理数据
                            }
                        }

                        client.ack(message.getLastBinlogInfo());
                    } catch (PumaClientException e) {
                        Cat.logError(e.getMessage(), e);
                    }
                }
            }
        });


        pumaClientThread.setName("puma-client-example-test");
        pumaClientThread.setDaemon(true);
        pumaClientThread.start();


        while (true) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignore) {
            }
        }
    }
}

 

FAQ

 

ack 方法有什么用

PumaClient的同步进度会通过ack方法同步到云端,重启时,会读取最后一次ack的位置来进行同步。

 

rollback 方法有什么用

rollback方法可以用来强制定位后续数据的起始位置。

例如 DW 团队不需要实时监听数据,而是需要在每天凌晨同步昨天的数据,那么可以在启动的时候调用rollback方法将任务的起始位置定位到昨天0点,然后开始同步。

使用方法是:client.rollback(new BinlogInfo().setTimestamp(1447800000))

备注:这里的timestamp是秒数,不是毫秒数。(mysql底层binlog只精确到秒)

You can’t perform that action at this time.