Skip to content

Canal Kafka QuickStart

rewerma edited this page Aug 18, 2018 · 12 revisions

环境版本

  • 操作系统:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • zookeeper版本: zookeeper-3.4.11
  • kafka 版本: kafka_2.11-1.1.1.tgz
  • canal.kafka 版本: 请下载最新的安装包,本文以当前v1.0.26 alpha 5 的canal.kafka-1.0.26-SNAPSHOT.tar.gz 为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

一、 安装jdk

此处省略

二、 安装zookeeper

2.1 下载源码包,并解压

官网下载地址:http://www.apache.org/dyn/closer.cgi/zookeeper

wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar zxvf zookeeper-3.4.11.tar.gz  
mv zookeeper-3.4.11 /usr/local/zookeeper

2.2 修改环境变量

编辑 /etc/profile 文件, 在文件末尾添加以下环境变量配置:

# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

运行以下命令使环境变量生效: source /etc/profile

2.3 重命名配置文件

初次使用 ZooKeeper 时,需要将$ZOOKEEPER_HOME/conf 目录下的 zoo_sample.cfg 重命名为 zoo.cfg, zoo.cfg

mv  $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

2.4 单机模式--修改配置文件

创建目录/usr/local/zookeeper/data/usr/local/zookeeper/logs 修改配置文件

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

如果是多节点,配置文件中尾部增加

server.1=192.168.1.110:2888:3888
server.2=192.168.1.111:2888:3888
server.3=192.168.1.112:2888:3888

同时,增加

#master
echo "1">/usr/local/zookeeper/data/myid

#slave1
echo "2">/usr/local/zookeeper/data/myid

#slave2
echo "3">/usr/local/zookeeper/data/myid

2.5 启动 ZooKeeper 服务

# cd /usr/local/zookeeper/zookeeper-3.4.11/bin
# ./zkServer.sh  start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

zkServer.sh  status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

2.6 验证zooKeeper服务

服务启动完成后,可以使用 telnet 和 stat 命令验证服务器启动是否正常:

# telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
stat
Zookeeper version: 3.4.11-37e277162d567b55a07d1755f0b31c32e93c01a0, built on 11/01/2017 18:06 GMT
Clients:
/127.0.0.1:48430[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.

2.7 停止 ZooKeeper 服务

想要停止 ZooKeeper 服务, 可以使用如下命令:

# cd /usr/local/zookeeper/zookeeper-3.4.11/bin
# ./zkServer.sh  stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.11/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

三、zk ui安装 (选装,页面查看zk的数据)

拉取代码

#git clone https://github.com/DeemOpen/zkui.git

源码编译需要安装 maven

# wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
#cd zkui/
#yum install -y maven
#mvn clean install

修改配置文件默认值

#vim config.cfg
    serverPort=9090     #指定端口
    zkServer=192.168.1.110:2181
    sessionTimeout=300000

启动程序至后台

2.0-SNAPSHOT 会随软件的更新版本不同而不同,执行时请查看target 目录中真正生成的版本

 nohup java -jar target/zkui-2.0-SNAPSHOT-jar-with-dependencies.jar & 

用浏览器访问:

http://192.168.1.110:9090/

四、安装kafka

4.1 下载压缩包, 复制到固定目录并解压

到官网下载压缩包

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz
mkdir  -p /usr/local/kafka
cp   kafka_2.11-1.1.1.tgz   /usr/local/kafka
tar -zxvf kafka_2.11-1.1.1.tgz

4.2 修改配置文件

vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties 修改参数

zookeeper.connect=192.168.1.110:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.117:9092 #本机ip
# ...

4.3 启动server bin/kafka-server-start.sh -daemon config/server.properties &

  • 查看所有topic
# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181
  • 查看指定topic 下面的数据
# bin/kafka-console-consumer.sh ---bootstrap-server 192.168.1.117:9092  --from-beginning --topic example_t
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

4.7 参考链接

https://kafka.apache.org/quickstart

五、 安装canal.kafka

5.1 下载压缩包

到官网下载最新压缩包,请下载 canal.kafka-latest.tar.gz。[下面以v1.0.26 alpha 5 的canal.kafka-1.0.26-SNAPSHOT.tar.gz 为例]

https://github.com/alibaba/canal/releases

5.2 将canal.kafka 复制到固定目录并解压

mkdir  -p /usr/local/canal
cp   canal.kafka-1.0.26-SNAPSHOT.tar.gz   /usr/local/canal
tar -zxvf canal.kafka-1.0.26-SNAPSHOT.tar.gz 

5.3 配置修改参数

修改instance 配置文件 vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置 详情参考 QuickStart : https://github.com/alibaba/canal/wiki/QuickStart

修改canal 配置文件vim /usr/local/canal/conf/canal.properties

# ...
canal_ip =192.168.1.99
canal.withoutNetty =**true**
# zk server地址
canal.zkServers=192.168.1.110:2181,192.168.1.111:2181,192.168.1.112:2181 
canal.destinations= example
# ...
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

详细请参考 AdminGuide : https://github.com/alibaba/canal/wiki/AdminGuide

修改canal.kafka 配置文件vim /usr/local/canal/conf/kafka.yml

#kafka 集群用,进行分隔: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 
servers: 192.168.1.117:9092
retries: 0
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
# canal的批次大小,单位 k
canalBatchSize: 50
filterTransactionEntry: true

canalDestinations:
  - canalDestination: example
    topic: example
    partition:
    # 一个destination可以对应多个topic
# topics:
# - topic: example
#   partition:

参数说明

servers: 对应 kafka的 `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`
retries: 对应kafka的`ProducerConfig.RETRIES_CONFIG` 
batchSize: 对应kafka的`ProducerConfig.BATCH_SIZE_CONFIG` 
lingerMs: 对应kafka的`ProducerConfig.LINGER_MS_CONFIG` 
bufferMemory: 对应kafka的`ProducerConfig.BUFFER_MEMORY_CONFIG` 
filterTransactionEntry: 过滤事务头、尾

5.4 启动

cd /usr/local/canal/
sh bin/startup.sh

5.5 查看日志

a.查看 logs/canal/canal.log

vi logs/canal/canal.log

b. 查看instance的日志:

vi logs/example/example.log

5.6 关闭

cd /usr/local/canal/
sh bin/stop.sh

5.7 使用canal.kafka.client 查看kafka topic中的代码样例

# // 可参考 canal kafka client 中的实现
com.alibaba.otter.canal.kafka.client.running.CanalKafkaClientExample

5.8 参考链接

https://github.com/alibaba/canal/wiki/QuickStart
https://github.com/alibaba/canal/wiki/AdminGuide
Clone this wiki locally