Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Initial spike of a netty integration module.

  • Loading branch information...
commit bc41ebc6f123a5b6382ee3255a0f6539300c0863 1 parent 4c05fed
Hiram Chirino authored January 15, 2013
94  hawtdispatch-netty/pom.xml
... ...
@@ -0,0 +1,94 @@
  1
+<?xml version="1.0" encoding="UTF-8"?>
  2
+<!--
  3
+
  4
+    Copyright (C) 2012 FuseSource, Inc.
  5
+    http://fusesource.com
  6
+
  7
+    Licensed under the Apache License, Version 2.0 (the "License");
  8
+    you may not use this file except in compliance with the License.
  9
+    You may obtain a copy of the License at
  10
+
  11
+       http://www.apache.org/licenses/LICENSE-2.0
  12
+
  13
+    Unless required by applicable law or agreed to in writing, software
  14
+    distributed under the License is distributed on an "AS IS" BASIS,
  15
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16
+    See the License for the specific language governing permissions and
  17
+    limitations under the License.
  18
+
  19
+-->
  20
+
  21
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  22
+
  23
+  <modelVersion>4.0.0</modelVersion>
  24
+
  25
+  <parent>
  26
+    <groupId>org.fusesource.hawtdispatch</groupId>
  27
+    <artifactId>hawtdispatch-project</artifactId>
  28
+    <version>1.14-SNAPSHOT</version>
  29
+  </parent>
  30
+    
  31
+  <groupId>org.fusesource.hawtdispatch</groupId>
  32
+  <artifactId>hawtdispatch-netty</artifactId>
  33
+  <version>1.14-SNAPSHOT</version>
  34
+  <packaging>bundle</packaging>
  35
+
  36
+  <description>HawtDispatch Transport: Transport abstractions for HawtDispatch</description>
  37
+
  38
+  <properties>
  39
+    <junit-version>4.7</junit-version>
  40
+    <asm-version>3.1</asm-version>
  41
+    <log4j-version>1.2.14</log4j-version>
  42
+    <osgi-version>4.2.0</osgi-version>
  43
+  </properties>
  44
+
  45
+  <dependencies>
  46
+
  47
+    <dependency>
  48
+      <groupId>org.fusesource.hawtdispatch</groupId>
  49
+      <artifactId>hawtdispatch</artifactId>
  50
+      <version>1.14-SNAPSHOT</version>
  51
+    </dependency>
  52
+    <dependency>
  53
+      <groupId>io.netty</groupId>
  54
+      <artifactId>netty-transport</artifactId>
  55
+      <version>4.0.0.Beta1-SNAPSHOT</version>
  56
+    </dependency>
  57
+
  58
+    <dependency>
  59
+      <groupId>junit</groupId>
  60
+      <artifactId>junit</artifactId>
  61
+      <version>${junit-version}</version>
  62
+      <scope>test</scope>
  63
+    </dependency>
  64
+    <dependency>
  65
+      <groupId>log4j</groupId>
  66
+      <artifactId>log4j</artifactId>
  67
+      <version>${log4j-version}</version>
  68
+      <scope>test</scope>
  69
+    </dependency>
  70
+  </dependencies>
  71
+  
  72
+  <build>
  73
+    <plugins>
  74
+      <plugin>
  75
+        <groupId>org.apache.maven.plugins</groupId>
  76
+        <artifactId>maven-compiler-plugin</artifactId>
  77
+        <configuration>
  78
+          <source>1.6</source>
  79
+          <target>1.6</target>
  80
+        </configuration>
  81
+      </plugin>
  82
+      <plugin>
  83
+        <groupId>org.apache.felix</groupId>
  84
+        <artifactId>maven-bundle-plugin</artifactId>
  85
+        <extensions>true</extensions>
  86
+        <inherited>true</inherited>
  87
+        <configuration>
  88
+          <instructions>
  89
+          </instructions>
  90
+        </configuration>
  91
+      </plugin>
  92
+    </plugins>
  93
+  </build>
  94
+</project>
114  hawtdispatch-netty/src/main/java/org/fusesource/hawtdispatch/netty/HawtAbstractChannel.java
... ...
@@ -0,0 +1,114 @@
  1
+/*
  2
+ * Copyright 2012 The Netty Project
  3
+ * Copyright 2013 Red Hat, Inc.
  4
+ *
  5
+ * The Netty Project licenses this file to you under the Apache License,
  6
+ * version 2.0 (the "License"); you may not use this file except in compliance
  7
+ * with the License. You may obtain a copy of the License at:
  8
+ *
  9
+ *   http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14
+ * License for the specific language governing permissions and limitations
  15
+ * under the License.
  16
+ */
  17
