@@ -31,9 +31,9 @@
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -75,7 +75,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
private boolean rowKeyOnly;

public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec,
List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException {
List<SchemaPath> projectedColumns, FragmentContext context) {
hbaseConf = conf;
hbaseTableName = Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName();
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
@@ -169,15 +169,16 @@ public int next() {
done:
for (; rowCount < TARGET_RECORD_COUNT; rowCount++) {
Result result = null;
final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
try {
if (operatorContext != null) {
operatorContext.getStats().startWait();
if (operatorStats != null) {
operatorStats.startWait();
}
try {
result = resultScanner.next();
} finally {
if (operatorContext != null) {
operatorContext.getStats().stopWait();
if (operatorStats != null) {
operatorStats.stopWait();
}
}
} catch (IOException e) {
@@ -193,20 +194,20 @@ public int next() {
rowKeyVector.getMutator().setSafe(rowCount, cells[0].getRowArray(), cells[0].getRowOffset(), cells[0].getRowLength());
}
if (!rowKeyOnly) {
for (Cell cell : cells) {
int familyOffset = cell.getFamilyOffset();
int familyLength = cell.getFamilyLength();
byte[] familyArray = cell.getFamilyArray();
MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);
for (final Cell cell : cells) {
final int familyOffset = cell.getFamilyOffset();
final int familyLength = cell.getFamilyLength();
final byte[] familyArray = cell.getFamilyArray();
final MapVector mv = getOrCreateFamilyVector(new String(familyArray, familyOffset, familyLength), true);

int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
byte[] qualifierArray = cell.getQualifierArray();
NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));
final int qualifierOffset = cell.getQualifierOffset();
final int qualifierLength = cell.getQualifierLength();
final byte[] qualifierArray = cell.getQualifierArray();
final NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(qualifierArray, qualifierOffset, qualifierLength));

int valueOffset = cell.getValueOffset();
int valueLength = cell.getValueLength();
byte[] valueArray = cell.getValueArray();
final int valueOffset = cell.getValueOffset();
final int valueLength = cell.getValueLength();
final byte[] valueArray = cell.getValueArray();
v.getMutator().setSafe(rowCount, valueArray, valueOffset, valueLength);
}
}
@@ -246,7 +247,7 @@ private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qua
}

@Override
public void cleanup() {
public void close() {
try {
if (resultScanner != null) {
resultScanner.close();
@@ -267,5 +268,4 @@ private void setOutputRowCount(int count) {
rowKeyVector.getMutator().setValueCount(count);
}
}

}
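The common thread across these reader diffs is the rename of cleanup() to close(), which moves the readers toward standard Java resource handling. Below is a minimal, self-contained sketch of the caller-side pattern that rename enables; ExampleReader, ReaderLifecycleSketch, and their members are hypothetical stand-ins rather than Drill classes, and the only assumption carried over from the diff is that close() may throw.

class ExampleReader implements AutoCloseable {
  private boolean exhausted = false;

  // Pretend to read one batch; return 0 when nothing is left.
  int next() {
    if (exhausted) {
      return 0;
    }
    exhausted = true;
    return 1;
  }

  @Override
  public void close() throws Exception {
    // Release scanners, vectors, connections, ...
  }
}

public class ReaderLifecycleSketch {
  public static void main(String[] args) throws Exception {
    // try-with-resources guarantees close() runs even if next() throws.
    try (ExampleReader reader = new ExampleReader()) {
      while (reader.next() > 0) {
        // process the batch
      }
    }
  }
}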
@@ -133,7 +133,7 @@ private void init() throws ExecutionSetupException {
String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
try {
format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
Class c = Class.forName(sLib);
Class<?> c = Class.forName(sLib);
serde = (SerDe) c.getConstructor().newInstance();
serde.initialize(job, properties);
} catch (ReflectiveOperationException | SerDeException e) {
@@ -286,7 +286,6 @@ public int next() {
}

private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
boolean success;
for (int i = 0; i < selectedColumnNames.size(); i++) {
String columnName = selectedColumnNames.get(i);
Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
@@ -311,7 +310,7 @@ private void setValueCountAndPopulatePartitionVectors(int recordCount) {
}

@Override
public void cleanup() {
public void close() {
try {
if (reader != null) {
reader.close();
@@ -229,7 +229,7 @@ public int next() {
}

@Override
public void cleanup() {
public void close() {
AutoCloseables.close(resultSet, logger);
AutoCloseables.close(statement, logger);
AutoCloseables.close(connection, logger);
@@ -52,7 +52,7 @@
import com.mongodb.client.MongoDatabase;

public class MongoRecordReader extends AbstractRecordReader {
static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);
private static final Logger logger = LoggerFactory.getLogger(MongoRecordReader.class);

private MongoCollection<Document> collection;
private MongoCursor<Document> cursor;
@@ -187,7 +187,7 @@ public int next() {
}

@Override
public void cleanup() {
public void close() {
}


@@ -92,47 +92,49 @@ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Operat
if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
}
this.currentReader = readers.next();
currentReader = readers.next();
this.oContext = oContext;

boolean setup = false;
try {
oContext.getStats().startProcessing();
this.currentReader.setup(oContext, mutator);
currentReader.setup(oContext, mutator);
setup = true;
} finally {
// if we had an exception during setup, make sure to release existing data.
if (!setup) {
currentReader.cleanup();
try {
currentReader.close();
} catch(final Exception e) {
throw new ExecutionSetupException(e);
}
}
oContext.getStats().stopProcessing();
}
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;

// TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
// options; so labelValue = null.
final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
this.partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;

addPartitionVectors();
}

public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers)
throws ExecutionSetupException {
this(subScanConfig, context,
context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}

@Override
public FragmentContext getContext() {
return context;
}

public OperatorContext getOperatorContext() {
return oContext;
}

@Override
public BatchSchema getSchema() {
return schema;
@@ -156,6 +158,12 @@ private void releaseAssets() {
container.zeroVectors();
}

private void clearFieldVectorMap() {
for (final ValueVector v : fieldVectorMap.values()) {
v.clear();
}
}

@Override
public IterOutcome next() {
if (done) {
@@ -169,15 +177,13 @@ public IterOutcome next() {
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException | OutOfMemoryRuntimeException e) {
logger.debug("Caught Out of Memory Exception", e);
for (ValueVector v : fieldVectorMap.values()) {
v.clear();
}
clearFieldVectorMap();
return IterOutcome.OUT_OF_MEMORY;
}
while ((recordCount = currentReader.next()) == 0) {
try {
if (!readers.hasNext()) {
currentReader.cleanup();
currentReader.close();
releaseAssets();
done = true;
if (mutator.isNewSchema()) {
@@ -196,17 +202,15 @@ public int next() {
fieldVectorMap.clear();
}

currentReader.cleanup();
currentReader.close();
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
currentReader.setup(oContext, mutator);
try {
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException e) {
logger.debug("Caught OutOfMemoryException");
for (ValueVector v : fieldVectorMap.values()) {
v.clear();
}
clearFieldVectorMap();
return IterOutcome.OUT_OF_MEMORY;
}
addPartitionVectors();
@@ -249,7 +253,7 @@ public IterOutcome next() {
}
}

private void addPartitionVectors() throws ExecutionSetupException{
private void addPartitionVectors() throws ExecutionSetupException {
try {
if (partitionVectors != null) {
for (ValueVector v : partitionVectors) {
@@ -258,8 +262,10 @@ private void addPartitionVectors() throws ExecutionSetupException{
}
partitionVectors = Lists.newArrayList();
for (int i : selectedPartitionColumns) {
MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
ValueVector v = mutator.addField(field, NullableVarCharVector.class);
final MaterializedField field =
MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i),
Types.optional(MinorType.VARCHAR));
final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
partitionVectors.add(v);
}
} catch(SchemaChangeException e) {
@@ -269,12 +275,12 @@

private void populatePartitionVectors() {
for (int index = 0; index < selectedPartitionColumns.size(); index++) {
int i = selectedPartitionColumns.get(index);
NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
final int i = selectedPartitionColumns.get(index);
final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
if (partitionValues.length > i) {
String val = partitionValues[i];
final String val = partitionValues[i];
AllocationHelper.allocate(v, recordCount, val.length());
byte[] bytes = val.getBytes();
final byte[] bytes = val.getBytes();
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
}
@@ -306,27 +312,24 @@ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return container.getValueAccessorById(clazz, ids);
}



private class Mutator implements OutputMutator {
private boolean schemaChange = true;

boolean schemaChange = true;

@SuppressWarnings("unchecked")
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
// Check if the field exists
ValueVector v = fieldVectorMap.get(field.key());

if (v == null || v.getClass() != clazz) {
// Field does not exist add it to the map and the output container
v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
if (!clazz.isAssignableFrom(v.getClass())) {
throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
throw new SchemaChangeException(String.format(
"The class that was provided %s does not correspond to the expected vector type of %s.",
clazz.getSimpleName(), v.getClass().getSimpleName()));
}

ValueVector old = fieldVectorMap.put(field.key(), v);
if(old != null){
final ValueVector old = fieldVectorMap.put(field.key(), v);
if (old != null) {
old.clear();
container.remove(old);
}
@@ -336,12 +339,12 @@ public <T extends ValueVector> T addField(MaterializedField field, Class<T> claz
schemaChange = true;
}

return (T) v;
return clazz.cast(v);
}

@Override
public void allocate(int recordCount) {
for (ValueVector v : fieldVectorMap.values()) {
for (final ValueVector v : fieldVectorMap.values()) {
AllocationHelper.allocate(v, recordCount, 50, 10);
}
}
@@ -378,18 +381,17 @@ public WritableBatch getWritableBatch() {
}

@Override
public void close() {
public void close() throws Exception {
container.clear();
for (ValueVector v : partitionVectors) {
for (final ValueVector v : partitionVectors) {
v.clear();
}
fieldVectorMap.clear();
currentReader.cleanup();
currentReader.close();
}

@Override
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}

}
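One small but notable change in Mutator.addField() above is replacing the unchecked (T) v cast with clazz.cast(v). A standalone sketch of the difference is below; CastSketch and checkedLookup are hypothetical names used only for illustration, and the sketch assumes nothing beyond java.lang.Class.cast from the standard library.

import java.util.ArrayList;
import java.util.List;

public class CastSketch {
  // Mirrors the addField() change: Class.cast() verifies the runtime type at the
  // call site, while an unchecked (T) cast would defer any failure to the caller.
  static <T> T checkedLookup(Object value, Class<T> clazz) {
    return clazz.cast(value); // throws ClassCastException here if value is not a T
  }

  public static void main(String[] args) {
    Object v = new ArrayList<String>();
    List<?> ok = checkedLookup(v, List.class); // an ArrayList is a List
    System.out.println("isEmpty: " + ok.isEmpty());
    try {
      String bad = checkedLookup(v, String.class); // fails immediately at the cast
      System.out.println(bad);
    } catch (ClassCastException e) {
      System.out.println("caught: " + e);
    }
  }
}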