Skip to content

Commit

Permalink
AVRO-129. Add HTTP-based RPC client and server.
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/hadoop/avro/trunk@820442 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cutting committed Sep 30, 2009
1 parent 8289fbd commit 6b2a226
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -33,6 +33,8 @@ Trunk (unreleased changes)
written with a different version of the schema than is current.
(cutting)

AVRO-129. Add HTTP-based RPC client and server. (cutting)

IMPROVEMENTS

AVRO-99. Use Boost framework for C++ unit tests.
Expand Down
2 changes: 2 additions & 0 deletions ivy.xml
Expand Up @@ -39,6 +39,8 @@
rev="1.5"/>
<dependency org="com.thoughtworks.paranamer" name="paranamer-ant"
rev="1.5"/>
<dependency org="org.mortbay.jetty" name="jetty"
rev="6.1.14"/>
<dependency org="junit" name="junit" rev="4.5" conf="test->default"/>
<dependency org="checkstyle" name="checkstyle" rev="5.0"
conf="test->default"/>
Expand Down
76 changes: 76 additions & 0 deletions src/java/org/apache/avro/ipc/HttpServer.java
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.ipc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.avro.AvroRuntimeException;

import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;

public class HttpServer extends HttpServlet implements Server {
private Responder responder;
private org.mortbay.jetty.Server server;

public HttpServer(Responder responder, int port) throws IOException {
this.responder = responder;
this.server = new org.mortbay.jetty.Server(port);
new Context(server,"/").addServlet(new ServletHolder(this), "/*");
try {
server.start();
} catch (Exception e) {
throw new AvroRuntimeException(e);
}
}

@Override
public int getPort() { return server.getConnectors()[0].getLocalPort(); }

@Override
public void close() {
try {
server.stop();
} catch (Exception e) {
throw new AvroRuntimeException(e);
}
}

public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
response.setContentType("avro/binary");
List<ByteBuffer> requestBuffers =
HttpTransceiver.readBuffers(request.getInputStream());
try {
List<ByteBuffer> responseBuffers =
responder.respond(requestBuffers);
response.setContentLength(HttpTransceiver.getLength(responseBuffers));
HttpTransceiver.writeBuffers(responseBuffers, response.getOutputStream());
} catch (AvroRuntimeException e) {
throw new ServletException(e);
}
}
}
116 changes: 116 additions & 0 deletions src/java/org/apache/avro/ipc/HttpTransceiver.java
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.ipc;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.net.URL;
import java.net.URLConnection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An HTTP-based {@link Transceiver} implementation. */
public class HttpTransceiver extends Transceiver {
private static final Logger LOG
= LoggerFactory.getLogger(HttpTransceiver.class);

private URL url;
private URLConnection connection;

public HttpTransceiver(URL url) { this.url = url; }

public String getRemoteName() { return this.url.toString(); }

@Override
public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
throws IOException {
this.connection = url.openConnection();
connection.setRequestProperty("Content-Type", "avro/binary");
connection.setRequestProperty("Content-Length",
Integer.toString(getLength(request)));
connection.setDoOutput(true);
LOG.info("Connecting to: "+url);
return super.transceive(request);
}

public synchronized List<ByteBuffer> readBuffers() throws IOException {
return readBuffers(connection.getInputStream());
}

public synchronized void writeBuffers(List<ByteBuffer> buffers)
throws IOException {
writeBuffers(buffers, connection.getOutputStream());
}

static int getLength(List<ByteBuffer> buffers) {
int length = 0;
for (ByteBuffer buffer : buffers) {
length += 4;
length += buffer.remaining();
}
length += 4;
return length;
}

static List<ByteBuffer> readBuffers(InputStream in)
throws IOException {
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
while (true) {
int length = (in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
if (length == 0) { // end of buffers
return buffers;
}
ByteBuffer buffer = ByteBuffer.allocate(length);
while (buffer.hasRemaining()) {
int p = buffer.position();
int i = in.read(buffer.array(), p, buffer.remaining());
if (i < 0)
throw new EOFException("Unexpected EOF");
buffer.position(p+i);
}
buffer.flip();
buffers.add(buffer);
}
}

static void writeBuffers(List<ByteBuffer> buffers, OutputStream out)
throws IOException {
for (ByteBuffer buffer : buffers) {
writeLength(buffer.limit(), out); // length-prefix
out.write(buffer.array(), buffer.position(), buffer.remaining());
buffer.position(buffer.limit());
}
writeLength(0, out); // null-terminate
}

private static void writeLength(int length, OutputStream out)
throws IOException {
out.write(0xff & (length >>> 24));
out.write(0xff & (length >>> 16));
out.write(0xff & (length >>> 8));
out.write(0xff & length);
}
}

41 changes: 41 additions & 0 deletions src/test/java/org/apache/avro/TestProtocolHttp.java
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;

import java.util.Random;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.specific.SpecificResponder;
import org.apache.avro.test.Simple;
import org.junit.Before;

import java.net.URL;

public class TestProtocolHttp extends TestProtocolSpecific {

@Before
public void testStartServer() throws Exception {
server =
new HttpServer(new SpecificResponder(Simple.class, new TestImpl()), 0);
client =
new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
proxy = (Simple)SpecificRequestor.getClient(Simple.class, client);
}

}

0 comments on commit 6b2a226

Please sign in to comment.