Skip to content

Commit

Permalink
[Compatibility] Multiple Node's support, part 1.
Browse files Browse the repository at this point in the history
  • Loading branch information
3Hren committed Jun 11, 2015
1 parent b625997 commit 9d680e3
Showing 1 changed file with 86 additions and 63 deletions.
149 changes: 86 additions & 63 deletions lib/cocaine/cocaine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,53 @@ def endpoints=(endpoints)
end

module RPC
CONTROL_CHANNEL = 1
module Version1
CONTROL_CHANNEL = 1

HANDSHAKE = 0
class Dispatcher
def handshake(uuid)
[CONTROL_CHANNEL, 0, [@uuid]]
end

def heartbeat
[CONTROL_CHANNEL, 1, []]
end

def terminate(errno, reason)
[CONTROL_CHANNEL, 2, [errno, reason]]
end

def process(span, id)
case id
when 1
:heartbeat
when 2
:terminate
when 3
:invoke
when 4
:chunk
when 5
:error
when 6
:choke
else
:unknown
end
end
end
end

module Version2
end

HEARTBEAT = 0
TERMINATE = 1
def self.dispatcher(version)
if version == 0
Version1::Dispatcher.new
end

INVOKE = 0
raise Exception.new 'unsupported version number'
end

CHUNK = 0
ERROR = 1
Expand Down Expand Up @@ -203,7 +242,7 @@ class DefinedService < Meta

def initialize(name, endpoints, dispatch)
@name = name
@dispatch = dispatch
@framing = dispatch
@counter = 1
@sessions = Hash.new

Expand Down Expand Up @@ -264,7 +303,7 @@ def received(span, id, payload, *extra)
def invoke(id, *args)
reinitialize if @socket.nil?

method, txtree, rxtree = @dispatch[id]
method, txtree, rxtree = @framing[id]
LOG.debug "Invoking #{@name} '#{method}' method with #{id} id and #{args} args"

txchan = TxChannel.new txtree, @counter, @socket
Expand Down Expand Up @@ -343,6 +382,8 @@ def initialize(options)
@uuid = options[:uuid]
@endpoint = options[:endpoint]

@framing = RPC::dispatcher options[:protocol]

@actors = Hash.new
@sessions = Hash.new

Expand All @@ -361,7 +402,8 @@ def on(event, &block)

def run
LOG.debug "Starting worker '#{@app}' with uuid '#{@uuid}' at '#{@endpoint}'"
@socket = UNIXSocket.open(@endpoint)

@socket = UNIXSocket.open @endpoint
async.handshake
async.health
async.serve
Expand All @@ -370,20 +412,24 @@ def run
private
def handshake
LOG.debug '<- Handshake'
@socket.write MessagePack::pack([RPC::CONTROL_CHANNEL, RPC::HANDSHAKE, [@uuid]])

@socket.write MessagePack::pack @framing.handshake
end

def health
heartbeat = MessagePack::pack([RPC::CONTROL_CHANNEL, RPC::HEARTBEAT, []])
heartbeat = MessagePack::pack @framing.heartbeat

loop do
LOG.debug '<- Heartbeat'

@socket.write heartbeat
sleep 5.0
end
end

def serve
unpacker = MessagePack::Unpacker.new

loop do
data = @socket.readpartial 4096
unpacker.feed_each data do |decoded|
Expand All @@ -392,62 +438,31 @@ def serve
end
end

def received(session, id, payload)
LOG.debug "-> Message(#{session}, #{id}, #{payload})"

if session == RPC::CONTROL_CHANNEL
control session, id, payload
else
rpc session, id, payload
end
end

def control(session, id, payload)
LOG.debug "Processing control [#{session}, #{id}, #{payload}] message"

case id
when RPC::HEARTBEAT
@disown.reset
when RPC::TERMINATE
terminate *payload
else
LOG.warn "Received unknown message: [#{session}, #{id}, #{payload}]"
end
end

def rpc(session, id, payload)
LOG.debug "Processing RPC [#{session}, #{id}, #{payload}] message"

channels = @sessions.keys
if channels.empty?
min = max = 1
else
min, max = channels.min, channels.max
end

if session < min
LOG.debug "Dropping session #{session} as unexpected"
return
end

if session > max
LOG.debug "Invoking new channel #{session}"
invoke session, *payload
return
end
def received(span, id, payload)
LOG.debug "-> Message(#{span}, #{id}, #{payload})"

case id
when RPC::CHUNK, RPC::ERROR
push session, id, *payload
when RPC::CHOKE
LOG.debug "Closing #{session} session"
@sessions.delete session
else
LOG.warn "Received unknown message: [#{session}, #{id}, #{payload}]"
@framing.process span, id do event
case event
when :heartbeat
@disown.reset
when :terminate
terminate *payload
when :invoke
invoke span, *payload
when :chunk
push span, id, *payload
when :error, :choke
push span, id, *payload
revoke span
else
LOG.warn "Received unknown message: [#{span}, #{id}, #{payload}]"
end
end
end

def invoke(session, event)
LOG.debug "Invoking new #{session} channel with #{event} event"

actor = @actors[event]
txchan = TxChannel.new RPC::TXTREE, session, @socket
rxchan = RxChannel.new RPC::RXTREE, session do |session_|
Expand Down Expand Up @@ -475,9 +490,15 @@ def push(session, id, *payload)
end
end

def revoke(span)
LOG.debug "Closing #{span} channel"
@sessions.delete span
end

def terminate(errno, reason)
LOG.warn "Terminating [#{errno}]: #{reason}"
@socket.write MessagePack::pack([RPC::CONTROL_CHANNEL, RPC::TERMINATE, [errno, reason]])

@socket.write MessagePack::pack @framing.terminate errno, reason
exit errno
end

Expand All @@ -492,6 +513,8 @@ def finalize
class WorkerFactory
def self.create
options = {}
options[:protocol] = 0

OptionParser.new do |opts|
opts.banner = 'Usage: <worker.rb> --app NAME --locator ADDRESS --uuid UUID --endpoint ENDPOINT'

Expand All @@ -511,7 +534,7 @@ def self.create
options[:endpoint] = endpoint
end

opts.on('--protocol VERSION', 'Worker protocol version') do |protocol|
opts.on('--protocol VERSION', Integer, 'Worker protocol version') do |protocol|
options[:protocol] = protocol
end

Expand Down

0 comments on commit 9d680e3

Please sign in to comment.