New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Etcd Raft使用入门及原理解析 #30

Open
aCoder2013 opened this Issue Feb 3, 2019 · 0 comments

Comments

Projects
None yet
1 participant
@aCoder2013
Copy link
Owner

aCoder2013 commented Feb 3, 2019

什么是Raft

Raft是一个分布式一致性算法,充分的利用了可复制状态机以及日志。其最核心的设计目标就是易于理解。在性能、错误容错等方面来看有点类似Paxos,但不同之处在于,Raft论文较为清晰的描述了其主要流程以及其中一些细节问题,而Paxos我们知道非常难以理解。

当构建一个分布式系统时,一个非常重要的设计目标就是fault tolerance。如果系统基于Raft协议实现,那么当其中一个节点挂掉,或者发生了网络分区等异常情况时,只要大多数节点仍然能够正常通讯,整个集群就能够正常对外提供服务而不会挂掉。

关于Raft更多的细节,这里建议直接阅读论文: "In Search of an Understandable Consensus Algorithm"

介绍

Etcd的Raft库已经在生产环境得到了非常广泛的应用,有力的支撑了etcd、K8S、Docker Swarm、TiDB/TiKV等分布式系统的构建,当你能够熟练的使用一个成熟的Raft库、甚至如果能够自己实现一个,那会有种'有了锤子,干什么都是钉子'的感觉。

特性

Etcd raft基本上已经实现了Raft协议的完整特性,包括:

  • Leader选举
  • 日志复制
  • 日志压缩
  • 成员变更
  • Leader和Follower都支持高效的线性只读查询请求
  • 通过batch、pipeline等手段优化日志复制、网络IO的延迟

概览

etcd的raft实现都在etcd/raft目录下,但是大部分的实现都在下面几个比较核心的文件:

  • raft.go: 从名字也可以看出来,这个是最核心的部分,比如leader选择的逻辑、raft消息的处理逻辑等
  • node.go: 可以理解为raft集群的一个节点,客户端也主要是这个类打交道,比如心跳的逻辑、propose、状态机、成员变更等都是这个类负责处理。
  • log.go: raft日志相关的代码,比如保存日志记录
  • raft.proto: 定义了raft一些核心的RPC数据结构,由于protobuf是跨语言的,因此如果想用其他语言重写etcd raft,那么至少这部分内容都是可以复用的

用法

客户端主要使用Node和raft集群交互,首先需要启动一个raft集群,有两种方式:

  • 启动一个全新的raft集群
  • 加入一个已经存在的raft集群(节点重启、扩容、缩容)

启动一个三节点的集群:

 storage := raft.NewMemoryStorage()
  c := &Config{
    //代表一个节点的ID,必须唯一,并且不能为0,不能重复利用,和zookeeper的id类似
    ID:              0x01, 
    ElectionTick:    10, 
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

 //设置节点列表
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

这里需要强调一个点,etcd的raft实现并不包括网络部分,网络通讯部分需要使用者自己实现,因此这里节点列表传入的是ID,而ip:port到id的映射需要库使用者自己实现。
如果让一个新的节点加入集群,那么就不需要传入节点列表,首先通过ProposeConfChange RPC发起一个成员变更请求,在任意一个raft集群节点都可以,然后启动这个节点:

  //配置参考上文中的代码段
  n := raft.StartNode(c, nil)

如果是重启一个节点,那么这里需要注意,我们需要恢复这个节点之前的状态,比如当前term、根据快照和日志恢复状态机等:

 storage := raft.NewMemoryStorage()

  // Recover the in-memory storage from persistent snapshot, state and entries.
  // 根据快照、entry日志等恢复当前raft节点到之前的状态
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)

  c := &Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

  // Restart raft without peer information.
  // Peer information is already included in the storage.
  // 重启该raft节点,此时不用传入任何节点相关信息,因为已经在刚刚的恢复过程中填充好了
  n := raft.RestartNode(c)

当raft集群启动完成后,对于一个raft节点,用户需要做几件事情,伪码如下:

