Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement socket asyncconnect, asynclisten on JVM.
  • Loading branch information
donaldh committed Apr 25, 2014
1 parent fd6faf9 commit e5b3c06
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 6 deletions.
89 changes: 89 additions & 0 deletions src/vm/jvm/runtime/org/perl6/nqp/io/AsyncServerSocketHandle.java
@@ -0,0 +1,89 @@
package org.perl6.nqp.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NotYetBoundException;

import org.perl6.nqp.runtime.ExceptionHandling;
import org.perl6.nqp.runtime.Ops;
import org.perl6.nqp.runtime.ThreadContext;
import org.perl6.nqp.sixmodel.SixModelObject;
import org.perl6.nqp.sixmodel.reprs.AsyncTaskInstance;
import org.perl6.nqp.sixmodel.reprs.ConcBlockingQueueInstance;
import org.perl6.nqp.sixmodel.reprs.IOHandleInstance;

public class AsyncServerSocketHandle implements IIOBindable {

AsynchronousServerSocketChannel listenChan;

public AsyncServerSocketHandle(ThreadContext tc) {
try {
listenChan = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
ExceptionHandling.dieInternal(tc, e);
}
}

@Override
public void bind(ThreadContext tc, String host, int port) {
try {
InetSocketAddress addr = new InetSocketAddress(host, port);
listenChan.bind(addr);
} catch (IOException e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

public void accept(final ThreadContext tc, final AsyncTaskInstance task) {

final CompletionHandler<AsynchronousSocketChannel, AsyncTaskInstance> handler
= new CompletionHandler<AsynchronousSocketChannel, AsyncTaskInstance>() {

@Override
public void completed(AsynchronousSocketChannel channel, AsyncTaskInstance task) {
listenChan.accept(task, this);

SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject IOType = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
SixModelObject Null = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.nullValue;

ThreadContext curTC = tc.gc.getCurrentThreadContext();
AsyncSocketHandle handle = new AsyncSocketHandle(curTC, channel);
IOHandleInstance ioHandle = (IOHandleInstance) IOType.st.REPR.allocate(curTC,
IOType.st);
ioHandle.handle = handle;

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, ioHandle);
result.push_boxed(curTC, Null);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance task) {

SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject IOType = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
SixModelObject Str = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.strBoxType;
ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, IOType);
result.push_boxed(curTC, Ops.box_s(exc.getMessage(), Str, curTC));
}
};

try {
listenChan.accept(task, handler);
} catch (NotYetBoundException e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

}
91 changes: 91 additions & 0 deletions src/vm/jvm/runtime/org/perl6/nqp/io/AsyncSocketHandle.java
@@ -0,0 +1,91 @@
package org.perl6.nqp.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

import org.perl6.nqp.runtime.ExceptionHandling;
import org.perl6.nqp.runtime.ThreadContext;
import org.perl6.nqp.sixmodel.SixModelObject;
import org.perl6.nqp.sixmodel.reprs.AsyncTaskInstance;
import org.perl6.nqp.sixmodel.reprs.ConcBlockingQueueInstance;
import org.perl6.nqp.sixmodel.reprs.IOHandleInstance;

public class AsyncSocketHandle implements IIOClosable, IIOEncodable {

private AsynchronousSocketChannel channel;

private CharsetEncoder enc;
private CharsetDecoder dec;

public AsyncSocketHandle(ThreadContext tc) {
try {
this.channel = AsynchronousSocketChannel.open();
setEncoding(tc, Charset.forName("UTF-8"));
} catch (IOException e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

public AsyncSocketHandle(ThreadContext tc, AsynchronousSocketChannel channel) {
this.channel = channel;
setEncoding(tc, Charset.forName("UTF-8"));
}

public void connect(final ThreadContext tc, String host, int port,
final AsyncTaskInstance task) {

final CompletionHandler<Void, AsyncTaskInstance> handler
= new CompletionHandler<Void, AsyncTaskInstance>() {

@Override
public void completed(Void v, AsyncTaskInstance task) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject IOType = curTC.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
IOHandleInstance ioHandle = (IOHandleInstance) IOType.st.REPR.allocate(curTC,
IOType.st);
ioHandle.handle = task.handle;

SixModelObject Array = curTC.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, ioHandle);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance task) {

}
};

try {
InetSocketAddress addr = new InetSocketAddress(host, port);
channel.connect(addr, task, handler);
} catch (Throwable e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

@Override
public void close(ThreadContext tc) {
try {
channel.close();
} catch (IOException e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

@Override
public void setEncoding(ThreadContext tc, Charset cs) {
enc = cs.newEncoder();
dec = cs.newDecoder();
}

}
3 changes: 1 addition & 2 deletions src/vm/jvm/runtime/org/perl6/nqp/io/IIOBindable.java
Expand Up @@ -5,6 +5,5 @@
public interface IIOBindable {

public void bind(ThreadContext tc, String host, int port);
public SocketHandle accept(ThreadContext tc);

}
}
29 changes: 26 additions & 3 deletions src/vm/jvm/runtime/org/perl6/nqp/runtime/IOOps.java
@@ -1,12 +1,15 @@
package org.perl6.nqp.runtime;

import org.perl6.nqp.io.AsyncServerSocketHandle;
import org.perl6.nqp.io.AsyncSocketHandle;
import org.perl6.nqp.sixmodel.SixModelObject;
import org.perl6.nqp.sixmodel.reprs.AsyncTaskInstance;

public final class IOOps {

public static SixModelObject signal(SixModelObject queue, SixModelObject schedulee,
long signalNum, SixModelObject asyncType, ThreadContext tc) {
throw new UnsupportedOperationException("signal is not yet implemented.");
throw new UnsupportedOperationException("signal is not available on JVM.");
}

public static SixModelObject watchfile(SixModelObject queue, SixModelObject schedulee,
Expand All @@ -16,12 +19,32 @@ public static SixModelObject watchfile(SixModelObject queue, SixModelObject sche

public static SixModelObject asyncconnect(SixModelObject queue, SixModelObject schedulee,
String host, long port, SixModelObject asyncType, ThreadContext tc) {
throw new UnsupportedOperationException("asyncconnect is not yet implemented.");

AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;

AsyncSocketHandle handle = new AsyncSocketHandle(tc);
task.handle = handle;
handle.connect(tc, host, (int) port, task);

return task;
}

public static SixModelObject asynclisten(SixModelObject queue, SixModelObject schedulee,
String host, long port, SixModelObject asyncType, ThreadContext tc) {
throw new UnsupportedOperationException("asynclisten is not yet implemented.");

AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;

AsyncServerSocketHandle handle = new AsyncServerSocketHandle(tc);
task.handle = handle;

handle.bind(tc, host, (int) port);
handle.accept(tc, task);

return task;
}

public static SixModelObject asyncwritestr(SixModelObject handle, SixModelObject queue,
Expand Down
Expand Up @@ -3,5 +3,11 @@
import org.perl6.nqp.sixmodel.SixModelObject;

public class AsyncTaskInstance extends SixModelObject {


public SixModelObject queue;
public SixModelObject schedulee;

/* Object that can perform I/O operations; will be checked for its
* capabilities by interface by ops and then invoked. */
public Object handle;
}

0 comments on commit e5b3c06

Please sign in to comment.