Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Commit

Permalink
Raft::Peer is now transport-independent.
Browse files Browse the repository at this point in the history
Adds a dependency on celluloid-io for now as it features a nice
`Celluloid::IO::Stream::Latch`. :-)

The ZMQ RPC implementation is now in Raft::RPC::ZMQ.

Raft::RPC::InMemory is a RPC implementation for communication between
actors in the same process - probably only useful for testing and
development.
  • Loading branch information
aflatter committed May 21, 2013
1 parent 993eef5 commit a406c1c
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 108 deletions.
1 change: 1 addition & 0 deletions .rspec
@@ -0,0 +1 @@
--fail-fast --backtrace --require spec_helper
5 changes: 4 additions & 1 deletion lib/raft.rb
@@ -1,5 +1,8 @@
require 'celluloid'
require "raft/version"

module Raft
# Your code goes here...
class Error < StandardError; end
class TimeoutError < Error; end
class ServerUnavailableError < Error; end
end
2 changes: 1 addition & 1 deletion lib/raft/node.rb
Expand Up @@ -78,7 +78,7 @@ def id
# Returns peers in the cluster.
# @return [Array<Raft::Peer>]
def peers
@peers ||= options[:peers].map { |peer| link(Raft::Peer.new(peer)) }
@peers ||= options[:peers].map { |peer| Raft::Peer.new(peer) }
end

# Returns the cluster's quorum.
Expand Down
41 changes: 9 additions & 32 deletions lib/raft/peer.rb
@@ -1,53 +1,30 @@
require 'raft'
require 'celluloid/zmq'

# A peer is a remote node within the same cluster.
class Raft::Peer
include Celluloid::ZMQ
include Celluloid::Logger

exclusive :call

# @return [String] Remote address of the peer.
attr_accessor :id

# @return [Celluloid::ZMQ::ReqSocket]
attr_accessor :socket
# attr_accessor :socket

def initialize(id)
self.id = id
connect
end
# @return [Raft::RPC::Client]
attr_accessor :client

def connect
socket = ReqSocket.new
socket.connect(id)
self.socket = socket
end
def initialize(id, opts = {})
self.id = id

def disconnect
socket.close if socket
self.socket = nil
client_class = opts[:client_class] || Raft::RPC::Client::ZMQ
self.client = client_class.new(id)
end

def append_entries(payload)
call(:append_entries, payload)
client.call(:append_entries, payload)
end

def request_vote(payload)
call(:vote_request, payload)
end

def call(command, payload)
socket.send(encode_request(command, payload))
decode_response(socket.read)
end

def encode_request(command, payload)
"#{command}:#{Marshal.dump(payload)}"
end

def decode_response(response)
Marshal.load(response)
client.call(:vote_request, payload)
end
end
17 changes: 16 additions & 1 deletion lib/raft/rpc.rb
@@ -1,5 +1,20 @@
require 'raft'

module Raft::RPC
require 'raft/rpc/server'
class Client
def call(command, payload)
raise NotImplementedError
end
end

# Listens to a ZMQ Socket and handles commands from peers.
class Server
attr_accessor :address
attr_accessor :handler

def initialize(address, &handler)
self.address = address
self.handler = handler
end
end
end
39 changes: 39 additions & 0 deletions lib/raft/rpc/in_memory.rb
@@ -0,0 +1,39 @@
require 'raft/rpc'

class Raft::RPC::InMemory
class Client < Raft::RPC::Client
attr_accessor :address

def initialize(address)
self.address = address
end

def call(command, payload)
begin
actor.execute(command, payload)
rescue Celluloid::DeadActorError
raise Raft::ServerUnavailableError
end
end

def actor
Celluloid::Actor[address]
end
end

class Server < Raft::RPC::Server
include Celluloid

execute_block_on_receiver :initialize

def initialize(address, &handler)
super

Actor[address] = Actor.current
end

def execute(command, payload)
handler.call(command, payload)
end
end
end
73 changes: 0 additions & 73 deletions lib/raft/rpc/server.rb

This file was deleted.

101 changes: 101 additions & 0 deletions lib/raft/rpc/zmq.rb
@@ -0,0 +1,101 @@
require 'raft/rpc'
require 'celluloid/io/stream'
require 'celluloid/zmq'

