Skip to content
Permalink
Browse files
GIRAPH-1181
closes #65
  • Loading branch information
Maja Kabiljo committed Apr 4, 2018
1 parent 98b724f commit 5c418ff33ad704d0596c631dad9e082c4b2076ca
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 166 deletions.
@@ -19,8 +19,8 @@
package org.apache.giraph.comm.aggregators;

import java.io.IOException;
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;

/**
* Wrapper for output stream which keeps the place in the beginning for the
@@ -36,7 +36,7 @@ public abstract class CountingOutputStream {
* Default constructor
*/
public CountingOutputStream() {
dataOutput = new ExtendedByteArrayDataOutput();
dataOutput = new UnsafeByteArrayOutputStream();
reset();
}

@@ -18,6 +18,8 @@

package org.apache.giraph.comm.requests;

import org.apache.giraph.utils.UnsafeByteArrayInputStream;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
@@ -64,6 +66,14 @@ public DataInput getDataInput() {
return new DataInputStream(new ByteArrayInputStream(data));
}

/**
* Wraps the byte array with UnsafeByteArrayInputStream stream.
* @return UnsafeByteArrayInputStream
*/
public UnsafeByteArrayInputStream getUnsafeByteArrayInput() {
return new UnsafeByteArrayInputStream(data);
}

@Override
void readFieldsRequest(DataInput input) throws IOException {
int dataLength = input.readInt();
@@ -18,13 +18,13 @@

package org.apache.giraph.comm.requests;

import java.io.DataInput;
import java.io.IOException;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
import org.apache.giraph.utils.WritableUtils;
@@ -59,7 +59,7 @@ public void doRequest(ServerData serverData) {
UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();

DataInput input = getDataInput();
UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
try {
int num = input.readInt();
@@ -18,12 +18,12 @@

package org.apache.giraph.comm.requests;

import java.io.DataInput;
import java.io.IOException;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -53,7 +53,7 @@ public SendAggregatorsToWorkerRequest() {

@Override
public void doRequest(ServerData serverData) {
DataInput input = getDataInput();
UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
try {
int num = input.readInt();
@@ -47,7 +47,8 @@ public SendReducedToMasterRequest() {
@Override
public void doRequest(MasterGlobalCommHandler commHandler) {
try {
commHandler.getAggregatorHandler().acceptReducedValues(getDataInput());
commHandler.getAggregatorHandler().
acceptReducedValues(getUnsafeByteArrayInput());
} catch (IOException e) {
throw new IllegalStateException("doRequest: " +
"IOException occurred while processing request", e);
@@ -18,12 +18,12 @@

package org.apache.giraph.comm.requests;

import java.io.DataInput;
import java.io.IOException;

import org.apache.giraph.comm.GlobalCommType;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

@@ -52,7 +52,7 @@ public SendWorkerAggregatorsRequest() {

@Override
public void doRequest(ServerData serverData) {
DataInput input = getDataInput();
UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
OwnerAggregatorServerData aggregatorData =
serverData.getOwnerAggregatorData();
try {
@@ -54,17 +54,14 @@ public class UnsafeArrayReads extends UnsafeReads {
private static final long BYTE_ARRAY_OFFSET =
UNSAFE.arrayBaseOffset(byte[].class);

/** Byte buffer */
protected byte[] buf;

/**
* Constructor
*
* @param buf Buffer to read from
*/
public UnsafeArrayReads(byte[] buf) {
super(buf.length);
this.buf = buf;
this.buffer = buf;
}

/**
@@ -76,12 +73,12 @@ public UnsafeArrayReads(byte[] buf) {
*/
public UnsafeArrayReads(byte[] buf, int offset, int length) {
super(offset, length);
this.buf = buf;
this.buffer = buf;
}

@Override
public int available() {
return (int) (bufLength - pos);
return (int) (limit - position);
}

@Override
@@ -92,38 +89,38 @@ public boolean endOfInput() {

@Override
public int getPos() {
return (int) pos;
return (int) position;
}

@Override
public void readFully(byte[] b) throws IOException {
ensureRemaining(b.length);
System.arraycopy(buf, (int) pos, b, 0, b.length);
pos += b.length;
require(b.length);
System.arraycopy(buffer, (int) position, b, 0, b.length);
position += b.length;
}

@Override
public void readFully(byte[] b, int off, int len) throws IOException {
ensureRemaining(len);
System.arraycopy(buf, (int) pos, b, off, len);
pos += len;
require(len);
System.arraycopy(buffer, (int) position, b, off, len);
position += len;
}

@Override
public boolean readBoolean() throws IOException {
ensureRemaining(SIZE_OF_BOOLEAN);
boolean value = UNSAFE.getBoolean(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_BOOLEAN;
public boolean readBoolean() {
require(SIZE_OF_BOOLEAN);
boolean value = UNSAFE.getBoolean(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_BOOLEAN;
return value;
}

@Override
public byte readByte() throws IOException {
ensureRemaining(SIZE_OF_BYTE);
byte value = UNSAFE.getByte(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_BYTE;
public byte readByte() {
require(SIZE_OF_BYTE);
byte value = UNSAFE.getByte(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_BYTE;
return value;
}

@@ -133,11 +130,11 @@ public int readUnsignedByte() throws IOException {
}

@Override
public short readShort() throws IOException {
ensureRemaining(SIZE_OF_SHORT);
short value = UNSAFE.getShort(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_SHORT;
public short readShort() {
require(SIZE_OF_SHORT);
short value = UNSAFE.getShort(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_SHORT;
return value;
}

@@ -147,47 +144,47 @@ public int readUnsignedShort() throws IOException {
}

@Override
public char readChar() throws IOException {
ensureRemaining(SIZE_OF_CHAR);
char value = UNSAFE.getChar(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_CHAR;
public char readChar() {
require(SIZE_OF_CHAR);
char value = UNSAFE.getChar(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_CHAR;
return value;
}

@Override
public int readInt() throws IOException {
ensureRemaining(SIZE_OF_INT);
int value = UNSAFE.getInt(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_INT;
public int readInt() {
require(SIZE_OF_INT);
int value = UNSAFE.getInt(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_INT;
return value;
}

@Override
public long readLong() throws IOException {
ensureRemaining(SIZE_OF_LONG);
long value = UNSAFE.getLong(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_LONG;
public long readLong() {
require(SIZE_OF_LONG);
long value = UNSAFE.getLong(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_LONG;
return value;
}

@Override
public float readFloat() throws IOException {
ensureRemaining(SIZE_OF_FLOAT);
float value = UNSAFE.getFloat(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_FLOAT;
public float readFloat() {
require(SIZE_OF_FLOAT);
float value = UNSAFE.getFloat(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_FLOAT;
return value;
}

@Override
public double readDouble() throws IOException {
ensureRemaining(SIZE_OF_DOUBLE);
double value = UNSAFE.getDouble(buf,
BYTE_ARRAY_OFFSET + pos);
pos += SIZE_OF_DOUBLE;
public double readDouble() {
require(SIZE_OF_DOUBLE);
double value = UNSAFE.getDouble(buffer,
BYTE_ARRAY_OFFSET + position);
position += SIZE_OF_DOUBLE;
return value;
}

@@ -20,6 +20,13 @@

/**
* UnsafeByteArrayInputStream
*
* This stream now extends com.esotericsoftware.kryo.io.Input so that kryo
* serialization can directly read from this stream without using an
* additional buffer, providing a faster serialization.
* Users of this class has to explicitly close the stream to avoid style check
* errors even though close is no-op when the underlying stream is not set.
*/
public class UnsafeByteArrayInputStream extends UnsafeArrayReads {

0 comments on commit 5c418ff

Please sign in to comment.