Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-1942-hygiene: #133

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,14 +21,12 @@
import java.io.InputStream;
import java.io.OutputStream;

public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class);
public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class);

@Override
public abstract void readFromStream(InputStream input) throws IOException;

@Override
public abstract void writeToStream(OutputStream output) throws IOException;


}
Expand Up @@ -33,23 +33,23 @@
import org.apache.drill.common.util.DataOutputOutputStream;

/**
* Helper class that holds the basic functionality to interchangably use the different Drill serializble interfaces.
* This is package private as users should utilize either AbstractDataSerializable or AbstractStreamSerializable instead
* to avoid infinite loops.
* Helper class that holds the basic functionality to interchangeably use
* the different Drill serializble interfaces. This is package private as
* users should utilize either AbstractDataSerializable or AbstractStreamSerializable
* instead to avoid infinite loops.
*/
abstract class LoopedAbstractDrillSerializable implements DrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class);
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class);

@Override
public void writeExternal(ObjectOutput out) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
writeToStream(baos);
byte[] ba = baos.toByteArray();
final byte[] ba = baos.toByteArray();
out.writeInt(ba.length);
out.write(ba);
}


@Override
public void read(DataInput input) throws IOException {
readFromStream(DataInputInputStream.constructInputStream(input));
Expand All @@ -72,8 +72,8 @@ public void writeToStream(OutputStream output) throws IOException {

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
int len = in.readInt();
byte[] bytes = new byte[len];
final int len = in.readInt();
final byte[] bytes = new byte[len];
in.readFully(bytes);
readFromStream(new ByteArrayInputStream(bytes));
}
Expand Down
Expand Up @@ -52,7 +52,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {

private VectorContainer va;
private WritableBatch batch;
private BufferAllocator allocator;
private final BufferAllocator allocator;
private int recordCount = -1;
private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
private SelectionVector2 sv2;
Expand All @@ -61,7 +61,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {

public VectorAccessibleSerializable(BufferAllocator allocator) {
this.allocator = allocator;
this.va = new VectorContainer();
va = new VectorContainer();
}

public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator) {
Expand All @@ -77,16 +77,13 @@ public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocat
*/
public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) {
this.allocator = allocator;
if (batch != null) {
this.batch = batch;
}
this.batch = batch;
if (sv2 != null) {
this.sv2 = sv2;
this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
}
}


/**
* Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exits
* and construct the vectors and add them to a vector container
Expand All @@ -95,8 +92,8 @@ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, B
*/
@Override
public void readFromStream(InputStream input) throws IOException {
VectorContainer container = new VectorContainer();
UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
final VectorContainer container = new VectorContainer();
final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {

Expand All @@ -107,12 +104,12 @@ public void readFromStream(InputStream input) throws IOException {
sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
}
List<ValueVector> vectorList = Lists.newArrayList();
List<SerializedField> fieldList = batchDef.getFieldList();
final List<ValueVector> vectorList = Lists.newArrayList();
final List<SerializedField> fieldList = batchDef.getFieldList();
for (SerializedField metaData : fieldList) {
int dataLength = metaData.getBufferLength();
MaterializedField field = MaterializedField.create(metaData);
DrillBuf buf = allocator.buffer(dataLength);
final int dataLength = metaData.getBufferLength();
final MaterializedField field = MaterializedField.create(metaData);
final DrillBuf buf = allocator.buffer(dataLength);
final ValueVector vector;
try {
buf.writeBytes(input, dataLength);
Expand All @@ -129,7 +126,6 @@ public void readFromStream(InputStream input) throws IOException {
va = container;
}


public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
writeToStream(output);
Expand All @@ -145,8 +141,8 @@ public void writeToStream(OutputStream output) throws IOException {
Preconditions.checkNotNull(output);
final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();

DrillBuf[] incomingBuffers = batch.getBuffers();
UserBitShared.RecordBatchDef batchDef = batch.getDef();
final DrillBuf[] incomingBuffers = batch.getBuffers();
final UserBitShared.RecordBatchDef batchDef = batch.getDef();

/* DrillBuf associated with the selection vector */
DrillBuf svBuf = null;
Expand Down Expand Up @@ -202,5 +198,4 @@ public VectorContainer get() {
public SelectionVector2 getSv2() {
return sv2;
}

}