Skip to content

farseer-go/etcd

Repository files navigation

etcd client

包:"github.com/farseer-go/etcd"

模块:etcd.Module

go-version Build

概述

etcd是比较流行的分布式组件之一,另外还有Zookeeper、Eureka、Nacos、Consul

常用于在分布式平台中的服务注册与发现的场景。

它有部署简单、使用方便、轻量等特性

在GO阵营里面,一般会优先选择etcd,因为它是采用GO语言编写的。

farseer-go/etcd可以让应用系统更加优雅的使用etcd:

  • 配置自动化:通过配置,可以很方便的快速接入ETCD服务
  • 容器化操作:使用IOC,我们可以很方便就能取到client。
  • 优雅的使用:将常用的操作,整合到几个方法中,更加简单使用。

配置

./farseer.yaml

Etcd:
  default1: "Server=127.0.0.1:2379|127.0.0.1:2379,DialTimeout=5000,Username=test,Password=test"
  default2: "Server=127.0.0.1:2379|127.0.0.1:2379,DialTimeout=5000,Username=test,Password=test"

配置文件支持同时配置多个不同服务端、不同的交换器设置。default1、default2是自定义的名称,同时也是IOC的别名

配置说明:

type etcdConfig struct {
	Server               string // 服务端地址
	DialTimeout          int    // 连接超时时间(ms)
	DialKeepAliveTime    int    // 对服务器进行ping的时间(ms)
	DialKeepAliveTimeout int    // 客户端等待响应的超时时间(ms)
	MaxCallSendMsgSize   int    // 客户端的请求发送限制,单位是字节。0,则默认为2.0 MiB(2 * 1024 * 1024)。
	MaxCallRecvMsgSize   int    // 客户端的响应接收限制,单位是字节。0,则默认不限制
	Username             string // 用户名
	Password             string // 密码
	RejectOldCluster     bool   // 拒绝过时的集群创建客户端。
	PermitWithoutStream  bool   // 允许客户端在没有任何活动流(RPC)的情况下向服务器发送keepalive pings。
}

配置的属性之间用,隔开组合成一个字符串,将被解析成etcdConfig对象。

Put

保存KV

client := container.Resolve[etcd.IClient]("default1") // 取出default1的配置服务端
putRsp, err := client.Put("/test/a1", "1")

通过container.Resolve容器取出etcd.IClient接口的实现。

参数值default1,是在./farseer.yaml中配置节点,意味着使用default1的配置服务端

Get

可以支持按KEY完整匹配,或者按KEY的前缀匹配。

// 根据KEY完整切尔西
client := container.Resolve[etcd.IClient]("default1")
result, err := client.Get("/test/a1")
flog.Info(result.Value) // print:1

// 根据KEY前缀匹配
results, err := client.GetPrefixKey("/test")
flog.Info(results["/test/a1"].Value)    // print:1

Exists

判断KEY是否存在

client := container.Resolve[etcd.IClient]("default1")
client.Exists("/test/a1")

Delete

删除KEY

client := container.Resolve[etcd.IClient]("default1")
_, _ = client.Delete("/test/a1")

Watch

监控指定的KEY(即使KEY还没有创建也可以先监控起来)

当这个KEY有任何的变化时,我们都可以拿到最新的数据状态

client := container.Resolve[etcd.IClient]("default1")
ctx, cancelFunc := context.WithCancel(context.Background())
// 指定KEY
client.Watch(ctx, "/test/a1", func(event etcd.WatchEvent) {
    flog.Info(event.Kv.Value)   // value
    flog.Info(event.IsModify())
    flog.Info(event.IsCreate())
})

// 指定KEY前缀
client.WatchPrefixKey(ctx, "/test/", func(event etcd.WatchEvent) {
    flog.Info(event.Kv.Value)   // value
    flog.Info(event.IsModify())
    flog.Info(event.IsCreate())
})

Lock

分布式锁

client := container.Resolve[etcd.IClient]("default1")

unLock1, _ := client.Lock("/lock/1", 3)
flog.Info("上锁:unLock1")

go func() {
    time.Sleep(1000 * time.Millisecond)
    flog.Info("解锁:unLock1")
    unLock1()
}()

unLock2, _ := client.Lock("/lock/1", 3)
flog.Info("上锁:unLock2")

unLock2()
flog.Info("解锁:unLock2")

打印结果:

2023-01-22 01:44:33 [Info] 上锁:unLock1
2023-01-22 01:44:34 [Info] 解锁:unLock1
2023-01-22 01:44:34 [Info] 上锁:unLock2
2023-01-22 01:44:34 [Info] 解锁:unLock2

同一时刻同一个KEY,只能有一个客户端能上锁成功。使用完后,需调用unLock()解锁

租约

client := container.Resolve[etcd.IClient]("default1")

// 先创建一个保持10秒的租约,拿到租约ID
leaseID, _ := client.LeaseGrant(10)

// 在给KV保存的时候,带上这个leaseID
_, _ = client.PutLease("/test/lease1", "1", leaseID)

此后这个租约将在11秒后过期。(TTL + 1 秒)

自动持续续约:

ctx, cancelFunc := context.WithCancel(context.Background())
_ = client.LeaseKeepAlive(ctx, leaseID)

此时,KEY:/test/lease1,将会自动续约,不再需要我们手动去执行续约操作。

只续约一次:

_ = client.LeaseKeepAliveOnce(leaseID)

此时,只会续约一次,每次续约为10秒(创建租约时传了10)

使用原生客户端

有时候我们需要原生的client执行更多操作时,可以使用Original方法

client := container.Resolve[etcd.IClient]("default")
client.Original()

将返回:etcdV3.Client对象

最后

以上是常用的方法。完整的方法列表,可以查看:etcd.IClient接口