Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implement failed callbacks in async IO on JVM.
  • Loading branch information
donaldh committed Apr 30, 2014
1 parent 65bcfae commit 77672d0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
19 changes: 8 additions & 11 deletions src/vm/jvm/runtime/org/perl6/nqp/io/AsyncServerSocketHandle.java
Expand Up @@ -8,6 +8,7 @@
import java.nio.channels.NotYetBoundException;

import org.perl6.nqp.runtime.ExceptionHandling;
import org.perl6.nqp.runtime.HLLConfig;
import org.perl6.nqp.runtime.Ops;
import org.perl6.nqp.runtime.ThreadContext;
import org.perl6.nqp.sixmodel.SixModelObject;
Expand Down Expand Up @@ -42,15 +43,17 @@ public void accept(final ThreadContext tc, final AsyncTaskInstance task) {
final CompletionHandler<AsynchronousSocketChannel, AsyncTaskInstance> handler
= new CompletionHandler<AsynchronousSocketChannel, AsyncTaskInstance>() {

HLLConfig hllConfig = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig;
final SixModelObject IOType = hllConfig.ioType;
final SixModelObject Array = hllConfig.listType;
final SixModelObject Null = hllConfig.nullValue;
final SixModelObject Str = hllConfig.strBoxType;

@Override
public void completed(AsynchronousSocketChannel channel, AsyncTaskInstance task) {
listenChan.accept(task, this);

SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject IOType = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
SixModelObject Null = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.nullValue;

ThreadContext curTC = tc.gc.getCurrentThreadContext();

AsyncSocketHandle handle = new AsyncSocketHandle(curTC, channel);
IOHandleInstance ioHandle = (IOHandleInstance) IOType.st.REPR.allocate(curTC,
IOType.st);
Expand All @@ -66,12 +69,7 @@ public void completed(AsynchronousSocketChannel channel, AsyncTaskInstance task)

@Override
public void failed(Throwable exc, AsyncTaskInstance task) {

SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject IOType = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
SixModelObject Str = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.strBoxType;
ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, IOType);
Expand All @@ -85,5 +83,4 @@ public void failed(Throwable exc, AsyncTaskInstance task) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

}
61 changes: 37 additions & 24 deletions src/vm/jvm/runtime/org/perl6/nqp/io/AsyncSocketHandle.java
Expand Up @@ -10,6 +10,7 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;

import org.perl6.nqp.runtime.ExceptionHandling;
import org.perl6.nqp.runtime.HLLConfig;
Expand Down Expand Up @@ -47,26 +48,33 @@ public void connect(final ThreadContext tc, String host, int port,
final CompletionHandler<Void, AsyncTaskInstance> handler
= new CompletionHandler<Void, AsyncTaskInstance>() {

HLLConfig hllConfig = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig;
final SixModelObject IOType = hllConfig.ioType;
final SixModelObject Array = hllConfig.listType;
final SixModelObject Str = hllConfig.strBoxType;

@Override
public void completed(Void v, AsyncTaskInstance task) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject IOType = curTC.curFrame.codeRef.staticInfo.compUnit.hllConfig.ioType;
IOHandleInstance ioHandle = (IOHandleInstance) IOType.st.REPR.allocate(curTC,
IOType.st);
ioHandle.handle = task.handle;

SixModelObject Array = curTC.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, ioHandle);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
callback(curTC, task, ioHandle, Str);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance task) {
public void failed(Throwable t, AsyncTaskInstance task) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();
callback(curTC, task, IOType, Ops.box_s(t.toString(), Str, curTC));
}

protected void callback(ThreadContext tc, AsyncTaskInstance task, SixModelObject ioHandle, SixModelObject err) {
SixModelObject result = Array.st.REPR.allocate(tc, Array.st);
result.push_boxed(tc, task.schedulee);
result.push_boxed(tc, ioHandle);
result.push_boxed(tc, err);
((ConcBlockingQueueInstance) task.queue).push_boxed(tc, result);
}
};

Expand All @@ -82,28 +90,33 @@ public void writeStr(final ThreadContext tc, final AsyncTaskInstance task, Strin
try {
ByteBuffer buffer = enc.encode(CharBuffer.wrap(toWrite));

HLLConfig hllConfig = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig;
final SixModelObject Array = hllConfig.listType;
final SixModelObject Int = hllConfig.intBoxType;
final SixModelObject Null = hllConfig.nullValue;
final SixModelObject Str = hllConfig.strBoxType;

CompletionHandler<Integer, AsyncTaskInstance> handler
= new CompletionHandler<Integer, AsyncTaskInstance>() {

@Override
public void completed(Integer bytesWritten, AsyncTaskInstance attachment) {
SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject Int = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.intBoxType;
SixModelObject Null = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.nullValue;

public void completed(Integer bytesWritten, AsyncTaskInstance task) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, Ops.box_i(bytesWritten, Int, curTC));
result.push_boxed(curTC, Null);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
callback(curTC, task, Ops.box_i(bytesWritten, Int, curTC), Null);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance attachment) {
// TODO Auto-generated method stub
public void failed(Throwable t, AsyncTaskInstance attachment) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();
callback(curTC, task, Str, Ops.box_s(t.toString(), Str, curTC));
}

protected void callback(ThreadContext tc, AsyncTaskInstance task, SixModelObject bytesWritten, SixModelObject err) {
SixModelObject result = Array.st.REPR.allocate(tc, Array.st);
result.push_boxed(tc, task.schedulee);
result.push_boxed(tc, bytesWritten);
result.push_boxed(tc, err);
((ConcBlockingQueueInstance) task.queue).push_boxed(tc, result);
}
};

Expand Down Expand Up @@ -162,7 +175,7 @@ public void completed(Integer numRead, AsyncTaskInstance task) {
@Override
public void failed(Throwable t, AsyncTaskInstance task) {
ThreadContext curTC = tc.gc.getCurrentThreadContext();
callback(curTC, task, -1, Str, Ops.box_s(t.getMessage(), Str, tc));
callback(curTC, task, -1, Str, Ops.box_s(t.toString(), Str, tc));
}

protected void callback(ThreadContext tc, AsyncTaskInstance task, long seq, SixModelObject str, SixModelObject err) {
Expand Down

0 comments on commit 77672d0

Please sign in to comment.