Permalink
Browse files

add netty transport

  • Loading branch information...
1 parent 617994b commit 3028967e9f33a874726f6c6cf9f77bf867e0de82 @OSluchyk committed Mar 23, 2012
View
92 .idea/workspace.xml
@@ -2,8 +2,14 @@
<project version="4">
<component name="ChangeListManager">
<list default="true" id="67c94154-fa56-435f-8855-ad3baa3d7bdc" name="Default" comment="">
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/README" />
+ <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/src/main/java/demo/client/netty/ConnectOk.java" />
+ <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/src/main/java/demo/client/netty/HttpClientPipelineFactory.java" />
+ <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/src/main/java/demo/client/netty/HttpResponseHandler.java" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/.idea/workspace.xml" afterPath="$PROJECT_DIR$/.idea/workspace.xml" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/HttpClientBenchmarks.iml" afterPath="$PROJECT_DIR$/HttpClientBenchmarks.iml" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/pom.xml" afterPath="$PROJECT_DIR$/pom.xml" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/java/demo/client/AbstractHttpClient.java" afterPath="$PROJECT_DIR$/src/main/java/demo/client/AbstractHttpClient.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java" afterPath="$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java" />
</list>
<ignored path="HttpClientBenchmarks.iws" />
<ignored path=".idea/workspace.xml" />
@@ -144,16 +150,45 @@
</provider>
</entry>
</file>
- <file leaf-file-name="NettyHttpClient.java" pinned="false" current="false" current-in-tab="false">
+ <file leaf-file-name="NettyHttpClient.java" pinned="false" current="true" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="16" column="0" selection-start="317" selection-end="317" vertical-scroll-proportion="0.0">
+ <state line="52" column="53" selection-start="2155" selection-end="2155" vertical-scroll-proportion="0.9182692">
+ <folding>
+ <element signature="imports" expanded="true" />
+ </folding>
+ </state>
+ </provider>
+ </entry>
+ </file>
+ <file leaf-file-name="HttpResponseHandler.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/HttpResponseHandler.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="15" column="13" selection-start="579" selection-end="579" vertical-scroll-proportion="0.0">
+ <folding />
+ </state>
+ </provider>
+ </entry>
+ </file>
+ <file leaf-file-name="ConnectOk.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/ConnectOk.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="7" column="13" selection-start="237" selection-end="237" vertical-scroll-proportion="0.0">
+ <folding />
+ </state>
+ </provider>
+ </entry>
+ </file>
+ <file leaf-file-name="HttpClientPipelineFactory.java" pinned="false" current="false" current-in-tab="false">
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/HttpClientPipelineFactory.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="31" column="3" selection-start="1296" selection-end="1296" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
</file>
- <file leaf-file-name="README" pinned="false" current="true" current-in-tab="true">
+ <file leaf-file-name="README" pinned="false" current="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/README">
<provider selected="true" editor-type-id="text-editor">
<state line="0" column="0" selection-start="0" selection-end="0" vertical-scroll-proportion="0.0">
@@ -198,11 +233,14 @@
<option value="$PROJECT_DIR$/src/main/resources/log4j.xml" />
<option value="$PROJECT_DIR$/src/main/resources/properties.xml" />
<option value="$PROJECT_DIR$/pom.xml" />
- <option value="$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java" />
<option value="$PROJECT_DIR$/src/main/java/demo/client/gfacade/NetHttpClient.java" />
<option value="$PROJECT_DIR$/src/main/java/demo/client/gfacade/GFacadeHttpClient.java" />
<option value="$PROJECT_DIR$/src/main/java/demo/client/gfacade/ApacheHttpClient.java" />
<option value="$PROJECT_DIR$/src/main/java/demo/client/AbstractHttpClient.java" />
+ <option value="$PROJECT_DIR$/src/main/java/demo/client/netty/HttpClientPipelineFactory.java" />
+ <option value="$PROJECT_DIR$/src/main/java/demo/client/netty/ConnectOk.java" />
+ <option value="$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java" />
+ <option value="$PROJECT_DIR$/src/main/java/demo/client/netty/HttpResponseHandler.java" />
</list>
</option>
</component>
@@ -400,6 +438,9 @@
<property name="options.splitter.details.proportions" value="0.2" />
</component>
<component name="RecentsManager">
+ <key name="CopyClassDialog.RECENTS_KEY">
+ <recent name="demo.client.netty" />
+ </key>
<key name="ExtractSuperBase.RECENT_KEYS">
<recent name="demo.client.gfacade" />
</key>
@@ -617,7 +658,7 @@
<option name="INCLUDE_TEXT_INTO_SHELF" value="false" />
<option name="CREATE_PATCH_EXPAND_DETAILS_DEFAULT" value="true" />
<option name="FORCE_NON_EMPTY_COMMENT" value="false" />
- <option name="LAST_COMMIT_MESSAGE" value="add gfacade transport" />
+ <option name="LAST_COMMIT_MESSAGE" value="add README" />
<option name="MAKE_NEW_CHANGELIST_ACTIVE" value="true" />
<option name="OPTIMIZE_IMPORTS_BEFORE_PROJECT_COMMIT" value="false" />
<option name="CHECK_FILES_UP_TO_DATE_BEFORE_COMMIT" value="false" />
@@ -632,6 +673,7 @@
<option name="FILE_HISTORY_SPLITTER_PROPORTION" value="0.6" />
<MESSAGE value="initial commit" />
<MESSAGE value="add gfacade transport" />
+ <MESSAGE value="add README" />
</component>
<component name="XDebuggerManager">
<breakpoint-manager />
@@ -641,13 +683,6 @@
<option name="FILTER_TARGETS" value="false" />
</component>
<component name="editorHistoryManager">
- <entry file="file://$PROJECT_DIR$/src/main/resources/log4j.xml">
- <provider selected="true" editor-type-id="text-editor">
- <state line="33" column="32" selection-start="1361" selection-end="1373" vertical-scroll-proportion="-16.115385">
- <folding />
- </state>
- </provider>
- </entry>
<entry file="file://$PROJECT_DIR$/src/main/resources/properties.xml">
<provider selected="true" editor-type-id="text-editor">
<state line="5" column="40" selection-start="218" selection-end="218" vertical-scroll-proportion="0.0">
@@ -727,20 +762,43 @@
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java">
+ <entry file="file://$PROJECT_DIR$/README">
<provider selected="true" editor-type-id="text-editor">
- <state line="16" column="0" selection-start="317" selection-end="317" vertical-scroll-proportion="0.0">
+ <state line="0" column="0" selection-start="0" selection-end="0" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
- <entry file="file://$PROJECT_DIR$/README">
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/HttpClientPipelineFactory.java">
<provider selected="true" editor-type-id="text-editor">
- <state line="0" column="0" selection-start="0" selection-end="0" vertical-scroll-proportion="0.0">
+ <state line="31" column="3" selection-start="1296" selection-end="1296" vertical-scroll-proportion="0.0">
<folding />
</state>
</provider>
</entry>
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/ConnectOk.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="7" column="13" selection-start="237" selection-end="237" vertical-scroll-proportion="0.0">
+ <folding />
+ </state>
+ </provider>
+ </entry>
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/HttpResponseHandler.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="15" column="13" selection-start="579" selection-end="579" vertical-scroll-proportion="0.0">
+ <folding />
+ </state>
+ </provider>
+ </entry>
+ <entry file="file://$PROJECT_DIR$/src/main/java/demo/client/netty/NettyHttpClient.java">
+ <provider selected="true" editor-type-id="text-editor">
+ <state line="52" column="53" selection-start="2155" selection-end="2155" vertical-scroll-proportion="0.9182692">
+ <folding>
+ <element signature="imports" expanded="true" />
+ </folding>
+ </state>
+ </provider>
+ </entry>
</component>
<component name="masterDetails">
<states>
View
1 HttpClientBenchmarks.iml
@@ -37,6 +37,7 @@
<orderEntry type="library" name="Maven: com.google.guava:guava:11.0.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.4" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.2.0" level="project" />
+ <orderEntry type="library" name="Maven: org.jboss.netty:netty:3.2.7.Final" level="project" />
</component>
</module>
View
5 pom.xml
@@ -48,6 +48,11 @@
<artifactId>google-http-client</artifactId>
<version>1.7.0-beta</version>
</dependency>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.7.Final</version>
+ </dependency>
</dependencies>
View
2 src/main/java/demo/client/AbstractHttpClient.java
@@ -21,5 +21,5 @@ public void run(final String uri, final byte[] content) throws IOException {
watch.stop("POST");
}
- protected abstract void execute(final String uri, final byte[] content) throws IOException;
+ protected abstract void execute(final String uri, final byte[] content) throws Exception;
}
View
24 src/main/java/demo/client/netty/ConnectOk.java
@@ -0,0 +1,24 @@
+package demo.client.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+public class ConnectOk implements ChannelFutureListener {
+ private HttpRequest request = null;
+
+ ConnectOk(HttpRequest req) {
+ this.request = req;
+ }
+
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ future.getCause().printStackTrace();
+ return;
+ }
+ Channel channel = future.getChannel();
+ channel.write(request);
+// channel.close();
+ }
+}
View
33 src/main/java/demo/client/netty/HttpClientPipelineFactory.java
@@ -0,0 +1,33 @@
+package demo.client.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+
+import java.util.concurrent.TimeUnit;
+
+public class HttpClientPipelineFactory implements ChannelPipelineFactory {
+ final int connectionTimeout;
+ private static Timer timer = new HashedWheelTimer();
+
+ public HttpClientPipelineFactory(final int connectionTimeout) {
+ this.connectionTimeout=connectionTimeout;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline(new ReadTimeoutHandler(timer,connectionTimeout, TimeUnit.MILLISECONDS ));
+ pipeline.addLast("decoder", new HttpResponseDecoder());
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ pipeline.addLast("encoder", new HttpRequestEncoder());
+ return pipeline;
+ }
+
+
+}
View
57 src/main/java/demo/client/netty/HttpResponseHandler.java
@@ -0,0 +1,57 @@
+package demo.client.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+
+
+@ChannelHandler.Sharable
+public class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+ private boolean messageWasReceived=false;
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+// if(messageWasReceived)return;
+ if (e.getCause() instanceof ReadTimeoutException) {
+ System.err.println("TIMEOUT "+ctx.getChannel().getRemoteAddress());
+ }
+ e.getCause().printStackTrace();
+ Channel ch = e.getChannel();
+ ch.close();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e){
+ HttpResponse response = (HttpResponse) e.getMessage();
+ System.out.println("STATUS: " + response.getStatus());
+ System.out.println("VERSION: " + response.getProtocolVersion());
+ System.out.println();
+
+ if (!response.getHeaderNames().isEmpty()) {
+ for (String name: response.getHeaderNames()) {
+ for (String value: response.getHeaders(name)) {
+ System.out.println("HEADER: " + name + " = " + value);
+ }
+ }
+ System.out.println();
+ }
+ ChannelBuffer content = response.getContent();
+ if (content.readable()) {
+ System.out.println("CONTENT {");
+ System.out.println("len : " + HttpHeaders.getContentLength(response) );
+ System.out.println("content : "+content.toString("UTF-8"));
+ System.out.println("} END OF CONTENT");
+ }
+ messageWasReceived=true;
+ Channel ch = e.getChannel();
+ ch.close();
+ }
+}
+
View
91 src/main/java/demo/client/netty/NettyHttpClient.java
@@ -1,16 +1,101 @@
package demo.client.netty;
import demo.client.AbstractHttpClient;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.CookieEncoder;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+public class NettyHttpClient extends AbstractHttpClient {
+ private final static String TAG_NAME = "Netty";
+ private ClientBootstrap bootstrap;
+ private final int connectionTimeout = 200;
+ private static final int defaultConnectionTimeout = 200;
+
+ ChannelGroup allChannels = null;
-public class NettyHttpClient extends AbstractHttpClient{
- private final static String TAG_NAME ="Netty";
public NettyHttpClient() {
super(TAG_NAME);
+ bootstrap = new ClientBootstrap(
+ new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ bootstrap.setPipelineFactory(new HttpClientPipelineFactory(connectionTimeout));
+ bootstrap.setOption("tcpNoDelay", true);
+ Map<String, Object> options = bootstrap.getOptions();
+ for (Map.Entry<String, Object> entry : options.entrySet()) {
+ System.out.println(entry.getKey() + " : " + entry.getValue());
+ }
+ allChannels = new DefaultChannelGroup();
}
@Override
- public void execute(String uri, byte[] content) {
+ public void execute(String uri, byte[] content) throws Exception {
+ ChannelBuffer buffer = ChannelBuffers.copiedBuffer(content);
+ retrieve("POST", uri, buffer, null);
+ }
+
+ public ChannelPipeline retrieve(String method, String url, ChannelBuffer data, Map<String, String> cookie) throws Exception {
+ if (url == null) throw new Exception("url is null");
+ URI uri = new URI(url);
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ String host = uri.getHost() == null ? "localhost" : uri.getHost();
+
+
+ if (!scheme.equals("http")) {
+ throw new Exception("just support http protocol");
+ }
+
+ HttpRequest request = new DefaultHttpRequest(
+ HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toASCIIString());
+ request.setHeader(HttpHeaders.Names.HOST, host);
+ request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ request.setContent(data);
+
+
+ if (cookie != null) {
+ CookieEncoder httpCookieEncoder = new CookieEncoder(false);
+ for (Map.Entry<String, String> m : cookie.entrySet()) {
+ httpCookieEncoder.addCookie(m.getKey(), m.getValue());
+ request.setHeader(HttpHeaders.Names.COOKIE, httpCookieEncoder.encode());
+ }
+ }
+ return retrieve(request);
+
+ }
+
+ public ChannelPipeline retrieve(HttpRequest request) throws Exception {
+ URI uri = new URI(request.getUri());
+ int port = uri.getPort() == -1 ? 80 : uri.getPort();
+ ChannelFuture future = bootstrap.connect(new InetSocketAddress(request.getHeader(HttpHeaders.Names.HOST), port));
+ future.addListener(new ConnectOk(request));
+ Channel channel = future.getChannel();
+ channel.getConfig().setConnectTimeoutMillis(connectionTimeout);
+ allChannels.add(channel);
+ return channel.getPipeline();
+ }
+
+ public void close() {
+ allChannels.close().awaitUninterruptibly();
+ bootstrap.releaseExternalResources();
}
+
}

0 comments on commit 3028967

Please sign in to comment.