class Raft::RPC::ZMQ
class Client < Raft::RPC::Client
# @return [Celluloid::IO::Stream::Latch]
attr_accessor :latch

# @return [Celluloid::ZMQ::ReqSocket]
attr_accessor :socket

def initialize(address)
self.latch = Celluloid::IO::Stream::Latch.new

This comment has been minimized.

Copy link
@halorgium

halorgium May 28, 2013

Contributor

We can move this to Celluloid-proper.
It is a good primitive.

This comment has been minimized.

Copy link
@aflatter

aflatter May 28, 2013

Author Member

Would love to see that.

self.socket = Celluloid::ZMQ::ReqSocket.new
socket.connect(address)
end

def call(command, payload)
request = encode_request(command, payload)
response = latch.synchronize do
socket.send(request)
socket.read
end
decode_response(response)
end

def encode_request(command, payload)
"#{command}:#{Marshal.dump(payload)}"
end

def decode_response(response)
Marshal.load(response)
end

def disconnect
socket.close if socket
end
end

class Server < Raft::RPC::Server
include Celluloid::ZMQ
include Celluloid::Logger

attr_accessor :socket

execute_block_on_receiver :initialize
finalizer :finalize

def initialize(address, &handler)
super
async.run
end

def run
self.socket = RepSocket.new

begin
info("Binding to #{address}")
socket.bind(address)
rescue IOError
socket.close
raise
end

async.loop!
end

def loop!
loop { handle(socket.read) }
end

# @param [String] request A request string containing command and payload separated by a colon.
def handle(request)
p request
command, payload = decode_request(request)
response = handler.call(command, payload)
socket.send(encode_response(response))
end

def encode_response(response)
Marshal.dump(response)
end

def decode_request(request)
command, payload = request.split(':', 2)
payload = Marshal.load(payload)

[command.to_sym, payload]
end

def finalize
socket.close if socket
end

def terminate
super
socket.close if socket
end
end
end
Empty file added log/.gitkeep
Empty file.
1 change: 1 addition & 0 deletions raft.gemspec
Expand Up @@ -17,4 +17,5 @@ Gem::Specification.new do |gem|

gem.add_development_dependency 'rspec'
gem.add_runtime_dependency 'celluloid-zmq'
gem.add_runtime_dependency 'celluloid-io'
end
71 changes: 71 additions & 0 deletions spec/functional/rpc_spec.rb
@@ -0,0 +1,71 @@
require 'raft/rpc'
require 'raft/rpc/zmq'
require 'raft/rpc/in_memory'

class TestActor
include Celluloid

execute_block_on_receiver :exec

def exec
yield
end
end

shared_examples 'an RPC implementation' do
def actor_run(&block)
actor = TestActor.new
result = actor.exec(&block)
actor.terminate
result
end

let(:server_class) { described_class::Server }
let(:client_class) { described_class::Client }

let(:command) { :command }
let(:payload) { Hash[key: 'value'] }

it 'executes calls' do
server = server_class.new(address) { |command, payload| [command, payload] }
client = client_class.new(address)
actor_run { client.call(command, payload) }.should eq([command, payload])
end

it 'executes multiple calls sequentially' do
calls = 3.times.map { |i| [:command, i] }
server = server_class.new(address) { |command, payload| [command, payload] }
client = client_class.new(address)
actor_run { calls.map { |args| client.call(*args) } }.should eq(calls)
end

it 'raises an error if server is unavailable' do
p 'foo'
expect do
client = client_class.new(address)
actor_run { client.call(:command, :payload) }
end.to raise_error(Raft::ServerUnavailableError)
end
end

describe Raft::RPC::InMemory do
let(:address) { :node1 }

before(:each) do
Celluloid.shutdown
Celluloid.boot
end

it_should_behave_like 'an RPC implementation'
end

describe Raft::RPC::ZMQ do
let(:address) { 'tcp://127.0.0.1:12345' }

before(:each) do
Celluloid.shutdown
Celluloid.boot
end

it_should_behave_like 'an RPC implementation'
end

3 comments on commit a406c1c

@halorgium
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious about the Raft::RPC::ZMQ::Server instance in Raft::Node.
Is this transport-independent?

@aflatter
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're saying that this should be configurable, right? That's one of the things on the clean-up list. Maybe a ':transport' option...

@halorgium
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK. I had thought that the Client and Server would be bound together.
All good.

Please sign in to comment.