+package org.fusesource.hawtdispatch.netty;
  18
+
  19
+import io.netty.channel.AbstractChannel;
  20
+import io.netty.channel.Channel;
  21
+import io.netty.channel.ChannelPromise;
  22
+import io.netty.channel.EventLoop;
  23
+
  24
+import java.net.ConnectException;
  25
+import java.net.InetSocketAddress;
  26
+import java.net.SocketAddress;
  27
+import java.nio.channels.SocketChannel;
  28
+import java.util.concurrent.ScheduledFuture;
  29
+import java.util.concurrent.TimeUnit;
  30
+
  31
+/**
  32
+ * Abstract base class for {@link io.netty.channel.Channel} implementations that use
  33
+ * HawtDispatch.
  34
+ */
  35
+abstract class HawtAbstractChannel extends AbstractChannel {
  36
+
  37
+    protected volatile java.nio.channels.Channel ch;
  38
+
  39
+    /**
  40
+     * The future of the current connection attempt.  If not null, subsequent
  41
+     * connection attempts will fail.
  42
+     */
  43
+    protected ScheduledFuture<?> connectTimeoutFuture;
  44
+    private ConnectException connectTimeoutException;
  45
+
  46
+    /**
  47
+     * Creates a new instance.
  48
+     *
  49
+     * @param id
  50
+     *        the unique non-negative integer ID of this channel.
  51
+     *        Specify {@code null} to auto-generate a unique negative integer
  52
+     *        ID.
  53
+     * @param parent
  54
+     *        the parent of this channel. {@code null} if there's no parent.
  55
+     * @param ch
  56
+     *        the {@link SocketChannel} which will handle the IO or {@code null} if not created yet.
  57
+     */
  58
+    protected HawtAbstractChannel(Channel parent, Integer id, java.nio.channels.Channel ch) {
  59
+        super(parent, id);
  60
+        this.ch = ch;
  61
+    }
  62
+
  63
+    @Override
  64
+    public InetSocketAddress localAddress() {
  65
+        return (InetSocketAddress) super.localAddress();
  66
+    }
  67
+
  68
+    @Override
  69
+    public InetSocketAddress remoteAddress() {
  70
+        return (InetSocketAddress) super.remoteAddress();
  71
+    }
  72
+
  73
+    /**
  74
+     * Return the underlying {@link SocketChannel}. Be aware this should only be called after it was set as
  75
+     * otherwise it will throw an {@link IllegalStateException}.
  76
+     */
  77
+    protected java.nio.channels.Channel javaChannel() {
  78
+        if (ch == null) {
  79
+            throw new IllegalStateException("Try to access Channel before eventLoop was registered");
  80
+        }
  81
+        return ch;
  82
+    }
  83
+
  84
+    @Override
  85
+    public boolean isOpen() {
  86
+        if (ch == null) {
  87
+            return true;
  88
+        }
  89
+        return ch.isOpen();
  90
+    }
  91
+
  92
+    @Override
  93
+    protected boolean isCompatible(EventLoop loop) {
  94
+        return loop instanceof HawtEventLoop;
  95
+    }
  96
+
  97
+    @Override
  98
+    protected AbstractUnsafe newUnsafe() {
  99
+        return new HawtAbstractUnsafe();
  100
+    }
  101
+
  102
+    /**
  103
+     * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise.
  104
+     */
  105
+    protected abstract void doConnect(SocketAddress remoteAddress,
  106
+            SocketAddress localAddress, ChannelPromise connectPromise);
  107
+
  108
+    class HawtAbstractUnsafe extends AbstractUnsafe {
  109
+        @Override
  110
+        public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  111
+            doConnect(remoteAddress, localAddress, promise);
  112
+        }
  113
+    }
  114
+}
192  hawtdispatch-netty/src/main/java/org/fusesource/hawtdispatch/netty/HawtEventLoop.java
... ...
@@ -0,0 +1,192 @@
  1
+/*
  2
+ * Copyright 2012 The Netty Project
  3
+ * Copyright 2013 Red Hat, Inc.
  4
+ *
  5
+ * The Netty Project licenses this file to you under the Apache License,
  6
+ * version 2.0 (the "License"); you may not use this file except in compliance
  7
+ * with the License. You may obtain a copy of the License at:
  8
+ *
  9
+ *   http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14
+ * License for the specific language governing permissions and limitations
  15
+ * under the License.
  16
+ */
  17
+package org.fusesource.hawtdispatch.netty;
  18
+
  19
+import io.netty.channel.*;
  20
+import org.fusesource.hawtdispatch.DispatchQueue;
  21
+
  22
+import java.util.*;
  23
+import java.util.concurrent.*;
  24
+
  25
+/**
  26
+ * {@link io.netty.channel.EventLoop} implementations which will
  27
+ * handle HawtDispatch based {@link io.netty.channel.Channel}s.
  28
+ */
  29
+final class HawtEventLoop extends AbstractExecutorService implements EventLoop {
  30
+
  31
+    EventLoopGroup parent;
  32
+    DispatchQueue queue;
  33
+
  34
+    @Override
  35
+    public EventLoopGroup parent() {
  36
+        return parent;
  37
+    }
  38
+
  39
+    @Override
  40
+    public boolean inEventLoop() {
  41
+        return queue.isExecuting();
  42
+    }
  43
+
  44
+    @Override
  45
+    public EventLoop next() {
  46
+        return this;
  47
+    }
  48
+
  49
+    boolean  shutdown = false;
  50
+    @Override
  51
+    public void shutdown() {
  52
+        shutdown = true;
  53
+    }
  54
+
  55
+    @Override
  56
+    public List<Runnable> shutdownNow() {
  57
+        shutdown = true;
  58
+        return Collections.emptyList();
  59
+    }
  60
+
  61
+    @Override
  62
+    public boolean isShutdown() {
  63
+        return shutdown;
  64
+    }
  65
+
  66
+    @Override
  67
+    public boolean isTerminated() {
  68
+        return shutdown;
  69
+    }
  70
+
  71
+    @Override
  72
+    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
  73
+        return shutdown;
  74
+    }
  75
+
  76
+    @Override
  77
+    public <V> ScheduledFuture<V> schedule(Callable<V> vCallable, long delay, TimeUnit timeUnit) {
  78
+        return new ScheduledFutureTask(vCallable, timeUnit.toNanos(delay)).schedule();
  79
+    }
  80
+
  81
+    @Override
  82
+    public ScheduledFuture<?> schedule(final Runnable runnable, long delay, TimeUnit timeUnit) {
  83
+        return new ScheduledFutureTask(runnable, timeUnit.toNanos(delay)).schedule();
  84
+    }
  85
+
  86
+    @Override
  87
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long delay, long period, TimeUnit timeUnit) {
  88
+        return new ScheduledFutureTask(runnable, timeUnit.toNanos(delay), timeUnit.toNanos(period)).schedule();
  89
+    }
  90
+
  91
+    @Override
  92
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
  93
+        throw new UnsupportedOperationException();
  94
+    }
  95
+
  96
+    private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
  97
+
  98
+        private long deadlineNanos;
  99
+        private long periodNanos;
  100
+
  101
+        ScheduledFutureTask(Runnable runnable,long nanoTime) {
  102
+            super(runnable, null);
  103
+            deadlineNanos = nanoTime;
  104
+            periodNanos = 0;
  105
+        }
  106
+
  107
+        ScheduledFutureTask(Runnable runnable, long nanoTime, long period) {
  108
+            super(runnable, null);
  109
+            if (period == 0) {
  110
+                throw new IllegalArgumentException("period: 0 (expected: != 0)");
  111
+            }
  112
+            deadlineNanos = nanoTime;
  113
+            periodNanos = period;
  114
+        }
  115
+
  116
+        ScheduledFutureTask(Callable<V> callable, long nanoTime) {
  117
+            super(callable);
  118
+            deadlineNanos = nanoTime;
  119
+            periodNanos = 0;
  120
+        }
  121
+
  122
+        public long delayNanos() {
  123
+            return Math.max(0, deadlineNanos - System.nanoTime());
  124
+        }
  125
+
  126
+        @Override
  127
+        public long getDelay(TimeUnit unit) {
  128
+            return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
  129
+        }
  130
+
  131
+        @Override
  132
+        public int compareTo(Delayed o) {
  133
+            if (this == o) {
  134
+                return 0;
  135
+            }
  136
+            ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
  137
+            long d = deadlineNanos - that.deadlineNanos;
  138
+            if (d < 0) {
  139
+                return -1;
  140
+            } else if (d > 0) {
  141
+                return 1;
  142
+            } else {
  143
+                return 1;
  144
+            }
  145
+        }
  146
+
  147
+        @Override
  148
