DRILL-3938: Support reading from Hive tables that have schema altered after the creation

Also:
+ Remove "redoRecord" logic which is not needed after "automatic reallocation" (DRILL-1960) changes.
+ Remove HiveTestRecordReader. This is incomplete in implementation and not used anywhere. It is currently just
  a burden to maintain with changes in its superclass HiveRecordReader
vkorukanti committed Dec 1, 2015
1 parent 53e7a69 commit bca72ab
Showing 7 changed files with 198 additions and 271 deletions.
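
Before the diff, a minimal sketch (not part of the commit; the class and method names are hypothetical) of the conversion approach the new HiveRecordReader code follows: Hive's ObjectInspectorConverters builds a converter from the partition's ObjectInspector to the table's, so rows written under an older partition schema can be read through the current table schema. In the actual reader below, the converter is created once in init() and applied per record in next() only when a partition is being read.

import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;

// Sketch only: partitionSerDe and tableSerDe are assumed to be already initialized
// (the commit's createSerDe() helper below shows how); everything except the Hive APIs is illustrative.
final class PartitionToTableConversionSketch {

  static Object readRowAsTableSchema(SerDe partitionSerDe, SerDe tableSerDe, Writable value) throws Exception {
    // Schema the partition data was written with (possibly older than the table's current schema).
    StructObjectInspector partitionOI = (StructObjectInspector) partitionSerDe.getObjectInspector();
    // Current table schema.
    StructObjectInspector tableOI = (StructObjectInspector) tableSerDe.getObjectInspector();

    // ObjectInspector that exposes the table schema and that the converter knows how to populate.
    StructObjectInspector finalOI =
        (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
    // Converter that maps a row in the partition schema to the table schema.
    Converter converter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);

    Object row = partitionSerDe.deserialize(value); // row in the partition's schema
    return converter.convert(row);                  // same row, readable through finalOI
  }
}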
@@ -101,12 +101,25 @@ public boolean matches(RelOptRuleCall call) {
return true;
}

final List<FieldSchema> tableSchema = hiveTable.getSd().getCols();
// Make sure all partitions have the same input format as the table input format
for (HivePartition partition : partitions) {
Class<? extends InputFormat> inputFormat = getInputFormatFromSD(hiveTable, partition.getPartition().getSd());
final StorageDescriptor partitionSD = partition.getPartition().getSd();
Class<? extends InputFormat> inputFormat = getInputFormatFromSD(hiveTable, partitionSD);
if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
return false;
}

// Make sure the schema of the table and the schema of the partition match. If not, return false. Schema changes
// between table and partition can happen when the table schema is altered using ALTER statements after some
// partitions are already created. Currently the native reader conversion doesn't handle schema changes between
// partition and table. Hive has an extensive list of convert methods to convert from one type to the other
// possible types; Drill doesn't have a similar set of methods yet.
if (!partitionSD.getCols().equals(tableSchema)) {
logger.debug("Partition schema is different from table schema. Currently native reader conversion can't " +
"handle schema differences between partitions and table");
return false;
}
}

return true;
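
A hypothetical illustration (not from the repository; column names and types are invented) of why the column-list check above trips after an ALTER TABLE: the table's StorageDescriptor picks up the new column, while StorageDescriptors of partitions created before the ALTER keep the old column list, so the equals() comparison fails and the rule falls back to the regular Hive reader.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

// Hypothetical data: a table altered with something like "ALTER TABLE t ADD COLUMNS (extra INT)"
// after one of its partitions was already created.
final class PartitionSchemaMismatchSketch {
  public static void main(String[] args) {
    List<FieldSchema> tableSchema = Arrays.asList(
        new FieldSchema("id", "int", null),
        new FieldSchema("name", "string", null),
        new FieldSchema("extra", "int", null));    // column added by the ALTER statement

    List<FieldSchema> partitionCols = Arrays.asList(
        new FieldSchema("id", "int", null),
        new FieldSchema("name", "string", null));  // partition predates the ALTER

    // Mirrors the guard in matches(): the lists differ, so native reader conversion is skipped.
    System.out.println(partitionCols.equals(tableSchema)); // prints "false"
  }
}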
@@ -48,6 +48,8 @@
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -76,14 +78,25 @@ public class HiveRecordReader extends AbstractRecordReader {
protected List<String> selectedPartitionNames = Lists.newArrayList();
protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
protected List<Object> selectedPartitionValues = Lists.newArrayList();
protected List<String> tableColumns; // all columns in table (not including partition columns)
protected SerDe serde;
protected StructObjectInspector sInspector;

// SerDe of the reading partition (or table if the table is non-partitioned)
protected SerDe partitionSerDe;

// ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is the same as the table
// ObjectInspector).
protected StructObjectInspector partitionOI;

// Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
// the partition. If there are no schema changes then this is the same as the partitionOI.
protected StructObjectInspector finalOI;

// Converter which converts data from partition schema to table schema.
private Converter partTblObjectInspectorConverter;

protected Object key, value;
protected org.apache.hadoop.mapred.RecordReader reader;
protected List<ValueVector> vectors = Lists.newArrayList();
protected List<ValueVector> pVectors = Lists.newArrayList();
protected Object redoRecord;
protected boolean empty;
private Map<String, String> hiveConfigOverride;
private FragmentContext fragmentContext;
@@ -107,97 +120,80 @@ public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit,
}

private void init() throws ExecutionSetupException {
Properties properties;
JobConf job = new JobConf();
if (partition != null) {
properties = MetaStoreUtils.getPartitionMetadata(partition, table);

// SerDe expects properties from Table, but above call doesn't add Table properties.
// Include Table properties in the final list in order not to break SerDes that depend on
// Table properties. For example AvroSerDe gets the schema from properties (passed as second argument)
for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
if (entry.getKey() != null && entry.getValue() != null) {
properties.put(entry.getKey(), entry.getValue());
}
}
} else {
properties = MetaStoreUtils.getTableMetadata(table);
}
for (Object obj : properties.keySet()) {
job.set((String) obj, (String) properties.get(obj));
}
for(Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
job.set(entry.getKey(), entry.getValue());
}
final JobConf job = new JobConf();

// Get the configured default val
defaultPartitionValue = HiveUtilities.getDefaultPartitionValue(hiveConfigOverride);

InputFormat format;
String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib();
String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
try {
format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
Class<?> c = Class.forName(sLib);
serde = (SerDe) c.getConstructor().newInstance();
serde.initialize(job, properties);
} catch (ReflectiveOperationException | SerDeException e) {
throw new ExecutionSetupException("Unable to instantiate InputFormat", e);
}
job.setInputFormat(format.getClass());
Properties properties = MetaStoreUtils.getTableMetadata(table);
final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), properties);
final StructObjectInspector tableOI = getStructOI(tableSerDe);

List<FieldSchema> partitionKeys = table.getPartitionKeys();
List<String> partitionNames = Lists.newArrayList();
for (FieldSchema field : partitionKeys) {
partitionNames.add(field.getName());
}
if (partition != null) {
properties = HiveUtilities.getPartitionMetadata(partition, table);

try {
ObjectInspector oi = serde.getObjectInspector();
if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), properties);
partitionOI = getStructOI(partitionSerDe);

finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
HiveUtilities.setInputFormatClass(job, partition.getSd());
} else {
// For non-partitioned tables there is no need to create a converter, as no schema changes are expected.
partitionSerDe = tableSerDe;
partitionOI = tableOI;
partTblObjectInspectorConverter = null;
finalOI = tableOI;
HiveUtilities.setInputFormatClass(job, table.getSd());
}

HiveUtilities.addConfToJob(job, properties, hiveConfigOverride);

// Get list of partition column names
final List<String> partitionNames = Lists.newArrayList();
for (FieldSchema field : table.getPartitionKeys()) {
partitionNames.add(field.getName());
}
sInspector = (StructObjectInspector) oi;
StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector);
List<Integer> columnIds = Lists.newArrayList();

// We should always get the column names from the ObjectInspector. For some tables (e.g. Avro) the metastore
// may not contain the schema; instead it is derived from other sources such as table properties or an external
// file. The SerDe object knows how to get the schema with all the config and table properties passed at
// initialization. The ObjectInspector created from the SerDe object has the schema.
final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();

// Select list of columns for project pushdown into Hive SerDe readers.
final List<Integer> columnIds = Lists.newArrayList();
if (isStarQuery()) {
selectedColumnNames = sTypeInfo.getAllStructFieldNames();
tableColumns = selectedColumnNames;
selectedColumnNames = tableColumnNames;
for(int i=0; i<selectedColumnNames.size(); i++) {
columnIds.add(i);
}
selectedPartitionNames = partitionNames;
} else {
tableColumns = sTypeInfo.getAllStructFieldNames();
selectedColumnNames = Lists.newArrayList();
for (SchemaPath field : getColumns()) {
String columnName = field.getRootSegment().getPath();
if (!tableColumns.contains(columnName)) {
if (partitionNames.contains(columnName)) {
selectedPartitionNames.add(columnName);
} else {
throw new ExecutionSetupException(String.format("Column %s does not exist", columnName));
}
if (partitionNames.contains(columnName)) {
selectedPartitionNames.add(columnName);
} else {
columnIds.add(tableColumns.indexOf(columnName));
columnIds.add(tableColumnNames.indexOf(columnName));
selectedColumnNames.add(columnName);
}
}
}
ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);

for (String columnName : selectedColumnNames) {
ObjectInspector fieldOI = sInspector.getStructFieldRef(columnName).getFieldObjectInspector();
ObjectInspector fieldOI = finalOI.getStructFieldRef(columnName).getFieldObjectInspector();
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());

selectedColumnObjInspectors.add(fieldOI);
selectedColumnTypes.add(typeInfo);
selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
}

if (isStarQuery()) {
selectedPartitionNames = partitionNames;
}

for (int i = 0; i < table.getPartitionKeys().size(); i++) {
FieldSchema field = table.getPartitionKeys().get(i);
if (selectedPartitionNames.contains(field.getName())) {
@@ -216,15 +212,34 @@ private void init() throws ExecutionSetupException {

if (!empty) {
try {
reader = format.getRecordReader(inputSplit, job, Reporter.NULL);
} catch (IOException e) {
reader = job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
} catch (Exception e) {
throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
}
key = reader.createKey();
value = reader.createValue();
}
}

/**
* Utility method which creates a SerDe object for a given SerDe class name and properties.
*/
private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
final Class<?> c = Class.forName(sLib);
final SerDe serde = (SerDe) c.getConstructor().newInstance();
serde.initialize(job, properties);

return serde;
}

private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
ObjectInspector oi = serDe.getObjectInspector();
if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
}
return (StructObjectInspector) oi;
}

@Override
public void setup(@SuppressWarnings("unused") OperatorContext context, OutputMutator output)
throws ExecutionSetupException {
@@ -280,26 +295,12 @@ public int next() {

try {
int recordCount = 0;

if (redoRecord != null) {
// Try writing the record that didn't fit into the last RecordBatch
Object deSerializedValue = serde.deserialize((Writable) redoRecord);
boolean status = readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
if (!status) {
throw new DrillRuntimeException("Current record is too big to fit into allocated ValueVector buffer");
}
redoRecord = null;
recordCount++;
}

while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
Object deSerializedValue = serde.deserialize((Writable) value);
boolean status = readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
if (!status) {
redoRecord = value;
setValueCountAndPopulatePartitionVectors(recordCount);
return recordCount;
Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
if (partTblObjectInspectorConverter != null) {
deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
}
readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
recordCount++;
}

@@ -310,18 +311,16 @@
}
}

private boolean readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
for (int i = 0; i < selectedColumnNames.size(); i++) {
String columnName = selectedColumnNames.get(i);
Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
final String columnName = selectedColumnNames.get(i);
Object hiveValue = finalOI.getStructFieldData(deSerializedValue, finalOI.getStructFieldRef(columnName));

if (hiveValue != null) {
selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
vectors.get(i), outputRecordIndex);
}
}

return true;
}

private void setValueCountAndPopulatePartitionVectors(int recordCount) {