Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 95 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,63 @@ CanalSharp 是阿里巴巴开源项目 Canal 的 .NET 客户端。为 .NET 开

关于 Canal 的更多信息请访问 https://github.com/alibaba/canal/wiki

## 二.如何使用
## 二.应用场景

1.安装Canal
CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。举一些实际的使用例子:

1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis

4.数据库异地备份、数据同步

5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

## 三.工作原理

CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

## 四.工作流程

1.Canal连接到mysql数据库,模拟slave

2.CanalSharp与Canal建立连接

2.数据库发生变更写入到binlog

5.Canal向数据库发送dump请求,获取binlog并解析

4.CanalSharp向Canal请求数据库变更

4.Canal发送解析后的数据给CanalSharp

5.CanalSharp收到数据,消费成功,发送回执。(可选)

6.Canal记录消费位置。

以一张图来表示:

![1537860226808](assets/668104-20180925182816462-2110152563.png)

## 五.快速入门

### 1.安装Canal

Canal的安装以及配置使用请查看 https://github.com/alibaba/canal/wiki/QuickStart

2.建立一个.NET Core App项目
### 2.建立一个.NET Core 控制台项目

3.为该项目从 Nuget 安装 CanalSharp
### 3.为该项目从 Nuget 安装 CanalSharp

````shell
Install-Package CanalSharp.Client
````

4.建立与Canal的连接
### 4.建立与Canal的连接

````csharp
//canal 配置的 destination,默认为 example
Expand All @@ -41,15 +83,59 @@ var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destinati
connector.Connect();
//订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
connector.Subscribe(".*\\\\..*");
//获取消息并且需要发送Ack表示消费成功
//获取数据但是不需要发送Ack来表示消费成功
connector.Get(batchSize);
//获取消息但是不需要发送Ack来表示消费成功
//获取数据并且需要发送Ack表示消费成功
connector.GetWithoutAck(batchSize);
````

更多详情请查看 Sample
更多详情请查看 [Sample](https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient)

## 六.通过docker方式快速运行CanalSharp

### 1.执行命令通过docker方式运行 mysql与canal

````shell
git clone https://github.com/CanalSharp/CanalSharp.git
cd docker
docker-compose up -d
````

### 2.使用navicat等数据库管理工具连接mysql

ip:运行docker的服务器ip

mysql用户:root

mysql密码:000000

mysql端口:4406

默认提供了一个test数据库,然后有一张名为test的表。

![1537866852816](assets/668104-20180925182815646-1209020640.png)

### 3.运行Sample项目

### 4.测试

执行下列sql:

````sql
insert into test values(1000,'111');
update test set name='222' where id=1000;
delete from test where id=1000;
````

![](assets/ys.gif)

可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。

## 七.接下来的工作

CanalSharp集群支持

## .贡献代码
## .贡献代码

1.fork本项目

Expand Down
Binary file added assets/668104-20180925182815646-1209020640.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/668104-20180925182816462-2110152563.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/ys.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 16 additions & 2 deletions sample/CanalSharp.SimpleClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ static void Main(string[] args)
connector.Subscribe(".*\\\\..*");
while (true)
{
//获取消息数据
var message = connector.Get(5000);
//获取数据 1024表示数据大小 单位为字节
var message = connector.Get(1024);
//批次id 可用于回滚
var batchId = message.Id;
if (batchId == -1 || message.Entries.Count <= 0)
{
Expand All @@ -34,6 +35,10 @@ static void Main(string[] args)
}
}

/// <summary>
/// 输出数据
/// </summary>
/// <param name="entrys">一个entry表示一个数据库变更</param>
private static void PrintEntry(List<Entry> entrys)
{
foreach (var entry in entrys)
Expand All @@ -47,6 +52,7 @@ private static void PrintEntry(List<Entry> entrys)

try
{
//获取行变更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
Expand All @@ -56,10 +62,13 @@ private static void PrintEntry(List<Entry> entrys)

if (rowChange != null)
{
//变更类型 insert/update/delete 等等
EventType eventType = rowChange.EventType;
//输出binlog信息 表名 数据库名 变更类型
Console.WriteLine(
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

//输出 insert/update/delete 变更类型列数据
foreach (var rowData in rowChange.RowDatas)
{
if (eventType == EventType.Delete)
Expand All @@ -83,10 +92,15 @@ private static void PrintEntry(List<Entry> entrys)
}
}

/// <summary>
/// 输出每个列的详细数据
/// </summary>
/// <param name="columns"></param>
private static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
//输出列明 列值 是否变更
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
Expand Down