for {
    select {
    case <-s.Ticker:
      n.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }
case <-s.Ticker

库使用者需要定时调用tick()方法,根据节点当前的角色调用对应的逻辑:

  • 心跳, leader需要定时发送心跳包给follower
  • 选举,如果一定时间没有收到leader的心跳,则转换为候选者,竞选leader
case rd := <-s.Node.Ready(): 处理Ready

Ready封装了可以准备开始读取的entries、messages,需要保存到持久化介质、同步给其他节点:

type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
  • 调用 Node.Ready(),处理当前raft节点的状态,其中有些步骤可以并行执行
    • 将entries、HardState、快照按照顺序写到持久化介质中,底层存储介质支持原子写入,那么也可以一次性将他们写入
    • 将所有的消息发送给远程节点,但一定要先将最近的HardState、上一轮Ready中的entries都持久化之后(可以和同一轮的entries持久化并行执行)。如果有类型为MsgSnap的消息,在这个消息发送成功之后,需要调用Node.ReportSnapshot()
    • 如果有快照的话需要和已提交的entries一起应用到状态机(库使用者提供),如果已经提交的entries中包含EntryConfChange,那么需要调用Node.ApplyConfChange() 将节点的变更信息同步到本节点
  • 调用Node.Advance()通知节点,表明本轮Ready已经处理完毕,可以开始处理下一轮。

另外还需要注意,由于网络部分需要库使用者自己实现,因此当收到一条消息的时候,需要将该消息转发给raft节点:

	func recvRaftRPC(ctx context.Context, m raftpb.Message) {
		n.Step(ctx, m)
	}

发起提议

如果需要向raft集群发起一个提议,那么需要用下面这种方式:

        // 协议的数据持久化成字节数组
	n.Propose(ctx, data)

如果找个提议处理完成(已经持久化到持久化介质并同步到其他节点),那么就可以通过Ready的comitedEntries获取到,类型是raftpb.EntryNormal, 然后用户就可以根据自己的业务逻辑,将其应用到状态机中。

raft集群不保证该协议一定能够处理成功,若一定超时时间内,还未收到响应,那么需要根据业务场景考虑是否需要重试。

节点变更

如果需要对raft集群扩容或缩容,那么需要构造ConfChange,并调用:

	n.ProposeConfChange(ctx, cc)

如果该变更请求处理成功,那么在commitedEntries中会有一条类型为 raftpb.EntryConfChange的记录,

        var cc raftpb.ConfChange
	cc.Unmarshal(data)
	n.ApplyConfChange(cc)

需要自己实现的部分

etcd的raft已经实现了大部分的功能,但是还是有几个组件需要使用者自己根据业务场景实现:

  • 网络通讯部分
  • Write ahead log
  • 快照

网络通讯部分

网络部分说白了就是消息的收发,你可以理解为raft只依赖了接口,这个接口实现了两个方法: sendreceive,但是具体的实现需要库使用者自己写,这部分相对比较简单,使用RPC、HTTP、自定义协议都可以,具体的实现逻辑可以参考etcd自己的代码

Write-Ahead-Log(WAL)

如上文中提到的,用户需要保存Ready中的一些状态,比如entries、hardstate等,WAL有很多分布式系统都实现了,基本上参考他们的实现,结合自己的业务实现一个难度不会很大,如果是直接使用etcd raft库,那么可以直接基于etcd中wal的实现,另外也可以基于RocksDB等嵌入式KV实现,但是对于key-value的结构设计要考虑好,wal的原理后面有时间再叙述。

快照

快照应该都知道,比如说Redis的持久化,有一种模式是保存用户发过来的命令,但时间长了之后,这个日志会变的越来越大,这个时候当你扩容、重启节点的时候,加载这个文件会耗费很长时间,导致服务不可用,因此需要将内存中的状态持久化到磁盘中。
比如:

incr index 
incr index

这个时候index的值为2,当然这个例子只有两条命令,但假如说有一千万条记录,那么重放日志需要耗费很长时间,因此我们可以直接将index:2这个kv对写到磁盘中,那么这个时候之前对这个key的一千万条操作日志就变成了这一条记录。
那么raft的快照其实也类似,应用需要将自己状态机的当前快照,持久化成一个快照文件,并写入磁盘中,我们知道这个过程会非常慢,因此可以考虑和其他过程并行执行,以及其他的一些性能优化,这个后面的博客再写。
简单来实现的话,我们直接将状态机用json序列化成一个字节数组,并写入到本地文件中,后续读取的时候。

如何基于raft实现一个简单的分布式KV存储

这里简单描述一下流程,只是为了更容易理解etcd raft的使用方法,后面会再写篇博客详细记录:

  • 应用实现自己的状态机,处理快照、已提交日志、WAL等
  • 当用户发起一个put请求时,将该请求序列化成字节数组,propose到raft集群
  • 处理成功后,会出现在commitedEntries中,解析该entry,回放到状态机中,这个时候该请求的结果已经可以在所有的raft节点上查询到了
  • 用户发起查询请求,直接在用户封装的状态机中查询,并返回给用户

总结

本文只是简单描述了下etcd raft的使用方法,总的来说etcd raft的实现已经非常完善,但还是需要用户自己处理非常多的细节,比如网络、write aheadlog等,如果对raft不熟悉,相信会很难上手,我的想法是能够在其之上再封装一层,提供一个状态机接口,用户只需要关心自己的业务逻辑,其他的全部都交给库来处理。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment