Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Lyubimov committed Nov 3, 2012
1 parent f6caf59 commit 5ac29f0
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 73 deletions.
15 changes: 2 additions & 13 deletions crunchR/src/main/Rpkg/inst/proto/crunchR.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,9 @@ package crunchR;
option java_package= "org.crunchr.proto.codegen";
option optimize_for = SPEED;

message Chunk {
message Strings {

optional string chunkType = 1;

repeated bytes Rserialized = 10;
repeated string character = 11;
repeated double vector = 12;

repeated SparseVectorElement sparseVector = 13;

message SparseVectorElement {
repeated fixed32 index = 10;
repeated double item = 11;
}
optional string strings = 1;

}

41 changes: 41 additions & 0 deletions crunchR/src/main/java/org/crunchr/RType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.crunchr;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.Writable;

/**
* This is meant to be supported on both java and R end so that stuff serialized
* to/from R raw buffers could be construed the same way on both ends.
*
* @author dmitriy
*
* @param <T>
*/
public interface RType<T> {
/**
* serialize given instance into the byte buffer.
*
* @param buffer
* the receiving storage
* @param src
* the object to be serialized
*/
void set(ByteBuffer buffer, T src) throws IOException;

/**
* deserializes the instance of <code>T</code>
*
* @param buffer
* @param value
* holder. Optional. If supplied and supported (like in case of
* {@link Writable}) then it is filled with the value and
* returned as deserialized result.
*
* @return the deserialized instance of <code>T</code>.
*
*/
T get(ByteBuffer buffer, T holder) throws IOException;

}
122 changes: 122 additions & 0 deletions crunchR/src/main/java/org/crunchr/io/CollectionRPipe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.crunchr.io;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.crunch.Emitter;
import org.crunchr.RType;
import org.rosuda.JRI.Rengine;

/**
* Data xchg pipe between Java and R side. We try to bulk up the transmission
* using 2 buffers so that jni expenses become ultimately dilluted in the cost
* of computations themselves at expense of some memory for the buffering.
* <P>
*
* The assumption is that the pipe is double-buffered and double-threaded (while
* R thread consumes one buffer, another buffer is being prepp'd so potentially
* we saturate CPU and memory bus better if there are currently available
* multicore pockets of resource).
* <P>
*
* @author dmitriy
*
*/
public class CollectionRPipe<S, T> {

private ByteBuffer buffers[];
private RType<S> srtype;
private RType<T> trtype;
private BlockingQueue<byte[]> outQueue;
private Rengine rengine;
private int outBuffer = 0;
private int outCount = 0;
private int availableOutBuffers;
private Emitter<T> emitter;

public CollectionRPipe(RType<S> srcRtype, RType<T> targetRtype,
Emitter<T> emitter,
int initialCapacity, Rengine rengine) {
super();
this.emitter=emitter;
srtype = srcRtype;
trtype = targetRtype;
buffers = new ByteBuffer[] { ByteBuffer.allocate(initialCapacity + 2),
ByteBuffer.allocate(initialCapacity + 2) };
for (ByteBuffer bb : buffers)
resetBuffer(bb);
availableOutBuffers = buffers.length;

outQueue = new ArrayBlockingQueue<byte[]>(buffers.length);
}

public void add(S value) throws IOException {
if (outBuffer < 0) {

}
int position = buffers[outBuffer].position();
while (true)
try {
srtype.set(buffers[outBuffer], value);
outCount++;
break;
} catch (BufferOverflowException exc) {
if (outCount == 0) {
buffers[outBuffer] = ByteBuffer.allocate(buffers[outBuffer]
.capacity() << 1);
continue;
} else {
buffers[outBuffer].position(position);
flushOutBuffer();
position = buffers[outBuffer].position();
continue;
}
}
if (outCount == 0x7FFF)
flushOutBuffer();
}

public void rcallbackOutBufferAvailable() throws IOException {
synchronized (this) {
availableOutBuffers++;
}
}

public void rcallbackEmitBufferAvailable(byte[] emitBuff) throws IOException {


}

private static void resetBuffer(ByteBuffer buffer) {
buffer.clear();
buffer.position(2);
}

private void flushOutBuffer() throws IOException {

ByteBuffer bb = buffers[outBuffer];
bb.flip();
bb.putShort((short) outCount);
bb.position(0);
synchronized (this) {
availableOutBuffers--;
}
try {
outQueue.put(bb.array());
/*
* wait until at least one buffer is available again
*/
synchronized (this) {
while (availableOutBuffers == 0)
wait();
}
} catch (InterruptedException exc) {
throw new IOException("Interrupted");
}

}

}
60 changes: 0 additions & 60 deletions crunchR/src/main/java/org/crunchr/mr/RDoFn.java

This file was deleted.

0 comments on commit 5ac29f0

Please sign in to comment.