DRILL-6603: Set num_nulls for parquet statistics to -1 when actual number is not defined.
arina-ielchiieva authored and vvysotskyi committed Jul 19, 2018
1 parent b1aca33 commit 92fbed887ca4ca0f2208f367a8f86f8aa4940513
Showing 9 changed files with 109 additions and 94 deletions.
@@ -54,7 +54,7 @@ static boolean isAllNulls(Statistics stat, long rowCount) {
* @return <tt>true</tt> if the parquet file does not have nulls and <tt>false</tt> otherwise
*/
static boolean hasNoNulls(Statistics stat) {
-return stat.getNumNulls() <= 0;
+return stat.getNumNulls() == 0;
}

}
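
The fix above hinges on parquet's num_nulls sentinel: a value of -1 means the null count was never written. Below is a standalone sketch (an editor's illustration, not part of the commit) of why the old comparison was wrong: "<= 0" treated the -1 "unknown" sentinel as proof of zero nulls, while "== 0" only fires when the count is known to be zero.

/** Standalone sketch of the sentinel semantics; values are hypothetical. */
final class NullCountCheckSketch {

  static final long NUM_NULLS_UNSET = -1; // parquet's "not set" sentinel

  static boolean hasNoNulls(long numNulls) {
    // Old check: numNulls <= 0 -> true for -1, wrongly claiming "no nulls".
    return numNulls == 0;
  }

  public static void main(String[] args) {
    System.out.println(hasNoNulls(NUM_NULLS_UNSET)); // false: count unknown
    System.out.println(hasNoNulls(0));               // true: known to have no nulls
  }
}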
@@ -89,12 +89,12 @@ public void collect(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase p
SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName());
Long previousCount = columnValueCounts.get(schemaPath);
if (previousCount != null) {
-if (previousCount != GroupScan.NO_COLUMN_STATS && column.getNulls() != null) {
+if (previousCount != GroupScan.NO_COLUMN_STATS && column.isNumNullsSet()) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
}
} else {
-if (column.getNulls() != null) {
+if (column.isNumNullsSet()) {
Long newCount = rowCount - column.getNulls();
columnValueCounts.put(schemaPath, newCount);
} else {
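
For illustration, here is a simplified, self-contained sketch of the accumulation logic in this hunk (hypothetical names; Drill's real code operates on RowGroupInfo and ColumnMetadata objects). The key idea: a row group whose null count is unset makes the column's total value count unknowable, so the total is pinned to the NO_COLUMN_STATS sentinel. The poisoning branch for an already-seen column is an assumption added here for completeness, since the hunk is truncated at that point.

import java.util.HashMap;
import java.util.Map;

final class ValueCountSketch {

  static final long NO_COLUMN_STATS = -1; // sentinel: total value count unknown

  final Map<String, Long> columnValueCounts = new HashMap<>();

  // numNulls == null or -1 means the row group did not record a null count
  void collect(String column, long rowCount, Long numNulls) {
    boolean numNullsSet = numNulls != null && numNulls != -1;
    Long previous = columnValueCounts.get(column);
    if (previous == null) {
      columnValueCounts.put(column, numNullsSet ? rowCount - numNulls : NO_COLUMN_STATS);
    } else if (previous != NO_COLUMN_STATS && numNullsSet) {
      columnValueCounts.put(column, previous + (rowCount - numNulls));
    } else {
      columnValueCounts.put(column, NO_COLUMN_STATS); // poison: stats incomplete
    }
  }
}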
@@ -64,7 +64,6 @@
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -447,47 +446,40 @@ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3
logger.debug(containsCorruptDates.toString());
}
for (BlockMetaData rowGroup : metadata.getBlocks()) {
-List<ColumnMetadata_v3> columnMetadataList = Lists.newArrayList();
+List<ColumnMetadata_v3> columnMetadataList = new ArrayList<>();
long length = 0;
for (ColumnChunkMetaData col : rowGroup.getColumns()) {
-ColumnMetadata_v3 columnMetadata;
-boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
+Statistics<?> stats = col.getStatistics();
String[] columnName = col.getPath().toArray();
SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);

ColumnTypeMetadata_v3 columnTypeMetadata =
-new ColumnTypeMetadata_v3(columnName, col.getType(), colTypeInfo.originalType,
+new ColumnTypeMetadata_v3(columnName, col.getPrimitiveType().getPrimitiveTypeName(), colTypeInfo.originalType,
colTypeInfo.precision, colTypeInfo.scale, colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel);

if (parquetTableMetadata.columnTypeInfo == null) {
parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
}
-// Save the column schema info. We'll merge it into one list
-parquetTableMetadata.columnTypeInfo
-.put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata);
+parquetTableMetadata.columnTypeInfo.put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata);
+Object minValue = null;
+Object maxValue = null;
+long numNulls = -1;
+boolean statsAvailable = stats != null && !stats.isEmpty();
if (statsAvailable) {
-// Write stats when they are not null
-Object minValue = null;
-Object maxValue = null;
if (stats.hasNonNullValue()) {
minValue = stats.genericGetMin();
maxValue = stats.genericGetMax();
-if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION
-&& columnTypeMetadata.originalType == OriginalType.DATE) {
+if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION && columnTypeMetadata.originalType == OriginalType.DATE) {
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
}

}
-columnMetadata =
-new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), minValue, maxValue, stats.getNumNulls());
-} else {
-columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), null, null, null);
+numNulls = stats.getNumNulls();
}
+ColumnMetadata_v3 columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getPrimitiveType().getPrimitiveTypeName(), minValue, maxValue, numNulls);
columnMetadataList.add(columnMetadata);
length += col.getTotalSize();
}
@@ -632,12 +624,7 @@ private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaCont
List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
for (ParquetFileMetadata file : files) {
List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
-for (Iterator<? extends RowGroupMetadata> iter = rowGroups.iterator(); iter.hasNext(); ) {
-RowGroupMetadata r = iter.next();
-if (r.getRowCount() == 0) {
-iter.remove();
-}
-}
+rowGroups.removeIf(r -> r.getRowCount() == 0);
}

}
@@ -45,10 +45,7 @@ public class MetadataBase {
* <p>
* Note: keep metadata versions synchronized with {@link MetadataVersion.Constants}
*/
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
-include = JsonTypeInfo.As.PROPERTY,
-property = "metadata_version",
-visible = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "metadata_version", visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = Metadata_V1.ParquetTableMetadata_v1.class, name = V1),
@JsonSubTypes.Type(value = Metadata_V2.ParquetTableMetadata_v2.class, name = V2),
@@ -108,6 +105,18 @@ public static abstract class RowGroupMetadata {


public static abstract class ColumnMetadata {

+/**
+ * Number of nulls is considered to be valid if its value is not null and not -1.
+ *
+ * @return true if the nulls value is defined, false otherwise
+ */
+@JsonIgnore
+public boolean isNumNullsSet() {
+Long nulls = getNulls();
+return nulls != null && nulls != -1;
+}

public abstract String[] getName();

public abstract Long getNulls();
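
A quick standalone illustration of this guard (an editor's sketch, not part of the commit): both encodings of "unknown" must fail the check — a null Long from older metadata and the new -1 sentinel — while a genuine count of zero passes.

/** Sketch: both "unknown" encodings fail the guard; only a real count passes. */
final class IsNumNullsSetSketch {

  static boolean isNumNullsSet(Long nulls) {
    return nulls != null && nulls != -1;
  }

  public static void main(String[] args) {
    System.out.println(isNumNullsSet(null)); // false: older metadata, nulls absent
    System.out.println(isNumNullsSet(-1L));  // false: new "not defined" sentinel
    System.out.println(isNumNullsSet(0L));   // true: a known count of zero
  }
}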
@@ -21,10 +21,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.SerializerProvider;
@@ -317,8 +313,7 @@ public DeSerializer() {
}

@Override
-public Object deserializeKey(String key, com.fasterxml.jackson.databind.DeserializationContext ctxt)
-throws IOException, com.fasterxml.jackson.core.JsonProcessingException {
+public Object deserializeKey(String key, com.fasterxml.jackson.databind.DeserializationContext ctxt) {
// key string should contain '`' char if the field was serialized as SchemaPath object
if (key.contains("`")) {
return new Key(SchemaPath.parseFromString(key));
@@ -391,10 +386,10 @@ public ColumnMetadata_v3(String[] name, PrimitiveType.PrimitiveTypeName primitiv
*/
@Override
public boolean hasSingleValue(long rowCount) {
-if (nulls != null) {
+if (isNumNullsSet()) {
if (minValue != null) {
// Objects.deepEquals() is used here, since min and max may be byte arrays
-return Objects.deepEquals(minValue, maxValue) && (nulls == 0 || nulls == rowCount);
+return (nulls == 0 || nulls == rowCount) && Objects.deepEquals(minValue, maxValue);
} else {
return nulls == rowCount && maxValue == null;
}
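
To make the branches concrete, here is a self-contained sketch of the same decision (an editor's illustration with a hypothetical static helper; the real method is an override on ColumnMetadata_v3):

import java.util.Objects;

final class SingleValueSketch {

  static boolean hasSingleValue(Object min, Object max, Long nulls, long rowCount) {
    if (nulls != null && nulls != -1) {        // null count must be known
      if (min != null) {
        // single non-null value only when no row is null (or, degenerately, all are)
        return (nulls == 0 || nulls == rowCount) && Objects.deepEquals(min, max);
      }
      return nulls == rowCount && max == null; // all-null column
    }
    return false;                              // unknown null count: no guarantee
  }

  public static void main(String[] args) {
    System.out.println(hasSingleValue(10, 10, 0L, 5));     // true: one distinct value
    System.out.println(hasSingleValue(null, null, 5L, 5)); // true: every row is null
    System.out.println(hasSingleValue(10, 20, 0L, 5));     // false: a real value range
  }
}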
@@ -418,19 +413,10 @@ public boolean hasSingleValue(long rowCount) {
return null;
}

-public static class DeSerializer extends JsonDeserializer<ColumnMetadata_v3> {
-@Override public ColumnMetadata_v3 deserialize(JsonParser jp, DeserializationContext ctxt)
-throws IOException, JsonProcessingException {
-return null;
-}
-}


// We use a custom serializer and write only non null values.
public static class Serializer extends JsonSerializer<ColumnMetadata_v3> {
@Override
-public void serialize(ColumnMetadata_v3 value, JsonGenerator jgen, SerializerProvider provider)
-throws IOException, JsonProcessingException {
+public void serialize(ColumnMetadata_v3 value, JsonGenerator jgen, SerializerProvider provider) throws IOException {
jgen.writeStartObject();
jgen.writeArrayFieldStart("name");
for (String n : value.name) {
@@ -95,7 +95,7 @@ public Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields)
if (columnMetadata != null) {
final Object min = columnMetadata.getMinValue();
final Object max = columnMetadata.getMaxValue();
-final Long numNull = columnMetadata.getNulls();
+final long numNulls = columnMetadata.getNulls() == null ? -1 : columnMetadata.getNulls();

primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName());
originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName());
@@ -109,7 +109,7 @@ public Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields)
precision = columnTypeInfo.precision;
}

-statMap.put(field, getStat(min, max, numNull, primitiveType, originalType, scale, precision));
+statMap.put(field, getStat(min, max, numNulls, primitiveType, originalType, scale, precision));
} else {
final String columnName = field.getRootSegment().getPath();
if (implicitColValues.containsKey(columnName)) {
@@ -137,24 +137,21 @@ public Map<SchemaPath, ColumnStatistics> collectColStat(Set<SchemaPath> fields)
*
* @param min min value for statistics
* @param max max value for statistics
-* @param numNull num_nulls for statistics
+* @param numNulls num_nulls for statistics
* @param primitiveType type that determines statistics class
* @param originalType type that determines statistics class
* @param scale scale value (used for DECIMAL type)
* @param precision precision value (used for DECIMAL type)
* @return column statistics
*/
-private ColumnStatistics getStat(Object min, Object max, Long numNull,
+private ColumnStatistics getStat(Object min, Object max, long numNulls,
PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType,
int scale, int precision) {
Statistics stat = Statistics.getStatsBasedOnType(primitiveType);
Statistics convertedStat = stat;

TypeProtos.MajorType type = ParquetReaderUtility.getType(primitiveType, originalType, scale, precision);

-if (numNull != null) {
-stat.setNumNulls(numNull);
-}
+stat.setNumNulls(numNulls);

if (min != null && max != null ) {
switch (type.getMinorType()) {
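For reference, a minimal standalone sketch of the simplified call path (assuming the parquet-mr Statistics API that this hunk itself uses; getStatsBasedOnType is deprecated upstream but is the method called here). The point of the change: the -1 "not defined" sentinel now flows through unconditionally instead of being silently dropped when the boxed count was null.

import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

final class SetNumNullsSketch {
  public static void main(String[] args) {
    Statistics stat = Statistics.getStatsBasedOnType(PrimitiveTypeName.INT64);
    stat.setNumNulls(-1);                   // pass the "not defined" sentinel through as-is
    System.out.println(stat.getNumNulls()); // -1: consumers can detect the missing count
  }
}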
@@ -35,21 +35,16 @@ public static void setupTestFiles() {
}

@Test
-public void ensureCaseDoesntConvertToDirectScan() throws Exception {
+public void ensureCaseDoesNotConvertToDirectScan() throws Exception {
testPlanMatchingPatterns(
"select count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt\n" +
-"from dfs.`directcount.parquet`",
-new String[] { "CASE" },
-new String[]{});
+"from dfs.`directcount.parquet`", new String[]{"CASE"});
}

@Test
public void ensureConvertSimpleCountToDirectScan() throws Exception {
-final String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
-testPlanMatchingPatterns(
-sql,
-new String[] { "DynamicPojoRecordReader" },
-new String[]{});
+String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
+testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});

testBuilder()
.sqlQuery(sql)
@@ -61,11 +56,8 @@ public void ensureConvertSimpleCountToDirectScan() throws Exception {

@Test
public void ensureConvertSimpleCountConstToDirectScan() throws Exception {
-final String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
-testPlanMatchingPatterns(
-sql,
-new String[] { "DynamicPojoRecordReader" },
-new String[]{});
+String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
+testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});

testBuilder()
.sqlQuery(sql)
@@ -77,11 +69,8 @@ public void ensureConvertSimpleCountConstToDirectScan() throws Exception {

@Test
public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception {
-final String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
-testPlanMatchingPatterns(
-sql,
-new String[] { "DynamicPojoRecordReader" },
-new String[]{});
+String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
+testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});

testBuilder()
.sqlQuery(sql)
@@ -93,11 +82,8 @@ public void ensureConvertSimpleCountConstExprToDirectScan() throws Exception {

@Test
public void ensureDoesNotConvertForDirectoryColumns() throws Exception {
-final String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
-testPlanMatchingPatterns(
-sql,
-new String[] { "ParquetGroupScan" },
-new String[]{});
+String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
+testPlanMatchingPatterns(sql, new String[]{"ParquetGroupScan"});

testBuilder()
.sqlQuery(sql)
@@ -109,11 +95,8 @@ public void ensureDoesNotConvertForDirectoryColumns() throws Exception {

@Test
public void ensureConvertForImplicitColumns() throws Exception {
-final String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
-testPlanMatchingPatterns(
-sql,
-new String[] { "DynamicPojoRecordReader" },
-new String[]{});
+String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
+testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});

testBuilder()
.sqlQuery(sql)
@@ -126,25 +109,22 @@ public void ensureConvertForImplicitColumns() throws Exception {
@Test
public void ensureConvertForSeveralColumns() throws Exception {
test("use dfs.tmp");
-final String tableName = "parquet_table_counts";
+String tableName = "parquet_table_counts";

try {
-final String newFqnColumnName = "new_fqn";
+String newFqnColumnName = "new_fqn";
test("alter session set `%s` = '%s'", ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
test("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
test("refresh table metadata %s", tableName);

-final String sql = String.format("select\n" +
+String sql = String.format("select\n" +
"count(%s) as implicit_count,\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count\n" +
"from %s", newFqnColumnName, tableName);

-testPlanMatchingPatterns(
-sql,
-new String[] { "DynamicPojoRecordReader" },
-new String[]{});
+testPlanMatchingPatterns(sql, new String[]{"DynamicPojoRecordReader"});

testBuilder()
.sqlQuery(sql)
@@ -159,4 +139,28 @@ public void ensureConvertForSeveralColumns() throws Exception {
}
}

+@Test
+public void ensureCorrectCountWithMissingStatistics() throws Exception {
+test("use dfs.tmp");
+String tableName = "wide_str_table";
+try {
+// the table will contain two partitions: one with a null value, the other with a non-null value
+test("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName);
+
+String query = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName);
+
+// direct scan should not be applied since we don't have statistics
+testPlanMatchingPatterns(query, null, new String[]{"DynamicPojoRecordReader"});
+
+testBuilder()
+.sqlQuery(query)
+.unOrdered()
+.baselineColumns("cnt_str", "cnt_total")
+.baselineValues(1L, 2L)
+.go();
+} finally {
+test("drop table if exists %s", tableName);
+}
+}

}
