Skip to content

Commit

Permalink
Fix ORC Record reader to ignore extra fields (#5645)
Browse files Browse the repository at this point in the history
* Fix ORC Record reader to ignore extra fields

Fixes an issue introduced in PR #5267: we should not be validating the types of fields that we don't care about. Also cleaned up the messages and exceptions thrown so that they identify which field is problematic.

* Update pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java

Co-authored-by: Xiaotian (Jackie) Jiang <17555551+Jackie-Jiang@users.noreply.github.com>

Co-authored-by: Xiaotian (Jackie) Jiang <17555551+Jackie-Jiang@users.noreply.github.com>
  • Loading branch information
mcvsubbu and Jackie-Jiang committed Jul 13, 2020
1 parent 24b2cba commit 85ec229
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
if (category == TypeDescription.Category.LIST) {
// Multi-value field
TypeDescription.Category childCategory = fieldType.getChildren().get(0).getCategory();
Preconditions.checkState(isSupportedSingleValueType(childCategory), "Illegal multi-value field type: %s",
childCategory);
Preconditions.checkState(isSupportedSingleValueType(childCategory), "Illegal multi-value field type: %s (field %s)",
childCategory, field);
// NOTE: LIST is stored as 2 vectors
int fieldId = fieldType.getId();
orcReaderInclude[fieldId] = true;
Expand All @@ -108,10 +108,10 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
List<TypeDescription> children = fieldType.getChildren();
TypeDescription.Category keyCategory = children.get(0).getCategory();
Preconditions
.checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s", keyCategory);
.checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)", keyCategory, field);
TypeDescription.Category valueCategory = children.get(1).getCategory();
Preconditions
.checkState(isSupportedSingleValueType(valueCategory), "Illegal map value field type: %s", valueCategory);
.checkState(isSupportedSingleValueType(valueCategory), "Illegal map value field type: %s (field %s)", valueCategory, field);
// NOTE: MAP is stored as 3 vectors
int fieldId = fieldType.getId();
orcReaderInclude[fieldId] = true;
Expand All @@ -120,11 +120,11 @@ public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReader
} else {
// Single-value field
Preconditions
.checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s", category);
.checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)", category, field);
orcReaderInclude[fieldType.getId()] = true;
}
_includeOrcFields[i] = true;
}
_includeOrcFields[i] = true;
}

_orcRecordReader = orcReader.rows(new Reader.Options().include(orcReaderInclude));
Expand Down Expand Up @@ -186,7 +186,7 @@ public GenericRow next(GenericRow reuse)
int length = (int) listColumnVector.lengths[rowId];
List<Object> values = new ArrayList<>(length);
for (int j = 0; j < length; j++) {
Object value = extractSingleValue(listColumnVector.child, offset + j, childCategory);
Object value = extractSingleValue(field, listColumnVector.child, offset + j, childCategory);
// NOTE: Only keep non-null values
// TODO: Revisit
if (value != null) {
Expand Down Expand Up @@ -216,8 +216,8 @@ public GenericRow next(GenericRow reuse)
Map<Object, Object> map = new HashMap<>();
for (int j = 0; j < length; j++) {
int childRowId = offset + j;
Object key = extractSingleValue(mapColumnVector.keys, childRowId, keyCategory);
Object value = extractSingleValue(mapColumnVector.values, childRowId, valueCategory);
Object key = extractSingleValue(field, mapColumnVector.keys, childRowId, keyCategory);
Object value = extractSingleValue(field, mapColumnVector.values, childRowId, valueCategory);
map.put(key, value);
}
reuse.putValue(field, map);
Expand All @@ -226,7 +226,7 @@ public GenericRow next(GenericRow reuse)
}
} else {
// Single-value field
reuse.putValue(field, extractSingleValue(_rowBatch.cols[i], _nextRowId, category));
reuse.putValue(field, extractSingleValue(field, _rowBatch.cols[i], _nextRowId, category));
}
}

Expand All @@ -238,7 +238,7 @@ public GenericRow next(GenericRow reuse)
}

@Nullable
private static Object extractSingleValue(ColumnVector columnVector, int rowId, TypeDescription.Category category) {
private static Object extractSingleValue(String field, ColumnVector columnVector, int rowId, TypeDescription.Category category) {
if (columnVector.isRepeating) {
rowId = 0;
}
Expand Down Expand Up @@ -324,7 +324,7 @@ private static Object extractSingleValue(ColumnVector columnVector, int rowId, T
}
default:
// Unsupported types
throw new IllegalStateException("Unsupported field type: " + category);
throw new IllegalStateException("Unsupported field type: " + category + " for field: " + field);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected RecordReader createRecordReader()
protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
throws Exception {
TypeDescription schema = TypeDescription.fromString(
"struct<dim_sv_int:int,dim_sv_long:bigint,dim_sv_float:float,dim_sv_double:double,dim_sv_string:string,dim_mv_int:array<int>,dim_mv_long:array<bigint>,dim_mv_float:array<float>,dim_mv_double:array<double>,dim_mv_string:array<string>,met_int:int,met_long:bigint,met_float:float,met_double:double>");
"struct<dim_sv_int:int,dim_sv_long:bigint,dim_sv_float:float,dim_sv_double:double,dim_sv_string:string,dim_mv_int:array<int>,dim_mv_long:array<bigint>,dim_mv_float:array<float>,dim_mv_double:array<double>,dim_mv_string:array<string>,met_int:int,met_long:bigint,met_float:float,met_double:double,extra_field:struct<f1:int,f2:int>>");
Writer writer = OrcFile.createWriter(new Path(_dataFile.getAbsolutePath()),
OrcFile.writerOptions(new Configuration()).setSchema(schema));

Expand Down

0 comments on commit 85ec229

Please sign in to comment.