Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: eventmachine/eventmachine
...
head fork: gnufied/eventmachine
Checking mergeability… Don’t worry, you can still create the pull request.
  • 8 commits
  • 18 files changed
  • 0 commit comments
  • 1 contributor
View
3  .gitignore
@@ -10,4 +10,5 @@ Makefile
*.o
*.log
*.def
-*.pdb
+*.pdb
+java/src/.project
View
200 java/src/com/rubyeventmachine/Application.java
@@ -1,200 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-/**
- *
- */
-package com.rubyeventmachine;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.*;
-import java.util.*;
-import java.io.*;
-import java.net.*;
-import java.net.SocketAddress;
-
-/**
- * @author francis
- *
- */
-public class Application {
-
-
- public class Reactor extends EmReactor {
-
- private Application application;
- private TreeMap<String, Timer> timers;
- private TreeMap<String, Connection> connections;
- private TreeMap<String, ConnectionFactory> acceptors;
- /**
- *
- */
- public Reactor (Application app) {
- application = app;
- timers = new TreeMap<String, Timer>();
- connections = new TreeMap<String, Connection>();
- acceptors = new TreeMap<String, ConnectionFactory>();
- }
-
-
- public void eventCallback (String sig, int eventType, ByteBuffer data) {
- if (eventType == EM_TIMER_FIRED) {
- String timersig = new String (data.array());
- //System.out.println ("EVSIG "+sig + "..."+new String(data.array()));
- Timer r = timers.remove(timersig);
- if (r != null)
- r._fire();
- else
- throw new RuntimeException ("unable to run unknown timer");
- }
- else if (eventType == EM_CONNECTION_COMPLETED) {
- Connection c = connections.get(sig);
- if (c != null) {
- c.connectionCompleted();
- }
- else
- throw new RuntimeException ("connection completed to unknown object");
-
- }
- else if (eventType == EM_CONNECTION_UNBOUND) {
- Connection c = connections.get(sig);
- if (c != null) {
- c.unbind();
- }
- else
- throw new RuntimeException ("unbind received on unknown object");
- }
- else if (eventType == EM_CONNECTION_ACCEPTED) {
- ConnectionFactory f = acceptors.get(sig);
- if (f != null) {
- Connection c = f.connection();
- c.signature = new String (data.array());
- c.application = application;
- connections.put(c.signature, c);
- c.postInit();
- //System.out.println (sig+"..."+new String(data.array()));
- }
- else
- throw new RuntimeException ("received connection on unknown acceptor");
- }
- else if (eventType == EM_CONNECTION_READ) {
- Connection c = connections.get(sig);
- if (c != null) {
- c.receiveData(data);
- }
- else throw new RuntimeException ("received data on unknown object");
- }
- else {
- System.out.println ("unknown event type: " + eventType);
- }
- }
- }
-
-
- Reactor reactor;
-
- public Application() {
- reactor = new Reactor (this);
- }
- public void addTimer (double seconds, Timer t) {
- t.application = this;
- t.interval = seconds;
- String s = reactor.installOneshotTimer ((int)(seconds * 1000));
- reactor.timers.put(s, t);
-
- }
-
- public void bindConnect (String bindAddr, int bindPort, String host, int port, Connection c) {
- try {
- String s = reactor.connectTcpServer(bindAddr, bindPort, host, port);
- c.application = this;
- c.signature = s;
- reactor.connections.put(s, c);
- c.postInit();
- } catch (ClosedChannelException e) {}
- }
-
- public void connect (String host, int port, Connection c) {
- bindConnect(null, 0, host, port, c);
- }
-
- public void startServer (SocketAddress sa, ConnectionFactory f) throws EmReactorException {
- String s = reactor.startTcpServer(sa);
- reactor.acceptors.put(s, f);
- }
-
- public void stop() {
- reactor.stop();
- }
- public void run() {
- try {
- reactor.run();
- } catch (IOException e) {}
- }
- public void run (final Runnable r) {
- addTimer(0, new Timer() {
- public void fire() {
- r.run();
- }
- });
- run();
- }
-
- public void sendData (String sig, ByteBuffer bb) {
- try {
- reactor.sendData(sig, bb);
- } catch (IOException e) {}
- }
-
- public void sendDatagram (String sig, ByteBuffer bb, InetSocketAddress target) {
- reactor.sendDatagram(sig, bb, target.getHostName(), target.getPort());
- }
-
- public void closeConnection (String sig, boolean afterWriting) {
- try {
- reactor.closeConnection(sig, afterWriting);
- } catch (ClosedChannelException e) {}
- }
-
- public void openDatagramSocket (Connection c) {
- openDatagramSocket (new InetSocketAddress ("0.0.0.0", 0), c);
- }
- public void openDatagramSocket (InetSocketAddress addr, Connection c) {
- try {
- String s = reactor.openUdpSocket(addr);
- c.application = this;
- c.signature = s;
- reactor.connections.put(s, c);
- c.postInit();
- } catch (ClosedChannelException e) {
- } catch (IOException e) {
- System.out.println ("Bad Datagram socket "+e+" "+addr);
- /* TODO, can't catch this here, because it can happen on a bad address */
- }
- }
-}
View
9 java/src/com/rubyeventmachine/ClientDisconnectException.java
@@ -0,0 +1,9 @@
+package com.rubyeventmachine;
+
+public class ClientDisconnectException extends RuntimeException {
+ private static final long serialVersionUID = -318337588650290552L;
+ public String message;
+ public ClientDisconnectException(String _message) {
+ message = _message;
+ }
+}
View
74 java/src/com/rubyeventmachine/Connection.java
@@ -1,74 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-
-package com.rubyeventmachine;
-
-//import java.io.*;
-import java.nio.*;
-import java.net.*;
-//import java.nio.channels.*;
-
-public class Connection {
-
- public Application application;
- public String signature;
-
- public void postInit() {}
- public void connectionCompleted() {}
- public void unbind() {}
- public void receiveData (ByteBuffer bytebuffer) {}
-
-
- /**
- * Called by user code.
- * @param bytebuffer
- */
- public void sendData (ByteBuffer b) {
- application.sendData(signature, b);
- }
-
- /**
- * This is called by user code.
- * TODO: don't expose the exception here.
- */
- public void close() {
- application.closeConnection(signature, false);
- }
- /**
- * This is called by user code/
- */
- public void closeAfterWriting() {
- application.closeConnection(signature, true);
- }
-
- public void sendDatagram (ByteBuffer bb, InetSocketAddress addr) {
- application.sendDatagram (signature, bb, addr);
- }
-}
View
37 java/src/com/rubyeventmachine/ConnectionFactory.java
@@ -1,37 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-
-package com.rubyeventmachine;
-
-//import com.rubyeventmachine.*;
-
-public interface ConnectionFactory {
- public Connection connection();
-}
View
46 java/src/com/rubyeventmachine/DefaultConnectionFactory.java
@@ -1,46 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-
-package com.rubyeventmachine;
-
-import com.rubyeventmachine.ConnectionFactory;
-
-public class DefaultConnectionFactory implements ConnectionFactory {
-
- /**
- * Convenience class. Its connection() method returns an instance of class
- * Connection, which is usually overridden. This class is probably most
- * useful for unit testing.
- */
- public Connection connection() {
- return new Connection();
- }
-
-}
View
252 java/src/com/rubyeventmachine/EmReactor.java
@@ -50,7 +50,8 @@
private TreeMap<Long, String> Timers;
private TreeMap<String, EventableChannel> Connections;
private TreeMap<String, ServerSocketChannel> Acceptors;
-
+ private TreeMap<Long, Set<String>> interestedInInactivity;
+
private boolean bRunReactor;
private long BindingIndex;
private ByteBuffer EmptyByteBuffer;
@@ -62,6 +63,7 @@ public EmReactor() {
Timers = new TreeMap<Long, String>();
Connections = new TreeMap<String, EventableChannel>();
Acceptors = new TreeMap<String, ServerSocketChannel>();
+ interestedInInactivity = new TreeMap<Long, Set<String>>();
BindingIndex = 100000;
EmptyByteBuffer = ByteBuffer.allocate(0);
@@ -70,20 +72,7 @@ public EmReactor() {
myReadBuffer = ByteBuffer.allocate(32*1024); // don't use a direct buffer. Ruby doesn't seem to like them.
timerQuantum = 98;
}
-
- /**
- * Intended to be overridden in languages (like Ruby) that can't handle ByteBuffer. This is a stub.
- * Obsolete now that I figured out how to make Ruby deal with ByteBuffers.
- * @param sig
- * @param eventType
- * @param data
- */
- /*
- public void stringEventCallback (String sig, int eventType, String data) {
- System.out.println ("Default event callback: " + sig + " " + eventType + " " + data);
- }
- */
-
+
/**
* This is a no-op stub, intended to be overridden in user code.
* @param sig
@@ -108,6 +97,9 @@ public void run() throws IOException {
if (!bRunReactor) break;
runTimers();
if (!bRunReactor) break;
+ checkForInactivity();
+ if (!bRunReactor) break;
+
mySelector.select(timerQuantum);
Iterator<SelectionKey> it = mySelector.selectedKeys().iterator();
@@ -116,92 +108,124 @@ public void run() throws IOException {
it.remove();
try {
- if (k.isAcceptable()) {
- ServerSocketChannel ss = (ServerSocketChannel) k.channel();
- SocketChannel sn;
- while ((sn = ss.accept()) != null) {
- sn.configureBlocking(false);
- String b = createBinding();
- EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector);
- Connections.put(b, ec);
- eventCallback ((String)k.attachment(), EM_CONNECTION_ACCEPTED, ByteBuffer.wrap(b.getBytes()));
- }
- }
-
- if (k.isReadable()) {
- EventableChannel ec = (EventableChannel)k.attachment();
- myReadBuffer.clear();
- ec.readInboundData (myReadBuffer);
- myReadBuffer.flip();
- String b = ec.getBinding();
- if (myReadBuffer.limit() > 0) {
- eventCallback (b, EM_CONNECTION_READ, myReadBuffer);
- }
- else {
- eventCallback (b, EM_CONNECTION_UNBOUND, EmptyByteBuffer);
- Connections.remove(b);
- k.channel().close();
- }
- /*
- System.out.println ("READABLE");
- SocketChannel sn = (SocketChannel) k.channel();
- //ByteBuffer bb = ByteBuffer.allocate(16 * 1024);
- // Obviously not thread-safe, since we're using the same buffer for every connection.
- // This should minimize the production of garbage, though.
- // TODO, we need somehow to make a call to the EventableChannel, so we can pass the
- // inbound data through an SSLEngine. Hope that won't break the strategy of using one
- // global read-buffer.
- myReadBuffer.clear();
- int r = sn.read(myReadBuffer);
- if (r > 0) {
- myReadBuffer.flip();
- //bb = ((EventableChannel)k.attachment()).dispatchInboundData (bb);
- eventCallback (((EventableChannel)k.attachment()).getBinding(), EM_CONNECTION_READ, myReadBuffer);
- }
- else {
- // TODO. Figure out if a socket that selects readable can ever return 0 bytes
- // without it being indicative of an error condition. If Java is like C, the answer is no.
- String b = ((EventableChannel)k.attachment()).getBinding();
- eventCallback (b, EM_CONNECTION_UNBOUND, EmptyByteBuffer);
- Connections.remove(b);
- sn.close();
- }
- */
- }
-
-
- if (k.isWritable()) {
- EventableChannel ec = (EventableChannel)k.attachment();
- if (!ec.writeOutboundData()) {
- eventCallback (ec.getBinding(), EM_CONNECTION_UNBOUND, EmptyByteBuffer);
- Connections.remove (ec.getBinding());
- k.channel().close();
- }
- }
-
- if (k.isConnectable()) {
- EventableSocketChannel ec = (EventableSocketChannel)k.attachment();
- if (ec.finishConnecting()) {
- eventCallback (ec.getBinding(), EM_CONNECTION_COMPLETED, EmptyByteBuffer);
- }
- else {
- Connections.remove (ec.getBinding());
- k.channel().close();
- eventCallback (ec.getBinding(), EM_CONNECTION_UNBOUND, EmptyByteBuffer);
- }
- }
- }
+ if(k.isAcceptable()) acceptClient(k);
+ else if(k.isReadable()) readData(k);
+ else if(k.isWritable()) handleWrite(k);
+ else if(k.isConnectable()) completeExternalConnection(k);
+ }
catch (CancelledKeyException e) {
- // No-op. We can come here if a read-handler closes a socket before we fall through
- // to call isWritable.
+ e.printStackTrace();
}
-
+ catch (IOException ex){
+ ex.printStackTrace();
+ }
}
}
close();
}
+ public void handleWrite(SelectionKey k) throws IOException {
+ EventableChannel ec = (EventableChannel) k.attachment();
+ ec.updateActivityTimeStamp();
+ try {
+ ec.writeOutboundData();
+ } catch (ClientDisconnectException ex) {
+ cleanupConnection(ec);
+ }
+ }
+
+ public void readData(SelectionKey k) throws IOException {
+ EventableChannel ec = (EventableChannel) k.attachment();
+ myReadBuffer.clear();
+ try {
+ ec.readInboundData(myReadBuffer);
+ myReadBuffer.flip();
+ ec.updateActivityTimeStamp();
+ eventCallback(ec.getBinding(), EM_CONNECTION_READ, myReadBuffer);
+ } catch (ClientDisconnectException e) {
+ System.out.println("There was an exception here");
+ cleanupConnection(ec);
+ }
+ }
+
+ public void acceptClient(SelectionKey k) throws IOException {
+ ServerSocketChannel ss = (ServerSocketChannel) k.channel();
+ SocketChannel sn;
+ while ((sn = ss.accept()) != null) {
+ sn.configureBlocking(false);
+ String b = createBinding();
+ EventableSocketChannel ec = new EventableSocketChannel(sn, b,
+ mySelector, SelectionKey.OP_READ);
+ Connections.put(b, ec);
+ eventCallback((String) k.attachment(), EM_CONNECTION_ACCEPTED,
+ ByteBuffer.wrap(b.getBytes()));
+ }
+ }
+
+ public void completeExternalConnection(SelectionKey k) throws IOException {
+ EventableSocketChannel ec = (EventableSocketChannel) k.attachment();
+ if (ec.finishConnecting()) {
+ eventCallback(ec.getBinding(), EM_CONNECTION_COMPLETED,
+ EmptyByteBuffer);
+ } else {
+ Connections.remove(ec.getBinding());
+ k.channel().close();
+ eventCallback(ec.getBinding(), EM_CONNECTION_UNBOUND,
+ EmptyByteBuffer);
+ }
+ }
+
+ public void cleanupConnection(EventableChannel ec) {
+ eventCallback(ec.getBinding(), EM_CONNECTION_UNBOUND, EmptyByteBuffer);
+ Connections.remove(ec.getBinding());
+ long inactivitykey = ec.currentInActivityKey();
+ Set<String> toRemove = interestedInInactivity.get(inactivitykey);
+ if (toRemove != null) {
+ toRemove.remove(ec.getBinding());
+ interestedInInactivity.put(inactivitykey, toRemove);
+ }
+ }
+
+ public void checkForInactivity() {
+ long now = System.currentTimeMillis();
+ Set<String> toAddback = new HashSet<String>();
+
+ while (!interestedInInactivity.isEmpty()) {
+ long k = interestedInInactivity.firstKey();
+ if (k > now) {
+ break;
+ }
+ Set<String> bindingStringSet = interestedInInactivity.remove(k);
+ Iterator<String> iterator = bindingStringSet.iterator();
+
+ while (iterator.hasNext()) {
+ String key = iterator.next();
+ EventableChannel t = Connections.get(key);
+ if (t != null) {
+ if (t.isInactive()){
+ System.out.println("Schedule a close on the connection");
+ t.scheduleClose(true);
+ }
+ else
+ toAddback.add(t.getBinding());
+ }
+ }
+ }
+ if (!toAddback.isEmpty())
+ addToInactivityCheck(toAddback);
+ }
+
+ public void addToInactivityCheck(Set<String> sids) {
+ Iterator<String> iterator = sids.iterator();
+ while (iterator.hasNext()) {
+ String sid = iterator.next();
+ EventableChannel t = Connections.get(sid);
+ setCommInactivityTimeout(t);
+ }
+ }
+
+
void close() throws IOException {
mySelector.close();
mySelector = null;
@@ -307,14 +331,48 @@ public String openUdpSocket (InetSocketAddress address) throws IOException {
}
public void sendData (String sig, ByteBuffer bb) throws IOException {
- (Connections.get(sig)).scheduleOutboundData( bb );
+ try {
+ EventableChannel ec = Connections.get(sig);
+ if(ec != null) ec.scheduleOutboundData(bb);
+ } catch(ClientDisconnectException ex){
+ System.out.println("Error while writing data to socket");
+ cleanupConnection(Connections.get(sig));
+ }
}
+
public void sendData (String sig, byte[] data) throws IOException {
sendData (sig, ByteBuffer.wrap(data));
//(Connections.get(sig)).scheduleOutboundData( ByteBuffer.wrap(data.getBytes()));
}
- public void setCommInactivityTimeout (String sig, long mills) {
- (Connections.get(sig)).setCommInactivityTimeout (mills);
+ public void setCommInactivityTimeout(EventableChannel t) {
+ long timeout = t.getLastActivity() + t.getInActivityPeriod() * 1000;
+ Set<String> oldSet = interestedInInactivity.get(timeout);
+ String sig = t.getBinding();
+
+ if (oldSet == null) {
+ oldSet = new HashSet<String>();
+ oldSet.add(sig);
+ } else
+ oldSet.add(sig);
+
+ interestedInInactivity.put(timeout, oldSet);
+ t.setInactivityKey(timeout);
+ }
+
+ public void setCommInactivityTimeout(String sig, int seconds) {
+ long timeout = System.currentTimeMillis() + seconds * 1000;
+ Set<String> oldSet = interestedInInactivity.get(timeout);
+
+ if (oldSet == null) {
+ oldSet = new HashSet<String>();
+ oldSet.add(sig);
+ } else {
+ oldSet.add(sig);
+ }
+ interestedInInactivity.put(timeout, oldSet);
+ EventableChannel conn = Connections.get(sig);
+ conn.setCommInactivityTimeout(seconds);
+ conn.setInactivityKey(timeout);
}
/**
@@ -369,7 +427,7 @@ public String connectTcpServer (String bindAddr, int bindPort, String address, i
if (bindAddr != null)
sc.socket().bind(new InetSocketAddress (bindAddr, bindPort));
- EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);
+ EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector,0);
if (sc.connect (new InetSocketAddress (address, port))) {
// Connection returned immediately. Can happen with localhost connections.
View
12 java/src/com/rubyeventmachine/EventableChannel.java
@@ -53,5 +53,15 @@
public boolean writeOutboundData();
- public void setCommInactivityTimeout (long seconds);
+ public boolean isInactive();
+
+ public void setCommInactivityTimeout (long seconds);
+
+ public void updateActivityTimeStamp();
+
+ public long getLastActivity();
+ public long getInActivityPeriod();
+
+ public long currentInActivityKey();
+ public void setInactivityKey(long millis);
}
View
38 java/src/com/rubyeventmachine/EventableDatagramChannel.java
@@ -37,6 +37,7 @@
import java.util.LinkedList;
import java.io.*;
import java.net.*;
+import java.util.Date;
public class EventableDatagramChannel implements EventableChannel {
@@ -55,6 +56,9 @@ public Packet (ByteBuffer _bb, SocketAddress _recipient) {
boolean bCloseScheduled;
LinkedList<Packet> outboundQ;
SocketAddress returnAddress;
+ long inactivityPeriod;
+ long lastActivity;
+ long inActivityKey;
public EventableDatagramChannel (DatagramChannel dc, String _binding, Selector sel) throws ClosedChannelException {
@@ -63,6 +67,7 @@ public EventableDatagramChannel (DatagramChannel dc, String _binding, Selector s
selector = sel;
bCloseScheduled = false;
outboundQ = new LinkedList<Packet>();
+ lastActivity = (new Date()).getTime();
dc.register(selector, SelectionKey.OP_READ, this);
}
@@ -163,9 +168,36 @@ public boolean writeOutboundData() {
// the outbound queue.
return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
}
+ public void setCommInactivityTimeout(long seconds) {
+ inactivityPeriod = seconds;
+ }
+
+ public void updateActivityTimeStamp() {
+ lastActivity = new Date().getTime();
+ }
+
+ public boolean isInactive() {
+ long now = new Date().getTime();
+ if ((now - lastActivity) / 1000 > inactivityPeriod) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public long getLastActivity() {
+ return lastActivity;
+ }
+
+ public long getInActivityPeriod() {
+ return inactivityPeriod;
+ }
+
+ public void setInactivityKey(long mills) {
+ inActivityKey = mills;
+ }
- public void setCommInactivityTimeout (long seconds) {
- // TODO
- System.out.println ("DATAGRAM: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
+ public long currentInActivityKey() {
+ return inActivityKey;
}
}
View
187 java/src/com/rubyeventmachine/EventableSocketChannel.java
@@ -51,25 +51,35 @@
SocketChannel channel;
String binding;
Selector selector;
- LinkedList<ByteBuffer> outboundQ;
+ ArrayDeque<ByteBuffer> outboundQ;
boolean bCloseScheduled;
+ boolean connectionStatus = false;
+
+ long inactivityPeriod;
+ long lastActivity;
+ long inActivityKey;
+ boolean writeScheduled;
+
boolean bConnectPending;
SSLEngine sslEngine;
-
-
SSLContext sslContext;
-
-
- public EventableSocketChannel (SocketChannel sc, String _binding, Selector sel) throws ClosedChannelException {
+
+ SelectionKey channelKey;
+ public EventableSocketChannel(SocketChannel sc, String _binding,
+ Selector sel, int ops) throws ClosedChannelException {
+ writeScheduled = false;
channel = sc;
binding = _binding;
selector = sel;
bCloseScheduled = false;
bConnectPending = false;
- outboundQ = new LinkedList<ByteBuffer>();
-
- sc.register(selector, SelectionKey.OP_READ, this);
+ outboundQ = new ArrayDeque<ByteBuffer>();
+ lastActivity = new Date().getTime();
+ if (ops != 0) {
+ connectionStatus = true;
+ channelKey = sc.register(selector, ops, this);
+ }
}
public String getBinding() {
@@ -87,27 +97,20 @@ public void close() {
}
}
- public void scheduleOutboundData (ByteBuffer bb) {
- try {
- if ((!bCloseScheduled) && (bb.remaining() > 0)) {
- if (sslEngine != null) {
- ByteBuffer b = ByteBuffer.allocate(32*1024); // TODO, preallocate this buffer.
- sslEngine.wrap(bb, b);
- b.flip();
- outboundQ.addLast(b);
- }
- else {
- outboundQ.addLast(bb);
- }
- channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ | (bConnectPending ? SelectionKey.OP_CONNECT : 0), this);
- }
- } catch (ClosedChannelException e) {
- throw new RuntimeException ("no outbound data");
- } catch (SSLException e) {
- throw new RuntimeException ("no outbound data");
+ public void scheduleOutboundData(ByteBuffer bb) {
+ try {
+ if(sslEngine != null) {
+ ByteBuffer b= ByteBuffer.allocate(32*1024);
+ sslEngine.wrap(bb,b);
+ b.flip();
+ outboundQ.offer(b);
+ } else outboundQ.offer(bb);
+ } catch(SSLException e) {
+ throw new RuntimeException("Error while writing to ssl channel");
}
+ writeOutboundData();
}
-
+
public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) {
throw new RuntimeException ("datagram sends not supported on this channel");
}
@@ -115,13 +118,32 @@ public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int re
/**
* Called by the reactor when we have selected readable.
*/
- public void readInboundData (ByteBuffer bb) {
+ public void readInboundData(ByteBuffer bb){
+ int dataLength = 0;
try {
- channel.read(bb);
+ dataLength = channel.read(bb);
} catch (IOException e) {
- throw new RuntimeException ("i/o error");
+ System.out.println("I am in readInboundData");
+ signalConnectionFail();
+ }
+
+ if (dataLength == -1){
+ System.out.println("I am throwing from data length");
+ signalConnectionFail();
}
}
+
+ public void signalConnectionFail() {
+ System.out.println("Cancel the key from taking part in polling");
+ connectionStatus = false;
+ channelKey.cancel();
+ try {
+ channel.close();
+ } catch(IOException ex) {}
+
+ throw new ClientDisconnectException("Client closed the connection");
+ }
+
/**
* Called by the reactor when we have selected writable.
* Return false to indicate an error that should cause the connection to close.
@@ -137,37 +159,47 @@ public void readInboundData (ByteBuffer bb) {
*/
public boolean writeOutboundData(){
while (!outboundQ.isEmpty()) {
- ByteBuffer b = outboundQ.getFirst();
+ ByteBuffer b = outboundQ.peek();
+ int n = 0;
try {
- if (b.remaining() > 0)
- channel.write(b);
- }
- catch (IOException e) {
- return false;
+ if (b.hasRemaining())
+ n = channel.write(b);
+ } catch (IOException e) {
+ signalConnectionFail();
+ //return false;
}
-
// Did we consume the whole outbound buffer? If yes,
// pop it off and keep looping. If no, the outbound network
// buffers are full, so break out of here.
- if (b.remaining() == 0)
- outboundQ.removeFirst();
- else
- break;
+ if (!b.hasRemaining()) outboundQ.poll();
+ else break;
}
- if (outboundQ.isEmpty()) {
- try {
- channel.register(selector, SelectionKey.OP_READ, this);
- } catch (ClosedChannelException e) {
- }
+ if (outboundQ.isEmpty()){
+ cancelWrite();
}
-
+ else if (!outboundQ.isEmpty() && !writeScheduled) {
+ writeScheduled = true;
+ channelKey.interestOps(SelectionKey.OP_WRITE);
+ }
+
// ALWAYS drain the outbound queue before triggering a connection close.
- // If anyone wants to close immediately, they're responsible for clearing
+ // If anyone wants to close immediately, they're responsible for
+ // clearing
// the outbound queue.
- return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
- }
+ if(bCloseScheduled && outboundQ.isEmpty()) {
+ signalConnectionFail();
+ return false;
+ } else return true;
+ }
+ public void cancelWrite() {
+ if (writeScheduled) {
+ writeScheduled = false;
+ channelKey.interestOps(SelectionKey.OP_READ);
+ }
+ }
+
public void setConnectPending() throws ClosedChannelException {
channel.register(selector, SelectionKey.OP_CONNECT, this);
bConnectPending = true;
@@ -186,21 +218,20 @@ public boolean finishConnecting() throws ClosedChannelException {
return false;
}
bConnectPending = false;
- channel.register(selector, SelectionKey.OP_READ | (outboundQ.isEmpty() ? 0 : SelectionKey.OP_WRITE), this);
+ channelKey = channel.register(selector, SelectionKey.OP_READ, this);
return true;
}
public void scheduleClose (boolean afterWriting) {
// TODO: What the hell happens here if bConnectPending is set?
- if (!afterWriting)
- outboundQ.clear();
- try {
- channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE, this);
- } catch (ClosedChannelException e) {
- throw new RuntimeException ("unable to schedule close"); // TODO, get rid of this.
- }
+ if (!afterWriting) outboundQ.clear();
+ else {
+ writeScheduled = true;
+ channelKey.interestOps(SelectionKey.OP_WRITE);
+ }
bCloseScheduled = true;
}
+
public void startTls() {
if (sslEngine == null) {
try {
@@ -236,9 +267,41 @@ public ByteBuffer dispatchInboundData (ByteBuffer bb) throws SSLException {
else
return bb;
}
+
+ public void setCommInactivityTimeout(long seconds) {
+ inactivityPeriod = seconds;
+ }
+
+ public boolean isInactive() {
+ long now = new Date().getTime();
+ if ((now - lastActivity) / 1000 > inactivityPeriod) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public void updateActivityTimeStamp() {
+ lastActivity = new Date().getTime();
+ }
+
+ public long getLastActivity() {
+ return lastActivity;
+ }
+
+ public long getInActivityPeriod() {
+ return inactivityPeriod;
+ }
+
+ public void setInactivityKey(long mills) {
+ inActivityKey = mills;
+ }
+
+ public long currentInActivityKey() {
+ return inActivityKey;
+ }
- public void setCommInactivityTimeout (long seconds) {
- // TODO
- System.out.println ("SOCKET: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
+ public SocketChannel getChannel() {
+ return channel;
}
}
View
38 java/src/com/rubyeventmachine/PeriodicTimer.java
@@ -1,38 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-package com.rubyeventmachine;
-
-public class PeriodicTimer extends Timer {
-
- public void _fire() {
- fire();
- application.addTimer(interval, this);
- }
-}
View
54 java/src/com/rubyeventmachine/Timer.java
@@ -1,54 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-package com.rubyeventmachine;
-
-public class Timer {
- /**
- * User code is expected to call a method on a controlling Application,
- * which will fill in this field so subsequent user code can access it.
- */
- public Application application;
- public double interval;
-
- /**
- * The reactor calls here, and it may be overridden in subclasses.
- * User code should never call this method.
- */
- public void _fire() {
- fire();
- }
-
- /**
- * User code is expected to override this method.
- */
- public void fire() {
- }
-
-}
View
108 java/src/com/rubyeventmachine/tests/ApplicationTest.java
@@ -1,108 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-
-package com.rubyeventmachine.tests;
-
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Assert;
-import java.net.*;
-import java.io.*;
-import java.nio.*;
-
-import com.rubyeventmachine.*;
-
-public class ApplicationTest {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testRunnableArgument() {
- final Application a = new Application();
- a.run (new Runnable() {
- public void run() {
- a.stop();
- }
- });
- }
-
-
-
- class F implements ConnectionFactory {
- public Connection connection() {
- return new Connection() {
- public void receiveData (ByteBuffer bb) {
- application.stop();
- }
- };
- }
-
- };
-
- @Test
- public void testTcpServer() throws EmReactorException {
- final Application a = new Application();
- final SocketAddress saddr = new InetSocketAddress ("127.0.0.1", 9008);
- a.run (new Runnable() {
- public void run() {
- try {
- a.startServer (saddr, new F());
- } catch (EmReactorException e) { Assert.fail(); }
- new Thread() {
- public void run() {
- try {
- Socket s = new Socket ("127.0.0.1", 9008);
- s.getOutputStream().write(new String ("boo").getBytes());
- } catch (UnknownHostException e) {
- } catch (IOException e) {}
- }
- }.start();
- }
- });
- }
-}
View
146 java/src/com/rubyeventmachine/tests/ConnectTest.java
@@ -1,146 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-package com.rubyeventmachine.tests;
-
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import java.io.*;
-import java.nio.*;
-import java.nio.channels.*;
-
-import com.rubyeventmachine.*;
-
-public class ConnectTest {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public final void test1() throws IOException, ClosedChannelException {
- Application a = new Application();
- a.addTimer(0, new Timer() {
- public void fire() {
- application.connect("www.bayshorenetworks.com", 80, new Connection() {
- public void connectionCompleted() {
- close();
- }
- public void unbind() {
- application.stop();
- }
- });
- }
- });
- a.run();
- }
-
- @Test
- public final void test2() throws IOException {
- class Bays extends Connection {
- public void connectionCompleted() {
- sendData (ByteBuffer.wrap( new String ("GET / HTTP/1.1\r\nHost: _\r\n\r\n").getBytes()));
- }
- public void receiveData (ByteBuffer b) {
- System.out.println (new String(b.array()));
- application.stop();
- }
- };
-
- Application a = new Application();
- a.addTimer(0, new Timer() {
- public void fire() {
- application.connect("www.bayshorenetworks.com", 80, new Bays());
- }
- });
- a.run();
- }
-
- public final void testBindConnect() throws IOException {
- class Server extends Connection {
- public void postInit() {
- // TODO: get peername here and check if the port is 33333
- // doesnt seem like peername is impl yet?
- System.out.println("post init!");
- }
- };
-
- Application a = new Application();
- a.addTimer(0, new Timer() {
- public void fire() {
- application.startServer(new InetSocketAddress("localhost", 20000), new Server());
- }
- });
- a.addTimer(500, new Timer() {
- public void fire() {
- application.bindConnect("localhost", 33333, "localhost", 20000);
- }
- });
-
- a.run();
- }
-
- class C1 extends Connection {
- Application application;
- public C1 (Application a) {
- application = a;
- }
- public void postInit() {
- application.stop();
- }
- }
- @Test
- public final void test3() {
- final Application a = new Application();
- a.run (new Runnable() {
- public void run() {
- a.connect("www.bayshorenetworks.com", 80, new C1(a));
- }
- });
- }
-
-
-
-}
View
53 java/src/com/rubyeventmachine/tests/TestDatagrams.java
@@ -1,53 +0,0 @@
-package com.rubyeventmachine.tests;
-
-import com.rubyeventmachine.*;
-import java.net.*;
-import java.nio.*;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-public class TestDatagrams {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- class A extends Connection {
- public void receiveData (ByteBuffer bb) {
- application.stop();
- }
- }
- class B extends Connection {
- public void postInit() {
- this.sendDatagram(ByteBuffer.wrap(new String("ABC").getBytes()), new InetSocketAddress ("127.0.0.1", 9550));
- }
-
- }
- @Test
- public final void testA() {
- final Application a = new Application();
- a.run (new Runnable() {
- public void run() {
- a.openDatagramSocket( new InetSocketAddress ("0.0.0.0", 9550), new A() );
- a.openDatagramSocket( new B() );
- }
- });
- }
-}
View
74 java/src/com/rubyeventmachine/tests/TestServers.java
@@ -1,74 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-package com.rubyeventmachine.tests;
-
-
-import com.rubyeventmachine.*;
-import java.net.*;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestServers {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
-
- @Test
- public void testBadServerAddress() {
- final Application a = new Application();
- a.run (new Runnable() {
- public void run() {
- try {
- a.startServer(new InetSocketAddress ("100.100.100.100", 100), new DefaultConnectionFactory());
- Assert.fail ("was supposed to throw a reactor exception");
- } catch (EmReactorException e) {}
- a.stop();
- }
- });
- }
-}
View
89 java/src/com/rubyeventmachine/tests/TestTimers.java
@@ -1,89 +0,0 @@
-/**
- * $Id$
- *
- * Author:: Francis Cianfrocca (gmail: blackhedd)
- * Homepage:: http://rubyeventmachine.com
- * Date:: 15 Jul 2007
- *
- * See EventMachine and EventMachine::Connection for documentation and
- * usage examples.
- *
- *
- *----------------------------------------------------------------------------
- *
- * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
- * Gmail: blackhedd
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of either: 1) the GNU General Public License
- * as published by the Free Software Foundation; either version 2 of the
- * License, or (at your option) any later version; or 2) Ruby's License.
- *
- * See the file COPYING for complete licensing information.
- *
- *---------------------------------------------------------------------------
- *
- *
- */
-
-
-package com.rubyeventmachine.tests;
-
-import com.rubyeventmachine.*;
-import java.io.*;
-
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-public class TestTimers {
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- }
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
-
-
- @Test
- public final void test2() throws IOException {
- Application a = new Application();
- a.addTimer(0, new Timer() {
- public void fire() {
- application.stop();
- }
- });
- a.run();
- Assert.assertEquals (1, 1); // just to make sure the reactor halts.
- }
-
- @Test
- public final void test3() throws IOException {
- Application a = new Application();
- a.addTimer (0.1, new PeriodicTimer() {
- int n = 0;
- public void fire() {
- n++;
- if (n == 5)
- application.stop();
- }
- });
- a.run();
- Assert.assertEquals(1, 1);
- }
-}
View
27 lib/jeventmachine.rb
@@ -3,7 +3,7 @@
# Author:: Francis Cianfrocca (gmail: blackhedd)
# Homepage:: http://rubyeventmachine.com
# Date:: 8 Apr 2006
-#
+#
# See EventMachine and EventMachine::Connection for documentation and
# usage examples.
#
@@ -11,17 +11,17 @@
#
# Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
# Gmail: blackhedd
-#
+#
# This program is free software; you can redistribute it and/or modify
# it under the terms of either: 1) the GNU General Public License
# as published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version; or 2) Ruby's License.
-#
+#
# See the file COPYING for complete licensing information.
#
#---------------------------------------------------------------------------
#
-#
+#
# This module provides "glue" for the Java version of the EventMachine reactor core.
# For C++ EventMachines, the analogous functionality is found in ext/rubymain.cpp,
@@ -48,11 +48,11 @@ def eventCallback a1, a2, a3
EventMachine::event_callback a1, a2, s
end
end
- class Connection < com.rubyeventmachine.Connection
- def associate_callback_target sig
- # No-op for the time being.
- end
- end
+ # class Connection < com.rubyeventmachine.Connection
+ # def associate_callback_target sig
+ # # No-op for the time being.
+ # end
+ # end
def self.initialize_event_machine
@em = JEM.new
end
@@ -87,9 +87,16 @@ def self.send_datagram sig, data, length, address, port
def self.connect_server server, port
bind_connect_server nil, nil, server, port
end
+
def self.bind_connect_server bind_addr, bind_port, server, port
- @em.connectTcpServer bind_addr, bind_port, server, port
+ if bind_addr
+ @em.connectTcpServer bind_addr, bind_port, server, port
+ else
+ @em.connectTcpServer server, port
+ end
+
end
+
def self.close_connection sig, after_writing
@em.closeConnection sig, after_writing
end

No commit comments for this range

Something went wrong with that request. Please try again.