Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
WIP async read, write ops on JVM.
  • Loading branch information
donaldh committed Apr 25, 2014
1 parent 4872cf7 commit 704c5c6
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 4 deletions.
87 changes: 87 additions & 0 deletions src/vm/jvm/runtime/org/perl6/nqp/io/AsyncSocketHandle.java
Expand Up @@ -2,13 +2,16 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;

import org.perl6.nqp.runtime.ExceptionHandling;
import org.perl6.nqp.runtime.Ops;
import org.perl6.nqp.runtime.ThreadContext;
import org.perl6.nqp.sixmodel.SixModelObject;
import org.perl6.nqp.sixmodel.reprs.AsyncTaskInstance;
Expand Down Expand Up @@ -73,6 +76,90 @@ public void failed(Throwable exc, AsyncTaskInstance task) {
}
}

public void writeStr(final ThreadContext tc, final AsyncTaskInstance task, String toWrite) {
try {
ByteBuffer buffer = enc.encode(CharBuffer.wrap(toWrite));

CompletionHandler<Integer, AsyncTaskInstance> handler
= new CompletionHandler<Integer, AsyncTaskInstance>() {

@Override
public void completed(Integer bytesWritten, AsyncTaskInstance attachment) {
SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject Int = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.intBoxType;
SixModelObject Null = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.nullValue;

ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);
result.push_boxed(curTC, Ops.box_i(bytesWritten, Int, curTC));
result.push_boxed(curTC, Null);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance attachment) {
// TODO Auto-generated method stub
}
};

channel.write(buffer, task, handler);
} catch (Throwable e) {
throw ExceptionHandling.dieInternal(tc, e);
}
}

public void writeBytes(final ThreadContext tc, final AsyncTaskInstance task, SixModelObject toWrite) {

}

public void readChars(final ThreadContext tc, final AsyncTaskInstance task) {
final ByteBuffer readBuffer = ByteBuffer.allocate(32768);
final CharBuffer decodedBuffer = CharBuffer.allocate(32768);

CompletionHandler<Integer, AsyncTaskInstance> handler
= new CompletionHandler<Integer, AsyncTaskInstance>() {

@Override
public void completed(Integer numRead, AsyncTaskInstance task) {
SixModelObject Array = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.listType;
SixModelObject Int = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.intBoxType;
SixModelObject Str = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.strBoxType;
SixModelObject Null = tc.curFrame.codeRef.staticInfo.compUnit.hllConfig.nullValue;

ThreadContext curTC = tc.gc.getCurrentThreadContext();

SixModelObject result = Array.st.REPR.allocate(curTC, Array.st);
result.push_boxed(curTC, task.schedulee);

if (numRead == -1) {
task.seq = -1;
result.push_boxed(curTC, Ops.box_i(-1, Int, curTC));
result.push_boxed(curTC, Str);
} else {
result.push_boxed(curTC, Ops.box_i(task.seq++, Int, curTC));
dec.decode(readBuffer, decodedBuffer, numRead == 0 ? true : false);
result.push_boxed(curTC, Ops.box_s(decodedBuffer.toString(), Str, curTC));
}
result.push_boxed(curTC, Null);

((ConcBlockingQueueInstance) task.queue).push_boxed(curTC, result);
}

@Override
public void failed(Throwable exc, AsyncTaskInstance task) {
// TODO Auto-generated method stub
}
};
channel.read(readBuffer, task, handler);
}

public void readBytes(final ThreadContext tc, final AsyncTaskInstance task, SixModelObject bufType) {

}

@Override
public void close(ThreadContext tc) {
try {
Expand Down
49 changes: 45 additions & 4 deletions src/vm/jvm/runtime/org/perl6/nqp/runtime/IOOps.java
Expand Up @@ -4,6 +4,7 @@
import org.perl6.nqp.io.AsyncSocketHandle;
import org.perl6.nqp.sixmodel.SixModelObject;
import org.perl6.nqp.sixmodel.reprs.AsyncTaskInstance;
import org.perl6.nqp.sixmodel.reprs.IOHandleInstance;

public final class IOOps {

Expand Down Expand Up @@ -49,23 +50,63 @@ public static SixModelObject asynclisten(SixModelObject queue, SixModelObject sc

public static SixModelObject asyncwritestr(SixModelObject handle, SixModelObject queue,
SixModelObject schedulee, String toWrite, SixModelObject asyncType, ThreadContext tc) {
throw new UnsupportedOperationException("asyncwritestr is not yet implemented.");
AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;
task.handle = ((IOHandleInstance)handle).handle;

if (task.handle instanceof AsyncSocketHandle) {
((AsyncSocketHandle)task.handle).writeStr(tc, task, toWrite);
} else {
throw ExceptionHandling.dieInternal(tc, "This handle does not support asyncwritestr");
}
return task;
}

public static SixModelObject asyncwritebytes(SixModelObject handle, SixModelObject queue,
SixModelObject schedulee, SixModelObject toWrite, SixModelObject asyncType,
ThreadContext tc) {
throw new UnsupportedOperationException("asyncwritebytes is not yet implemented.");
AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;
task.handle = ((IOHandleInstance)handle).handle;

if (task.handle instanceof AsyncSocketHandle) {
((AsyncSocketHandle)task.handle).writeBytes(tc, task, toWrite);
} else {
throw ExceptionHandling.dieInternal(tc, "This handle does not support asyncwritebytes");
}
return task;
}

public static SixModelObject asyncreadchars(SixModelObject handle, SixModelObject queue,
SixModelObject schedulee, SixModelObject asyncType, ThreadContext tc) {
throw new UnsupportedOperationException("asyncreadchars is not yet implemented.");
AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;
task.handle = ((IOHandleInstance)handle).handle;

if (task.handle instanceof AsyncSocketHandle) {
((AsyncSocketHandle)task.handle).readChars(tc, task);
} else {
throw ExceptionHandling.dieInternal(tc, "This handle does not support asyncreadchars");
}
return task;
}

public static SixModelObject asyncreadbytes(SixModelObject handle, SixModelObject queue,
SixModelObject schedulee, SixModelObject bufType, SixModelObject asyncType,
ThreadContext tc) {
throw new UnsupportedOperationException("asyncreadbytes is not yet implemented.");
AsyncTaskInstance task = (AsyncTaskInstance) asyncType.st.REPR.allocate(tc, asyncType.st);
task.queue = queue;
task.schedulee = schedulee;
task.handle = ((IOHandleInstance)handle).handle;

if (task.handle instanceof AsyncSocketHandle) {
((AsyncSocketHandle)task.handle).readBytes(tc, task, bufType);
} else {
throw ExceptionHandling.dieInternal(tc, "This handle does not support asyncreadbytes");
}
return task;
}
}
Expand Up @@ -10,4 +10,7 @@ public class AsyncTaskInstance extends SixModelObject {
/* Object that can perform I/O operations; will be checked for its
* capabilities by interface by ops and then invoked. */
public Object handle;

/* Sequence number for incremental ops */
public long seq;
}

0 comments on commit 704c5c6

Please sign in to comment.