+        public void run() {
  149
+            if (periodNanos == 0) {
  150
+                super.run();
  151
+            } else {
  152
+                boolean reset = runAndReset();
  153
+                if (reset && !isShutdown()) {
  154
+                    long p = periodNanos;
  155
+                    if (p > 0) {
  156
+                        deadlineNanos += p;
  157
+                    } else {
  158
+                        deadlineNanos = System.nanoTime() - p;
  159
+                    }
  160
+                    schedule();
  161
+                }
  162
+            }
  163
+        }
  164
+
  165
+        public ScheduledFuture<V> schedule() {
  166
+            queue.executeAfter(delayNanos(), TimeUnit.NANOSECONDS, this);
  167
+            return this;
  168
+        }
  169
+    }
  170
+
  171
+
  172
+    @Override
  173
+    public void execute(Runnable runnable) {
  174
+        queue.execute(runnable);
  175
+    }
  176
+
  177
+    @Override
  178
+    public boolean inEventLoop(Thread thread) {
  179
+        throw new UnsupportedOperationException();
  180
+    }
  181
+
  182
+    @Override
  183
+    public ChannelFuture register(Channel channel) {
  184
+        throw new UnsupportedOperationException();
  185
+    }
  186
+
  187
+    @Override
  188
+    public ChannelFuture register(Channel channel, ChannelPromise promise) {
  189
+        throw new UnsupportedOperationException();
  190
+    }
  191
+
  192
+}
53  hawtdispatch-netty/src/main/java/org/fusesource/hawtdispatch/netty/HawtEventLoopGroup.java
... ...
@@ -0,0 +1,53 @@
  1
+/*
  2
+ * Copyright 2012 The Netty Project
  3
+ * Copyright 2013 Red Hat, Inc.
  4
+ *
  5
+ * The Netty Project licenses this file to you under the Apache License,
  6
+ * version 2.0 (the "License"); you may not use this file except in compliance
  7
+ * with the License. You may obtain a copy of the License at:
  8
+ *
  9
+ *   http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14
+ * License for the specific language governing permissions and limitations
  15
+ * under the License.
  16
+ */
  17
+package org.fusesource.hawtdispatch.netty;
  18
+
  19
+import io.netty.channel.*;
  20
+import org.fusesource.hawtdispatch.Dispatch;
  21
+import org.fusesource.hawtdispatch.DispatchQueue;
  22
+
  23
+/**
  24
+ * {@link HawtEventLoopGroup} implementation which will handle
  25
+ * AIO {@link io.netty.channel.Channel} implementations.
  26
+ *
  27
+ */
  28
+public class HawtEventLoopGroup extends DefaultEventExecutorGroup {
  29
+
  30
+    DispatchQueue dispatchQueue;
  31
+
  32
+    /**
  33
+     *
  34
+     */
  35
+    public HawtEventLoopGroup() {
  36
+        this(Dispatch.getGlobalQueue());
  37
+    }
  38
+
  39
+    /**
  40
+     */
  41
+    public HawtEventLoopGroup(DispatchQueue queue) {
  42
+        super(1);
  43
+        this.dispatchQueue = queue;
  44
+    }
  45
+
  46
+    public DispatchQueue getDispatchQueue() {
  47
+        return dispatchQueue;
  48
+    }
  49
+
  50
+    public void setDispatchQueue(DispatchQueue dispatchQueue) {
  51
+        this.dispatchQueue = dispatchQueue;
  52
+    }
  53
+}
164  hawtdispatch-netty/src/main/java/org/fusesource/hawtdispatch/netty/HawtServerSocketChannel.java
... ...
@@ -0,0 +1,164 @@
  1
+/*
  2
+ * Copyright 2012 The Netty Project
  3
+ * Copyright 2013 Red Hat, Inc.
  4
+ *
  5
+ * The Netty Project licenses this file to you under the Apache License,
  6
+ * version 2.0 (the "License"); you may not use this file except in compliance
  7
+ * with the License. You may obtain a copy of the License at:
  8
+ *
  9
+ *   http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14
+ * License for the specific language governing permissions and limitations
  15
+ * under the License.
  16
+ */
  17
+package org.fusesource.hawtdispatch.netty;
  18
+
  19
+import io.netty.buffer.BufType;
  20
+import io.netty.channel.ChannelException;
  21
+import io.netty.channel.ChannelMetadata;
  22
+import io.netty.channel.ChannelPromise;
  23
+import io.netty.channel.socket.DefaultServerSocketChannelConfig;
  24
+import io.netty.channel.socket.ServerSocketChannel;
  25
+import io.netty.channel.socket.ServerSocketChannelConfig;
  26
+import io.netty.logging.InternalLogger;
  27
+import io.netty.logging.InternalLoggerFactory;
  28
+import org.fusesource.hawtdispatch.*;
  29
+import static org.fusesource.hawtdispatch.Dispatch.*;
  30
+import static java.nio.channels.SelectionKey.*;
  31
+
  32
+import java.io.IOException;
  33
+import java.net.SocketAddress;
  34
+import java.nio.channels.AsynchronousCloseException;
  35
+
  36
+/**
  37
+ * {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses HawtDispatch.
  38
+ *
  39
+ * NIO2 is only supported on Java 7+.
  40
+ */
  41
+public class HawtServerSocketChannel extends HawtAbstractChannel implements ServerSocketChannel {
  42
+
  43
+    private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.MESSAGE, false);
  44
+    private static final InternalLogger logger =
  45
+            InternalLoggerFactory.getInstance(HawtServerSocketChannel.class);
  46
+
  47
+    private boolean closed;
  48
+
  49
+    private static java.nio.channels.ServerSocketChannel newSocket() {
  50
+        try {
  51
+            return java.nio.channels.ServerSocketChannel.open();
  52
+        } catch (IOException e) {
  53
+            throw new ChannelException(
  54
+                    "Failed to open a server socket.", e);
  55
+        }
  56
+    }
  57
+
  58
+    private final ServerSocketChannelConfig config;
  59
+    private final DispatchQueue queue;
  60
+
  61
+    /**
  62
+     * Create a new instance
  63
+     */
  64
+    public HawtServerSocketChannel() {
  65
+        super(null, null, newSocket());
  66
+        queue = createQueue();
  67
+        config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());
  68
+    }
  69
+
  70
+    @Override
  71
+    protected java.nio.channels.ServerSocketChannel javaChannel() {
  72
+        return (java.nio.channels.ServerSocketChannel) super.javaChannel();
  73
+    }
  74
+
  75
+    @Override
  76
+    public boolean isActive() {
  77
+        return ch != null && javaChannel().isOpen() && localAddress0() != null;
  78
+    }
  79
+
  80
+    @Override
  81
+    public ChannelMetadata metadata() {
  82
+        return METADATA;
  83
+    }
  84
+
  85
+    @Override
  86
+    protected SocketAddress localAddress0() {
  87
+        return javaChannel().socket().getLocalSocketAddress();
  88
+    }
  89
+
  90
+    @Override
  91
+    protected SocketAddress remoteAddress0() {
  92
+        return null;
  93
+    }
  94
+
  95
+    @Override
  96
+    protected void doBind(SocketAddress localAddress) throws Exception {
  97
+        javaChannel().socket().bind(localAddress, config.getBacklog());
  98
+    }
  99
+
  100
+    DispatchSource acceptSource;
  101
+
  102
+    @Override
  103
+    protected void doBeginRead() {
  104
+        if( acceptSource==null ) {
  105
+            acceptSource = createSource(javaChannel(), OP_READ, queue);
  106
+            acceptSource.setEventHandler(new Task(){
  107
+              public void run() {
  108
+                  HawtSocketChannel socket = null;
  109
+                  try {
  110
+                      socket = new HawtSocketChannel(HawtServerSocketChannel.this, null, javaChannel().accept());
  111
+                  } catch (IOException e) {
  112
+                      if (isOpen()) {
  113
+                          logger.warn("Failed to create a new channel from an accepted socket.", e);
  114
+                      }
  115
+                  }
  116
+                  pipeline().inboundMessageBuffer().add(socket);
  117
+                  pipeline().fireInboundBufferUpdated();
  118
+                  pipeline().fireInboundBufferSuspended();
  119
+              }
  120
+            });
  121
+            acceptSource.setCancelHandler(new Task(){
  122
+                @Override
  123
+                public void run() {
  124
+                    pipeline().fireChannelUnregistered();
  125
+                    try {
  126
+                        javaChannel().close();
  127
+                    } catch (IOException e) {
  128
+                    }
  129
+                }
  130
+            });
  131
+            acceptSource.resume();
  132
+            pipeline().fireChannelRegistered();
  133
+        }
  134
+    }
  135
+
  136
+    @Override
  137
+    protected void doClose() throws Exception {
  138
+        if (!closed) {
  139
+            closed = true;
  140
+            acceptSource.cancel();
  141
+        }
  142
+    }
  143
+
  144
+    @Override
  145
+    protected boolean isFlushPending() {
  146
+        return false;
  147
+    }
  148
+
  149
+    @Override
  150
+    protected void doConnect(
  151
+            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  152
+        promise.setFailure(new UnsupportedOperationException());
  153
+    }
  154
+
  155
+    @Override
  156
+    protected void doDisconnect() throws Exception {
  157
+        throw new UnsupportedOperationException();
  158
+    }
  159
+
  160
+    @Override
  161
+    public ServerSocketChannelConfig config() {
  162
+        return config;
  163
+    }
  164
+}
437  hawtdispatch-netty/src/main/java/org/fusesource/hawtdispatch/netty/HawtSocketChannel.java
... ...
@@ -0,0 +1,437 @@
  1
+/*
  2
+ * Copyright 2012 The Netty Project
  3
+ * Copyright 2013 Red Hat, Inc.
  4
+ *
  5
+ * The Netty Project licenses this file to you under the Apache License,
  6
+ * version 2.0 (the "License"); you may not use this file except in compliance
  7
+ * with the License. You may obtain a copy of the License at:
  8
+ *
  9
+ *   http://www.apache.org/licenses/LICENSE-2.0
  10
+ *
  11
+ * Unless required by applicable law or agreed to in writing, software
  12
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14
+ * License for the specific language governing permissions and limitations
  15
+ * under the License.
  16
+ */
  17
+package org.fusesource.hawtdispatch.netty;
  18
+
  19
+import io.netty.buffer.BufType;
  20
+import io.netty.buffer.ByteBuf;
  21
+import io.netty.channel.*;
  22
+import io.netty.channel.socket.ChannelInputShutdownEvent;
  23
+import io.netty.channel.socket.DefaultSocketChannelConfig;
  24
+import io.netty.channel.socket.ServerSocketChannel;
  25
+import io.netty.channel.socket.SocketChannel;
  26
+import io.netty.logging.InternalLogger;
  27
+import io.netty.logging.InternalLoggerFactory;
  28
+import org.fusesource.hawtdispatch.Dispatch;
  29
+import org.fusesource.hawtdispatch.DispatchQueue;
  30
+import org.fusesource.hawtdispatch.DispatchSource;
  31
+import org.fusesource.hawtdispatch.Task;
  32
+
  33
+import java.io.IOException;
  34
+import java.net.InetSocketAddress;
  35
+import java.net.SocketAddress;
  36
+import java.util.concurrent.TimeUnit;
  37
+
  38
+import static java.nio.channels.SelectionKey.*;
  39
+import static org.fusesource.hawtdispatch.Dispatch.createQueue;
  40
+
  41
+
  42
+/**
  43
+ * {@link io.netty.channel.socket.SocketChannel} implementation which uses HawtDispatch.
  44
+ */
  45
+public class HawtSocketChannel extends HawtAbstractChannel implements SocketChannel {
  46
+
  47
+    private static final InternalLogger logger = InternalLoggerFactory.getInstance(HawtSocketChannel.class);
  48
+    private static final ChannelMetadata METADATA = new ChannelMetadata(BufType.BYTE, false);
  49
+
  50
+    private final DefaultSocketChannelConfig config;
  51
+    private volatile boolean inputShutdown;
  52
+    private volatile boolean outputShutdown;
  53
+
  54
+    private boolean readInProgress;
  55
+    private boolean inDoBeginRead;
  56
+    private boolean readAgain;
  57
+    private boolean writeInProgress;
  58
+    private boolean inDoFlushByteBuffer;
  59
+
  60
+    private final DispatchQueue queue;
  61
+
  62
+    private static java.nio.channels.SocketChannel newSocket() {
  63
+        try {
  64
+            return java.nio.channels.SocketChannel.open();
  65
+        } catch (IOException e) {
  66
+            throw new ChannelException("Failed to open a socket.", e);
  67
+        }
  68
+    }
  69
+
  70
+    /**
  71
+     * Create a new instance
  72
+     */
  73
+    public HawtSocketChannel() {
  74
+        this(newSocket());
  75
+    }
  76
+
  77
+    /**
  78
+     * Create a new instance using the given {@link java.nio.channels.SocketChannel}.
  79
+     */
  80
+    public HawtSocketChannel(java.nio.channels.SocketChannel socket) {
  81
+        this(null, null, socket);
  82
+    }
  83
+
  84
+    /**
  85
+     * Create a new instance
  86
+     *
  87
+     * @param parent the {@link io.netty.channel.Channel} which created this instance or {@code null} if it was created by the user
  88
+     * @param id     the id to use for this instance or {@code null} if a new one should be generated
  89
+     * @param socket the {@link java.nio.channels.SocketChannel} which will be used
  90
+     */
  91
+    public HawtSocketChannel(HawtServerSocketChannel parent, Integer id, java.nio.channels.SocketChannel socket) {
  92
+        super(parent, id, socket);
  93
+        try {
  94
+            socket.configureBlocking(false);
  95
+        } catch (IOException e) {
  96
+            try {
  97
+                socket.close();
  98
+            } catch (IOException e2) {
  99
+                if (logger.isWarnEnabled()) {
  100
+                    logger.warn(
  101
+                            "Failed to close a partially initialized socket.", e2);
  102
+                }
  103
+            }
  104
+
  105
+            throw new ChannelException("Failed to enter non-blocking mode.", e);
  106
+        }
  107
+        config = new DefaultSocketChannelConfig(this, socket.socket());
  108
+        queue = createQueue();
  109
+    }
  110
+
  111
+    @Override
  112
+    public ServerSocketChannel parent() {
  113
+        return (ServerSocketChannel) super.parent();
  114
+    }
  115
+
  116
+    @Override
  117
+    public DefaultSocketChannelConfig config() {
  118
+        return config;
  119
+    }
  120
+
  121
+    @Override
  122
+    public boolean isActive() {
  123
+        return ch != null && javaChannel().isOpen() && remoteAddress0() != null;
  124
+    }
  125
+
  126
+    @Override
  127
+    protected java.nio.channels.SocketChannel javaChannel() {
  128
+        return (java.nio.channels.SocketChannel) super.javaChannel();
  129
+    }
  130
+
  131
+    @Override
  132
+    public ChannelMetadata metadata() {
  133
+        return METADATA;
  134
+    }
  135
+
  136
+    @Override
  137
+    public boolean isInputShutdown() {
  138
+        return inputShutdown;
  139
+    }
  140
+
  141
+    /**
  142
+     * Shutdown the input of this {@link Channel}.
  143
+     */
  144
+    void setInputShutdown() {
  145
+        inputShutdown = true;
  146
+    }
  147
+
  148
+    @Override
  149
+    public boolean isOutputShutdown() {
  150
+        return outputShutdown;
  151
+    }
  152
+
  153
+    @Override
  154
+    public ChannelFuture shutdownOutput() {
  155
+        return shutdownOutput(newPromise());
  156
+    }
  157
+
  158
+    @Override
  159
+    public ChannelFuture shutdownOutput(final ChannelPromise promise) {
  160
+        EventLoop loop = eventLoop();
  161
+        if (loop.inEventLoop()) {
  162
+            try {
  163
+                javaChannel().socket().shutdownOutput();
  164
+                outputShutdown = true;
  165
+                promise.setSuccess();
  166
+            } catch (Throwable t) {
  167
+                promise.setFailure(t);
  168
+            }
  169
+        } else {
  170
+            loop.execute(new Runnable() {
  171
+                @Override
  172
+                public void run() {
  173
+                    shutdownOutput(promise);
  174
+                }
  175
+            });
  176
+        }
  177
+        return promise;
  178
+    }
  179
+
  180
+
  181
+    protected DispatchSource createSource(int OP) {
  182
+        return Dispatch.createSource(javaChannel(), OP, queue);
  183
+    }
  184
+
  185
+
  186
+    @Override
  187
+    protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress, final ChannelPromise promise) {
  188
+        assert promise != null;
  189
+
  190
+        if (localAddress != null) {
  191
+            try {
  192
+                javaChannel().socket().bind(localAddress);
  193
+            } catch (IOException e) {
  194
+                promise.setFailure(e);
  195
+                return;
  196
+            }
  197
+        }
  198
+
  199
+        // Hook into the CONNECT event..
  200
+        final DispatchSource connectSource = createSource(OP_CONNECT);
  201
+
  202
+        // Executed when the event source is canceled.
  203
+        connectSource.setCancelHandler(new Task() {
  204
+            public void run() {
  205
+                if (!promise.isDone()) {
  206
+                    promise.setFailure(new IOException("Event connectSource canceled."));
  207
+                }
  208
+            }
  209
+        });
  210
+
  211
+        // This gets triggered when the socket is connected..
  212
+        connectSource.setEventHandler(new Task() {
  213
+            public void run() {
  214
+                if (!promise.isDone() && javaChannel().isConnected()) {
  215
+                    try {
  216
+                        boolean wasActive = isActive();
  217
+                        promise.setSuccess();
  218
+                        if (!wasActive && isActive()) {
  219
+                            pipeline().fireChannelActive();
  220
+                        }
  221
+                    } catch (Throwable t) {
  222
+                        promise.setFailure(t);
  223
+                        pipeline().fireExceptionCaught(t);
  224
+                    } finally {
  225
+                        connectSource.cancel();
  226
+                    }
  227
+                }
  228
+            }
  229
+        });
  230
+
  231
+        // Lets fail if we don't connect after a timeout.
  232
+        queue.executeAfter(config().getConnectTimeoutMillis(), TimeUnit.MILLISECONDS, new Task() {
  233
+            @Override
  234
+            public void run() {
  235
+                if (!promise.isDone()) {
  236
+                    IOException t = new IOException("Connect timeout");
  237
+                    promise.setFailure(t);
  238
+                    pipeline().fireExceptionCaught(t);
  239
+                    connectSource.cancel();
  240
+                }
  241
+            }
  242
+        });
  243
+
  244
+        // enable the delivery of the connect events.
  245
+        connectSource.resume();
  246
+    }
  247
