diff --git a/README.md b/README.md index 255b892..76c165c 100644 --- a/README.md +++ b/README.md @@ -16,21 +16,63 @@ CanalSharp 是阿里巴巴开源项目 Canal 的 .NET 客户端。为 .NET 开 关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki -## 二.如何使用 +## 二.应用场景 -1.安装Canal +CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子: + +1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。 + +2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等 + +3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis + +4.数据库异地备份、数据同步 + +5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。 + +6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。 + +## 三.工作原理 + +CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。 + +## 四.工作流程 + +1.Canal连接到mysql数据库,模拟slave + +2.CanalSharp与Canal建立连接 + +2.数据库发生变更写入到binlog + +5.Canal向数据库发送dump请求,获取binlog并解析 + +4.CanalSharp向Canal请求数据库变更 + +4.Canal发送解析后的数据给CanalSharp + +5.CanalSharp收到数据,消费成功,发送回执。(可选) + +6.Canal记录消费位置。 + +以一张图来表示: + +![1537860226808](assets/668104-20180925182816462-2110152563.png) + +## 五.快速入门 + +### 1.安装Canal Canal的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart -2.建立一个.NET Core App项目 +### 2.建立一个.NET Core 控制台项目 -3.为该项目从 Nuget 安装 CanalSharp +### 3.为该项目从 Nuget 安装 CanalSharp ````shell Install-Package CanalSharp.Client ```` -4.建立与Canal的连接 +### 4.建立与Canal的连接 ````csharp //canal 配置的 destination,默认为 example @@ -41,15 +83,59 @@ var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destinati connector.Connect(); //订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来 connector.Subscribe(".*\\\\..*"); -//获取消息并且需要发送Ack表示消费成功 +//获取数据但是不需要发送Ack来表示消费成功 connector.Get(batchSize); -//获取消息但是不需要发送Ack来表示消费成功 +//获取数据并且需要发送Ack表示消费成功 connector.GetWithoutAck(batchSize); ```` -更多详情请查看 Sample +更多详情请查看 [Sample](https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient) + +## 六.通过docker方式快速运行CanalSharp + +### 1.执行命令通过docker方式运行 mysql与canal + +````shell +git clone https://github.com/CanalSharp/CanalSharp.git +cd docker +docker-compose up -d +```` + +### 2.使用navicat等数据库管理工具连接mysql + +ip:运行docker的服务器ip + +mysql用户:root + +mysql密码:000000 + +mysql端口:4406 + +默认提供了一个test数据库,然后有一张名为test的表。 + +![1537866852816](assets/668104-20180925182815646-1209020640.png) + +### 3.运行Sample项目 + +### 4.测试 + +执行下列sql: + +````sql +insert into test values(1000,'111'); +update test set name='222' where id=1000; +delete from test where id=1000; +```` + +![](assets/ys.gif) + +可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。 + +## 七.接下来的工作 + +CanalSharp集群支持 -## 三.贡献代码 +## 八.贡献代码 1.fork本项目 diff --git a/assets/668104-20180925182815646-1209020640.png b/assets/668104-20180925182815646-1209020640.png new file mode 100644 index 0000000..176e825 Binary files /dev/null and b/assets/668104-20180925182815646-1209020640.png differ diff --git a/assets/668104-20180925182816462-2110152563.png b/assets/668104-20180925182816462-2110152563.png new file mode 100644 index 0000000..de72293 Binary files /dev/null and b/assets/668104-20180925182816462-2110152563.png differ diff --git a/assets/ys.gif b/assets/ys.gif new file mode 100644 index 0000000..b100741 Binary files /dev/null and b/assets/ys.gif differ diff --git a/sample/CanalSharp.SimpleClient/Program.cs b/sample/CanalSharp.SimpleClient/Program.cs index 21d5f54..f835abf 100644 --- a/sample/CanalSharp.SimpleClient/Program.cs +++ b/sample/CanalSharp.SimpleClient/Program.cs @@ -21,8 +21,9 @@ static void Main(string[] args) connector.Subscribe(".*\\\\..*"); while (true) { - //获取消息数据 - var message = connector.Get(5000); + //获取数据 1024表示数据大小 单位为字节 + var message = connector.Get(1024); + //批次id 可用于回滚 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { @@ -34,6 +35,10 @@ static void Main(string[] args) } } + /// + /// 输出数据 + /// + /// 一个entry表示一个数据库变更 private static void PrintEntry(List entrys) { foreach (var entry in entrys) @@ -47,6 +52,7 @@ private static void PrintEntry(List entrys) try { + //获取行变更 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) @@ -56,10 +62,13 @@ private static void PrintEntry(List entrys) if (rowChange != null) { + //变更类型 insert/update/delete 等等 EventType eventType = rowChange.EventType; + //输出binlog信息 表名 数据库名 变更类型 Console.WriteLine( $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); + //输出 insert/update/delete 变更类型列数据 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) @@ -83,10 +92,15 @@ private static void PrintEntry(List entrys) } } + /// + /// 输出每个列的详细数据 + /// + /// private static void PrintColumn(List columns) { foreach (var column in columns) { + //输出列明 列值 是否变更 Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } }