Permalink
Browse files

AVRO-450. HTTP IPC for ruby.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/avro/trunk@931026 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent f85120b commit 5c7b7486d1cbedbc006d5e9a7dfbcea7678a8311 Jeff Hodges committed Apr 6, 2010
Showing with 266 additions and 9 deletions.
  1. +2 −0 CHANGES.txt
  2. +100 −9 lang/ruby/lib/avro/ipc.rb
  3. +84 −0 lang/ruby/test/sample_ipc_http_client.rb
  4. +80 −0 lang/ruby/test/sample_ipc_http_server.rb
View
@@ -18,6 +18,8 @@ Avro 1.4.0 (unreleased)
AVRO-497. Minor changes to C++ autotools, makefiles, and code generator. (sbanacho)
+ AVRO-450. HTTP IPC for ruby. (jmhodges)
+
BUG FIXES
AVRO-461. Skipping primitives in the ruby side (jmhodges)
View
@@ -168,13 +168,13 @@ def read_handshake_response(decoder)
true
when 'CLIENT'
raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
- self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
self.remote_hash = handshake_response['serverHash']
self.send_protocol = false
false
when 'NONE'
raise AvroError.new('Handshake failure. match == NONE') if send_protocol
- self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
self.remote_hash = handshake_response['serverHash']
self.send_protocol = true
false
@@ -236,19 +236,18 @@ def initialize(local_protocol)
protocol_cache[local_hash] = local_protocol
end
- def respond(transport)
- # Called by a server to deserialize a request, compute and serialize
- # a response or error. Compare to 'handle()' in Thrift.
-
- call_request = transport.read_framed_message
+ # Called by a server to deserialize a request, compute and serialize
+ # a response or error. Compare to 'handle()' in Thrift.
+ def respond(call_request)
+ buffer_reader = StringIO.new(call_request)
buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
buffer_writer = StringIO.new('', 'w+')
buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
error = nil
response_metadata = {}
begin
- remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+ remote_protocol = process_handshake(buffer_decoder, buffer_encoder)
# handshake failure
unless remote_protocol
return buffer_writer.string
@@ -300,14 +299,15 @@ def respond(transport)
buffer_writer.string
end
- def process_handshake(transport, decoder, encoder)
+ def process_handshake(decoder, encoder)
handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
handshake_response = {}
# determine the remote protocol
client_hash = handshake_request['clientHash']
client_protocol = handshake_request['clientProtocol']
remote_protocol = protocol_cache[client_hash]
+
if !remote_protocol && client_protocol
remote_protocol = protocol.parse(client_protocol)
protocol_cache[client_hash] = remote_protocol
@@ -440,4 +440,95 @@ def close
sock.close
end
end
+
+ class ConnectionClosedError < StandardError; end
+
+ class FramedWriter
+ attr_reader :writer
+ def initialize(writer)
+ @writer = writer
+ end
+
+ def write_framed_message(message)
+ message_size = message.size
+ total_bytes_sent = 0
+ while message_size - total_bytes_sent > 0
+ if message_size - total_bytes_sent > BUFFER_SIZE
+ buffer_size = BUFFER_SIZE
+ else
+ buffer_size = message_size - total_bytes_sent
+ end
+ write_buffer(message[total_bytes_sent, buffer_size])
+ total_bytes_sent += buffer_size
+ end
+ write_buffer_size(0)
+ end
+
+ def to_s; writer.string; end
+
+ private
+ def write_buffer(chunk)
+ buffer_size = chunk.size
+ write_buffer_size(buffer_size)
+ writer << chunk
+ end
+
+ def write_buffer_size(n)
+ writer.write([n].pack('N'))
+ end
+ end
+
+ class FramedReader
+ attr_reader :reader
+
+ def initialize(reader)
+ @reader = reader
+ end
+
+ def read_framed_message
+ message = []
+ loop do
+ buffer = ""
+ buffer_size = read_buffer_size
+
+ return message.join if buffer_size == 0
+
+ while buffer.size < buffer_size
+ chunk = reader.read(buffer_size - buffer.size)
+ chunk_error?(chunk)
+ buffer << chunk
+ end
+ message << buffer
+ end
+ end
+
+ private
+ def read_buffer_size
+ header = reader.read(BUFFER_HEADER_LENGTH)
+ chunk_error?(header)
+ header.unpack('N')[0]
+ end
+
+ def chunk_error?(chunk)
+ raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == ''
+ end
+ end
+
+ # Only works for clients. Sigh.
+ class HTTPTransceiver
+ attr_reader :remote_name, :host, :port
+ def initialize(host, port)
+ @host, @port = host, port
+ @remote_name = "#{host}:#{port}"
+ end
+
+ def transceive(message)
+ writer = FramedWriter.new(StringIO.new)
+ writer.write_framed_message(message)
+ resp = Net::HTTP.start(host, port) do |http|
+ http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'})
+ end
+ FramedReader.new(StringIO.new(resp.body)).read_framed_message
+ end
+ end
end
@@ -0,0 +1,84 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
+MAIL_PROTOCOL_JSON = <<-JSON
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+JSON
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+def make_requestor(server_address, port, protocol)
+ transport = Avro::IPC::HTTPTransceiver.new(server_address, port)
+ Avro::IPC::Requestor.new(protocol, transport)
+end
+
+if $0 == __FILE__
+ if ![3, 4].include?(ARGV.length)
+ raise "Usage: <to> <from> <body> [<count>]"
+ end
+
+ # client code - attach to the server and send a message
+ # fill in the Message record
+ message = {
+ 'to' => ARGV[0],
+ 'from' => ARGV[1],
+ 'body' => ARGV[2]
+ }
+
+ num_messages = (ARGV[3] || 1).to_i
+
+ # build the parameters for the request
+ params = {'message' => message}
+ # send the requests and print the result
+
+ num_messages.times do
+ requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+ result = requestor.request('send', params)
+ puts("Result: " + result)
+ end
+
+ # try out a replay message
+ requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+ result = requestor.request('replay', {})
+ puts("Replay Result: " + result)
+end
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require 'avro'
+require 'webrick'
+
+MAIL_PROTOCOL_JSON = <<-JSON
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+JSON
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+class MailResponder < Avro::IPC::Responder
+ def initialize
+ super(MAIL_PROTOCOL)
+ end
+
+ def call(message, request)
+ if message.name == 'send'
+ request_content = request['message']
+ "Sent message to #{request_content['to']} from #{request_content['from']} with body #{request_content['body']}"
+ elsif message.name == 'replay'
+ 'replay'
+ end
+ end
+end
+
+class MailHandler < WEBrick::HTTPServlet::AbstractServlet
+ def do_POST(req, resp)
+ responder = MailResponder.new
+ call_request = Avro::IPC::FramedReader.new(StringIO.new(req.body)).read_framed_message
+ unframed_resp = responder.respond(call_request)
+ writer = Avro::IPC::FramedWriter.new(StringIO.new)
+ writer.write_framed_message(unframed_resp)
+ resp.body = writer.to_s
+ raise WEBrick::HTTPStatus::OK
+ end
+end
+
+if $0 == __FILE__
+ server = WEBrick::HTTPServer.new(:Host => 'localhost', :Port => 9090)
+ server.mount '/', MailHandler
+ trap("INT") { server.shutdown }
+ server.start
+end

0 comments on commit 5c7b748

Please sign in to comment.