+
  248
+    @Override
  249
+    protected InetSocketAddress localAddress0() {
  250
+        if (ch == null) {
  251
+            return null;
  252
+        }
  253
+        return (InetSocketAddress) javaChannel().socket().getLocalSocketAddress();
  254
+    }
  255
+
  256
+    @Override
  257
+    protected InetSocketAddress remoteAddress0() {
  258
+        if (ch == null) {
  259
+            return null;
  260
+        }
  261
+        return (InetSocketAddress) javaChannel().socket().getRemoteSocketAddress();
  262
+    }
  263
+
  264
+    @Override
  265
+    protected void doBind(SocketAddress localAddress) throws Exception {
  266
+        javaChannel().socket().bind(localAddress);
  267
+    }
  268
+
  269
+    @Override
  270
+    protected void doDisconnect() throws Exception {
  271
+        doClose();
  272
+    }
  273
+
  274
+    @Override
  275
+    protected void doClose() throws Exception {
  276
+        javaChannel().close();
  277
+        inputShutdown = true;
  278
+        outputShutdown = true;
  279
+    }
  280
+
  281
+    @Override
  282
+    protected boolean isFlushPending() {
  283
+        return false;
  284
+    }
  285
+
  286
+
  287
+    @Override
  288
+    protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
  289
+        if (!buf.readable()) {
  290
+            // Reset reader/writerIndex to 0 if the buffer is empty.
  291
+            buf.clear();
  292
+            return;
  293
+        }
  294
+
  295
+        for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
  296
+            int localFlushedAmount = doWriteBytes(buf, i == 0);
  297
+            if (localFlushedAmount > 0) {
  298
+                break;
  299
+            }
  300
+            if (!buf.readable()) {
  301
+                // Reset reader/writerIndex to 0 if the buffer is empty.
  302
+                buf.clear();
  303
+                break;
  304
+            }
  305
+        }
  306
+    }
  307
+
  308
+    DispatchSource writeSource;
  309
+
  310
+    protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception {
  311
+        final int expectedWrittenBytes = buf.readableBytes();
  312
+        final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
  313
+
  314
+        if (writtenBytes >= expectedWrittenBytes) {
  315
+            // Wrote the outbound buffer completely - clear OP_WRITE.
  316
+            if (writeSource != null) {
  317
+                writeSource.suspend();
  318
+            }
  319
+        } else {
  320
+            // Wrote something or nothing.
  321
+            // a) If wrote something, the caller will not retry.
  322
+            //    - Set OP_WRITE so that the event loop calls flushForcibly() later.
  323
+            // b) If wrote nothing:
  324
+            //    1) If 'lastSpin' is false, the caller will call this method again real soon.
  325
+            //       - Do not update OP_WRITE.
  326
+            //    2) If 'lastSpin' is true, the caller will not retry.
  327
+            //       - Set OP_WRITE so that the event loop calls flushForcibly() later.
  328
+            if (writtenBytes > 0 || lastSpin) {
  329
+                // Lazily create the write source..
  330
+                if (writeSource == null) {
  331
+                    writeSource = createSource(OP_WRITE);
  332
+                    writeSource.setEventHandler(new Task() {
  333
+                        @Override
  334
+                        public void run() {
  335
+                            unsafe().flushNow();
  336
+                        }
  337
+                    });
  338
+                }
  339
+                writeSource.resume();
  340
+
  341
+            }
  342
+        }
  343
+
  344
+        return writtenBytes;
  345
+    }
  346
+
  347
+    DispatchSource readSource;