Permalink
Browse files

Refactor to be an index only transport (no response, with push / pull…

… ZMQ pattern)
  • Loading branch information...
Bertrand Paquet
Bertrand Paquet committed Oct 6, 2012
1 parent 65de03b commit 703e537a83927ddc7ec3464333288003dcdcc4e7
View
@@ -0,0 +1,4 @@
+.project
+.classpath
+.settings
+target
View
@@ -5,12 +5,12 @@
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-zeromq</artifactId>
<packaging>jar</packaging>
- <version>0.0.4-SNAPSHOT</version>
+ <version>0.0.4.1-SNAPSHOT</version>
<name>ØMQ transport layer plugin for Elasticsearch</name>
<url>https://github.com/tlrx/transport-zeromq</url>
<properties>
- <elasticsearch.version>0.19.9</elasticsearch.version>
+ <elasticsearch.version>0.19.10</elasticsearch.version>
</properties>
<scm>
@@ -20,9 +20,7 @@
package org.elasticsearch.zeromq;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -45,51 +43,33 @@ public ZMQRestImpl(Settings settings, RestController restController) {
this.restController = restController;
}
- public ZMQRestResponse process(ZMQRestRequest request){
-
+ public void process(ZMQRestRequest request) {
+
final CountDownLatch latch = new CountDownLatch(1);
- final AtomicReference<ZMQRestResponse> ref = new AtomicReference<ZMQRestResponse>();
-
+
this.restController.dispatchRequest(request, new RestChannel() {
-
+
@Override
public void sendResponse(RestResponse response) {
try {
- if(logger.isTraceEnabled()){
- logger.info("Response to ØMQ client: {}", new String(response.content()));
+ if (logger.isTraceEnabled()) {
+ logger.info("Response: {}, {}", response.status(), new String(response.content()));
+ }
+ if (response.status().getStatus() >= 300) {
+ logger.warn("Wrong retrun code when sending rest request : {}, {}", response.status(), new String(response.content()));
}
- ref.set(convert(response));
} catch (IOException e) {
// ignore
}
latch.countDown();
}
});
-
+
try {
- latch.await();
- return ref.get();
- } catch (Exception e) {
- throw new ZMQException("failed to generate response", 0);
- }
+ latch.await();
+ } catch (Exception e) {
+ throw new ZMQException("failed to generate response", 0);
+ }
}
-
- private ZMQRestResponse convert(RestResponse response) throws IOException {
- ZMQRestResponse zmqResponse = new ZMQRestResponse(response.status());
- if(response.contentType() != null){
- zmqResponse.setContentType(response.contentType());
- }
- if (response.contentLength() > 0) {
- if (response.contentThreadSafe()) {
- zmqResponse.setBody(ByteBuffer.wrap(response.content(), 0, response.contentLength()));
- } else {
- // argh!, we need to copy it over since we are not on the same thread...
- byte[] body = new byte[response.contentLength()];
- System.arraycopy(response.content(), 0, body, 0, response.contentLength());
- zmqResponse.setBody(ByteBuffer.wrap(body));
- }
- }
- return zmqResponse;
- }
}
@@ -1,43 +1,34 @@
package org.elasticsearch.zeromq;
-import org.elasticsearch.common.Bytes;
-import org.elasticsearch.common.Unicode;
-import org.elasticsearch.common.bytes.ByteBufferBytesReference;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.rest.support.AbstractRestRequest;
import org.elasticsearch.rest.support.RestUtils;
import org.elasticsearch.zeromq.exception.NoURIFoundZMQException;
import org.elasticsearch.zeromq.exception.UnsupportedMethodZMQException;
import org.elasticsearch.zeromq.exception.ZMQTransportException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* @author tlrx
*
*/
public class ZMQRestRequest extends AbstractRestRequest {
- private final List<byte[]> parts;
-
private Method method;
private String uri;
private String rawPath;
private final Map<String, String> params;
-
- public ByteBuffer body;
- public BytesReference bytesReference;
+ public BytesArray body;
- public ZMQRestRequest(String payload, List<byte[]> parts) {
+ public ZMQRestRequest(String payload) {
super();
- this.parts = parts;
this.params = new HashMap<String, String>();
parse(payload);
@@ -89,8 +80,7 @@ private void parse(String payload) {
// Content
int indexContent = payload.indexOf(ZMQSocket.SEPARATOR, m.length() + uri.length());
- body = ByteBuffer.wrap(payload.substring(indexContent+1).getBytes());
- bytesReference = new ByteBufferBytesReference(body);
+ body = new BytesArray(payload.substring(indexContent+1).getBytes());
}
}
@@ -111,7 +101,7 @@ public String rawPath() {
@Override
public boolean hasContent() {
- return ((body != null) && (body.remaining() > 0));
+ return ((body != null) && (body.length() > 0));
}
@Override
@@ -152,7 +142,7 @@ public String param(String key, String defaultValue) {
@Override
public BytesReference content() {
- return bytesReference;
+ return body;
}
}
@@ -1,95 +0,0 @@
-package org.elasticsearch.zeromq;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.elasticsearch.common.Bytes;
-import org.elasticsearch.rest.AbstractRestResponse;
-import org.elasticsearch.rest.RestStatus;
-
-/**
- * @author tlrx
- *
- */
-public class ZMQRestResponse extends AbstractRestResponse {
-
- private final RestStatus status;
-
- public ByteBuffer body;
-
- private String contentType;
-
- public ZMQRestResponse(RestStatus status) {
- super();
- this.status = status;
- }
-
- @Override
- public String contentType() {
- return contentType;
- }
-
- public ZMQRestResponse setBody(ByteBuffer body) {
- this.body = body;
- return this;
- }
-
- @Override
- public byte[] content() throws IOException {
- if (body == null) {
- return Bytes.EMPTY_ARRAY;
- }
- return body.array();
- }
-
- @Override
- public int contentLength() throws IOException {
- if (body == null) {
- return 0;
- }
- return body.remaining();
- }
-
- @Override
- public RestStatus status() {
- return status;
- }
-
- @Override
- public boolean contentThreadSafe() {
- return false;
- }
-
- public void setContentType(String contentType) {
- this.contentType = contentType;
- }
-
- /**
- * @return the payload to reply to the client
- * @throws IOException
- */
- public byte[] payload() {
-
- // TODO optimise & challenge thoses lines...
- ByteBuffer bStatusCode = ByteBuffer.wrap(Integer.toString(this.status.getStatus()).getBytes());
- ByteBuffer bStatusName = ByteBuffer.wrap(this.status.name().getBytes());
- ByteBuffer bSep1 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes());
- ByteBuffer bSep2 = ByteBuffer.wrap(ZMQSocket.SEPARATOR.getBytes());
- ByteBuffer bContent = null;
-
- try {
- bContent = ByteBuffer.wrap(content());
- } catch (Exception e) {
- bContent = ByteBuffer.wrap(e.getMessage().getBytes());
- }
-
- ByteBuffer payload = ByteBuffer.allocate(bStatusCode.limit() + bSep1.limit() + bStatusName.limit() + bSep2.limit() + bContent.limit());
- payload.put(bStatusCode);
- payload.put(bSep1);
- payload.put(bStatusName);
- payload.put(bSep2);
- payload.put(bContent);
-
- return payload.array();
- }
-}
@@ -19,36 +19,30 @@
package org.elasticsearch.zeromq;
+import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
+
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.service.NodeService;
-
-import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
* @author tlrx
*/
public class ZMQServer extends AbstractLifecycleComponent<ZMQServer> {
- private final NodeService nodeService;
-
private volatile ZMQServerTransport transport;
@Inject
- public ZMQServer(Settings settings, NodeService nodeService,
- ZMQServerTransport transport, ZMQRestImpl client) {
-
+ public ZMQServer(Settings settings, ZMQServerTransport transport, ZMQRestImpl client) {
super(settings);
this.transport = transport;
- this.nodeService = nodeService;
}
@Override
protected void doStart() throws ElasticSearchException {
- logger.debug("Starting ØMQ server...");
+ logger.debug("Starting Zeromq server...");
daemonThreadFactory(settings, "zeromq_server").newThread(
new Runnable() {
@Override
@@ -62,8 +56,7 @@ public void run() {
@Override
protected void doStop() throws ElasticSearchException {
- nodeService.removeAttribute("zeromq_address");
- transport.stop();
+ transport.stop();
}
@Override
@@ -21,23 +21,20 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.zeromq.impl.ZMQQueueServerImpl;
+import org.elasticsearch.zeromq.impl.ZMQPullServerImpl;
/**
* @author tlrx
*/
public class ZMQServerModule extends AbstractModule {
- private final Settings settings;
-
public ZMQServerModule(Settings settings) {
- this.settings = settings;
}
@Override
protected void configure() {
bind(ZMQRestImpl.class).asEagerSingleton();
- bind(ZMQServerTransport.class).to(ZMQQueueServerImpl.class).asEagerSingleton();
+ bind(ZMQServerTransport.class).to(ZMQPullServerImpl.class).asEagerSingleton();
bind(ZMQServer.class).asEagerSingleton();
}
}
@@ -4,13 +4,11 @@
package org.elasticsearch.zeromq;
import org.elasticsearch.common.component.LifecycleComponent;
-import org.elasticsearch.common.transport.BoundTransportAddress;
/**
* @author tlrx
*
*/
public interface ZMQServerTransport extends LifecycleComponent<ZMQServerTransport> {
- BoundTransportAddress boundAddress();
}
Oops, something went wrong.

0 comments on commit 703e537

Please sign in to comment.