From 9c55f8455f17750a5828562f312e1ad88b7745f9 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Wed, 28 Oct 2015 18:24:16 -0700 Subject: [PATCH 1/2] DRILL-4053: Reduce metadata cache file size. Save a merged schema instead of repeating for every row group. Save maxValue in a row group iff minVal equals maxVal and null otherwise. --- exec/java-exec/pom.xml | 5 + .../drill/exec/store/parquet/Metadata.java | 281 ++++++++++++++---- .../exec/store/parquet/ParquetGroupScan.java | 32 +- 3 files changed, 235 insertions(+), 83 deletions(-) diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index ada66eb9dfa..90f0f3edff0 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -118,6 +118,11 @@ jackson-jaxrs-json-provider 2.4.3 + + com.fasterxml.jackson.module + jackson-module-afterburner + 2.4.0 + org.glassfish.jersey.ext jersey-mvc-freemarker diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index aa2628f5443..024deaa2d88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -21,17 +21,24 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator.Feature; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; + import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.expression.SchemaPath.De; import org.apache.drill.exec.store.TimedRunnable; import org.apache.drill.exec.store.dfs.DrillPathFilter; import org.apache.hadoop.fs.BlockLocation; @@ -40,6 +47,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.codehaus.jackson.annotate.JsonIgnore; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -54,6 +62,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -61,7 +71,8 @@ public class Metadata { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); - public static final String METADATA_FILENAME = ".drill.parquet_metadata"; + public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata"}; + public static final String METADATA_FILENAME = ".drill.parquet_metadata.v2"; private final FileSystem fs; @@ -125,6 +136,7 @@ 
private Metadata(FileSystem fs) { private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException { List metaDataList = Lists.newArrayList(); List directoryList = Lists.newArrayList(); + ConcurrentHashMap columnTypeInfoSet = new ConcurrentHashMap<>(); Path p = new Path(path); FileStatus fileStatus = fs.getFileStatus(p); assert fileStatus.isDirectory() : "Expected directory"; @@ -137,14 +149,34 @@ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) th metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); directoryList.add(file.getPath().toString()); + // Merge the schema from the child level into the current level + //TODO: We need a merge method that merges two colums with the same name but different types + columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo); } else { childFiles.add(file); } } + ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(); if (childFiles.size() > 0) { - metaDataList.addAll(getParquetFileMetadata(childFiles)); + List childFilesMetadata = + getParquetFileMetadata(parquetTableMetadata, childFiles); + metaDataList.addAll(childFilesMetadata); + // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added + // to the parquetTableMetadata. + } + + parquetTableMetadata.directories = directoryList; + parquetTableMetadata.files = metaDataList; + //TODO: We need a merge method that merges two colums with the same name but different types + if (parquetTableMetadata.columnTypeInfo == null) { + parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); + } + parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet); + + // delete old files + for(String oldname: OLD_METADATA_FILENAMES){ + fs.delete(new Path(p, oldname), false); } - ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(metaDataList, directoryList); writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME)); return parquetTableMetadata; } @@ -175,9 +207,13 @@ private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOEx * @return * @throws IOException */ - private ParquetTableMetadata_v1 getParquetTableMetadata(List fileStatuses) throws IOException { - List fileMetadataList = getParquetFileMetadata(fileStatuses); - return new ParquetTableMetadata_v1(fileMetadataList, new ArrayList()); + private ParquetTableMetadata_v1 getParquetTableMetadata(List fileStatuses) + throws IOException { + ParquetTableMetadata_v1 tableMetadata = new ParquetTableMetadata_v1(); + List fileMetadataList = getParquetFileMetadata(tableMetadata, fileStatuses); + tableMetadata.files = fileMetadataList; + tableMetadata.directories = new ArrayList(); + return tableMetadata; } /** @@ -186,10 +222,11 @@ private ParquetTableMetadata_v1 getParquetTableMetadata(List fileSta * @return * @throws IOException */ - private List getParquetFileMetadata(List fileStatuses) throws IOException { + private List getParquetFileMetadata(ParquetTableMetadata_v1 parquetTableMetadata_v1, + List fileStatuses) throws IOException { List> gatherers = Lists.newArrayList(); for (FileStatus file : fileStatuses) { - gatherers.add(new MetadataGatherer(file)); + gatherers.add(new MetadataGatherer(parquetTableMetadata_v1, file)); } List metaDataList = Lists.newArrayList(); @@ -221,14 +258,16 @@ private List getFileStatuses(FileStatus fileStatus) throws IOExcepti private class MetadataGatherer extends TimedRunnable { private FileStatus fileStatus; + private 
ParquetTableMetadata_v1 parquetTableMetadata; - public MetadataGatherer(FileStatus fileStatus) { + public MetadataGatherer(ParquetTableMetadata_v1 parquetTableMetadata, FileStatus fileStatus) { this.fileStatus = fileStatus; + this.parquetTableMetadata = parquetTableMetadata; } @Override protected ParquetFileMetadata runInner() throws Exception { - return getParquetFileMetadata(fileStatus); + return getParquetFileMetadata(parquetTableMetadata, fileStatus); } @Override @@ -255,7 +294,8 @@ private OriginalType getOriginalType(Type type, String[] path, int depth) { * @return * @throws IOException */ - private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOException { + private ParquetFileMetadata getParquetFileMetadata(ParquetTableMetadata_v1 parquetTableMetadata, + FileStatus file) throws IOException { ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file); MessageType schema = metadata.getFileMetaData().getSchema(); @@ -276,13 +316,27 @@ private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOExc boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty()); Statistics stats = col.getStatistics(); - SchemaPath columnName = SchemaPath.getCompoundPath(col.getPath().toArray()); + String[] columnName = col.getPath().toArray() ; + SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName); + ColumnTypeMetadata columnTypeMetadata = + new ColumnTypeMetadata(columnName, col.getType(), originalTypeMap.get(columnSchemaName)); + if (parquetTableMetadata.columnTypeInfo == null) { + parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); + } + // Save the column schema info. We'll merge it into one list + parquetTableMetadata.columnTypeInfo + .put(new ColumnTypeMetadata.Key(columnTypeMetadata.name), columnTypeMetadata); if (statsAvailable) { - columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName), - stats.genericGetMax(), stats.genericGetMin(), stats.getNumNulls()); + // Write stats only if minVal==maxVal. 
Also, we then store only maxVal + Object mxValue = null; + if (stats.genericGetMax() != null && stats.genericGetMin() != null && stats.genericGetMax() + .equals(stats.genericGetMin())) { + mxValue = stats.genericGetMax(); + } + columnMetadata = + new ColumnMetadata(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls()); } else { - columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName), - null, null, null); + columnMetadata = new ColumnMetadata(columnTypeMetadata.name, col.getType(), null, null); } columnMetadataList.add(columnMetadata); length += col.getTotalSize(); @@ -338,6 +392,9 @@ private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) thr jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false); jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); + SimpleModule module = new SimpleModule(); + module.addSerializer(ColumnMetadata.class, new ColumnMetadata.Serializer()); + mapper.registerModule(module); FSDataOutputStream os = fs.create(p); mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata); os.flush(); @@ -355,16 +412,18 @@ private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException { timer.start(); Path p = new Path(path); ObjectMapper mapper = new ObjectMapper(); - SimpleModule module = new SimpleModule(); - module.addDeserializer(SchemaPath.class, new De()); + AfterburnerModule module = new AfterburnerModule(); + module.addKeyDeserializer(ColumnTypeMetadata.Key.class, new ColumnTypeMetadata.Key.DeSerializer()); mapper.registerModule(module); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); FSDataInputStream is = fs.open(p); + ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class); logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); if (tableModified(parquetTableMetadata, p)) { - parquetTableMetadata = createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString()); + parquetTableMetadata = + createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString()); } return parquetTableMetadata; } @@ -400,8 +459,15 @@ public static class ParquetTableMetadataBase { /** * Struct which contains the metadata for an entire parquet directory structure */ - @JsonTypeName("v1") + @JsonTypeName("v2") public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase { + /* + ColumnTypeInfo is schema information from all the files and row groups, merged into + one. To get this info, we pass the ParquetTableMetadata object all the way dow to the + RowGroup and the column type is built there as it is read from the footer. 
+ */ + @JsonProperty + public ConcurrentHashMap columnTypeInfo; @JsonProperty List files; @JsonProperty @@ -411,10 +477,17 @@ public ParquetTableMetadata_v1() { super(); } - public ParquetTableMetadata_v1(List files, List directories) { + public ParquetTableMetadata_v1(ConcurrentHashMap columnTypeInfo, + List files, List directories) { this.files = files; this.directories = directories; + this.columnTypeInfo = columnTypeInfo; } + + public ColumnTypeMetadata getColumnTypeInfo(String[] name) { + return columnTypeInfo.get(new ColumnTypeMetadata.Key(name)); + } + } /** @@ -473,71 +546,149 @@ public RowGroupMetadata(Long start, Long length, Long rowCount, } } + + public static class ColumnTypeMetadata { + @JsonProperty public String[] name; + @JsonProperty public PrimitiveTypeName primitiveType; + @JsonProperty public OriginalType originalType; + + //@JsonIgnore private int hashCode = 0; + + // Key to find by name only + @JsonIgnore private Key key; + + public ColumnTypeMetadata() { + super(); + } + + public ColumnTypeMetadata(String[] name, PrimitiveTypeName primitiveType, OriginalType originalType) { + this.name = name; + this.primitiveType = primitiveType; + this.originalType = originalType; + this.key = new Key(name); + } + + @JsonIgnore private Key key() { + return this.key; + } + + private static class Key { + private String[] name; + private int hashCode = 0; + + public Key(String[] name) { + this.name = name; + } + + @Override public int hashCode() { + if (hashCode == 0) { + hashCode = Arrays.hashCode(name); + } + return hashCode; + } + + @Override public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final Key other = (Key) obj; + return Arrays.equals(this.name, other.name); + } + + @Override public String toString() { + String s = null; + for (String namePart : name) { + if (s != null) { + s += "."; + s += namePart; + } else { + s = namePart; + } + } + return s; + } + + public static class DeSerializer extends KeyDeserializer { + + public DeSerializer() { + super(); + } + + @Override + public Object deserializeKey(String key, com.fasterxml.jackson.databind.DeserializationContext ctxt) + throws IOException, com.fasterxml.jackson.core.JsonProcessingException { + return new Key(key.split("\\.")); + } + } + } + } + + /** * A struct that contains the metadata for a column in a parquet file */ public static class ColumnMetadata { - @JsonProperty - public SchemaPath name; - @JsonProperty - public PrimitiveTypeName primitiveType; - @JsonProperty - public OriginalType originalType; + // Use a string array for name instead of Schema Path to make serialization easier + @JsonProperty public String[] name; @JsonProperty public Long nulls; - // JsonProperty for these are associated with the getters and setters - public Object max; - public Object min; + public Object mxValue; + @JsonIgnore private PrimitiveTypeName primitiveType; public ColumnMetadata() { super(); } - public ColumnMetadata(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType, - Object max, Object min, Long nulls) { + public ColumnMetadata(String[] name, PrimitiveTypeName primitiveType, Object mxValue, Long nulls) { this.name = name; - this.primitiveType = primitiveType; - this.originalType = originalType; - this.max = max; - this.min = min; + this.mxValue = mxValue; this.nulls = nulls; + this.primitiveType=primitiveType; } - @JsonProperty(value = "min") - public Object getMin() { - if (primitiveType == PrimitiveTypeName.BINARY 
&& min != null) { - return new String(((Binary) min).getBytes()); - } - return min; + @JsonProperty(value = "mxValue") + public void setMax(Object mxValue) { + this.mxValue = mxValue; } - @JsonProperty(value = "max") - public Object getMax() { - if (primitiveType == PrimitiveTypeName.BINARY && max != null) { - return new String(((Binary) max).getBytes()); + public static class DeSerializer extends JsonDeserializer { + @Override public ColumnMetadata deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + return null; } - return max; } - /** - * setter used during deserialization of the 'min' field of the metadata cache file. - * @param min - */ - @JsonProperty(value = "min") - public void setMin(Object min) { - this.min = min; - } - - /** - * setter used during deserialization of the 'max' field of the metadata cache file. - * @param max - */ - @JsonProperty(value = "max") - public void setMax(Object max) { - this.max = max; + // WE use a custom serializer and write only non null values. + public static class Serializer extends JsonSerializer { + @Override public void serialize(ColumnMetadata value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { + jgen.writeStartObject(); + jgen.writeArrayFieldStart("name"); + for (String n : value.name) { + jgen.writeString(n); + } + jgen.writeEndArray(); + if (value.mxValue != null) { + Object val; + if (value.primitiveType == PrimitiveTypeName.BINARY && value.mxValue != null) { + val = new String(((Binary) value.mxValue).getBytes()); + } else { + val = value.mxValue; + } + jgen.writeObjectField("mxValue", val); + } + if (value.nulls != null) { + jgen.writeObjectField("nulls", value.nulls); + } + jgen.writeEndObject(); + } } } } + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 3a9fc0de39b..bf40002961f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -250,10 +250,13 @@ public Set getFileSet() { * @return whether column is a potential partition column */ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) { - SchemaPath schemaPath = columnMetadata.name; + SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.name); + Metadata.ColumnTypeMetadata columnTypeMetadata = + this.parquetTableMetadata.getColumnTypeInfo(columnMetadata.name); if (first) { if (hasSingleValue(columnMetadata)) { - columnTypeMap.put(schemaPath, getType(columnMetadata.primitiveType, columnMetadata.originalType)); + columnTypeMap.put(schemaPath, getType(columnTypeMetadata.primitiveType, + columnTypeMetadata.originalType)); return true; } else { return false; @@ -266,7 +269,8 @@ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean f columnTypeMap.remove(schemaPath); return false; } - if (!getType(columnMetadata.primitiveType, columnMetadata.originalType).equals(columnTypeMap.get(schemaPath))) { + if (!getType(columnTypeMetadata.primitiveType, + columnTypeMetadata.originalType).equals(columnTypeMap.get(schemaPath))) { columnTypeMap.remove(schemaPath); return false; } @@ -325,18 +329,9 @@ private MajorType getType(PrimitiveTypeName type, OriginalType originalType) { } private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) 
{ - Object max = columnChunkMetaData.max; - Object min = columnChunkMetaData.min; - return max != null && max.equals(min); -/* - if (max != null && min != null) { - if (max instanceof byte[] && min instanceof byte[]) { - return Arrays.equals((byte[])max, (byte[])min); - } - return max.equals(min); - } - return false; -*/ + // ColumnMetadata will have a non-null value iff the minValue and the maxValue for the + // rowgroup are the same + return (columnChunkMetaData != null) && (columnChunkMetaData.mxValue != null); } @Override @@ -605,7 +600,7 @@ private void init() throws IOException { for (RowGroupMetadata rowGroup : file.rowGroups) { long rowCount = rowGroup.rowCount; for (ColumnMetadata column : rowGroup.columns) { - SchemaPath schemaPath = column.name; + SchemaPath schemaPath = SchemaPath.getCompoundPath(column.name); Long previousCount = columnValueCounts.get(schemaPath); if (previousCount != null) { if (previousCount != GroupScan.NO_COLUMN_STATS) { @@ -632,7 +627,7 @@ private void init() throws IOException { partitionValueMap.put(file.path, map); } Object value = map.get(schemaPath); - Object currentValue = column.max; + Object currentValue = column.mxValue; // Object currentValue = column.getMax(); if (value != null) { if (value != currentValue) { @@ -658,7 +653,8 @@ private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 newFileMetadataList.add(file); } } - return new ParquetTableMetadata_v1(newFileMetadataList, new ArrayList()); + return new ParquetTableMetadata_v1(parquetTableMetadata.columnTypeInfo, newFileMetadataList, + new ArrayList()); } /** From f1b11fe39f2cab226a30f4232734c783faaa9438 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 26 Nov 2015 11:20:15 -0800 Subject: [PATCH 2/2] DRILL-4053: Ensure backward compatibility for metadata cache files --- .../drill/exec/store/parquet/Metadata.java | 587 ++++++++++++++---- .../store/parquet/ParquetFileSelection.java | 12 +- .../store/parquet/ParquetFormatPlugin.java | 8 +- .../exec/store/parquet/ParquetGroupScan.java | 423 ++++++------- 4 files changed, 702 insertions(+), 328 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java index 024deaa2d88..950993aa2c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

 * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.parquet; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.core.JsonFactory; @@ -47,6 +48,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; import org.codehaus.jackson.annotate.JsonIgnore; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; @@ -54,11 +59,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -import org.apache.parquet.schema.Type; import java.io.IOException; import java.util.ArrayList; @@ -71,13 +72,14 @@ public class Metadata { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); - public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata"}; - public static final String METADATA_FILENAME = ".drill.parquet_metadata.v2"; + public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"}; + public static final String METADATA_FILENAME = ".drill.parquet_metadata"; private final FileSystem fs; /** * Create the parquet metadata file for the directory at the given path, and for any subdirectories + * * @param fs * @param path * @throws IOException @@ -89,37 +91,41 @@ public static void createMeta(FileSystem fs, String path) throws IOException { /** * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories + * * @param fs * @param path * @return * @throws IOException */ - public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, String path) throws IOException { + public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, String path) + throws IOException { Metadata metadata = new Metadata(fs); return metadata.getParquetTableMetadata(path); } /** * Get the parquet metadata for a list of parquet files + * * @param fs * @param fileStatuses * @return * @throws IOException */ - public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, - List fileStatuses) throws IOException { + public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, + List fileStatuses) throws IOException { Metadata metadata = new Metadata(fs); return metadata.getParquetTableMetadata(fileStatuses); } /** * Get the parquet metadata for a directory by reading the metadata file + * * @param fs * @param path The path to the metadata file, located in the directory that contains the parquet files * @return * @throws IOException */ - public static ParquetTableMetadata_v1 readBlockMeta(FileSystem fs, String path) throws IOException { + public static 
ParquetTableMetadataBase readBlockMeta(FileSystem fs, String path) throws IOException { Metadata metadata = new Metadata(fs); return metadata.readBlockMeta(path); } @@ -130,13 +136,15 @@ private Metadata(FileSystem fs) { /** * Create the parquet metadata file for the directory at the given path, and for any subdirectories + * * @param path * @throws IOException */ - private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException { - List metaDataList = Lists.newArrayList(); + private ParquetTableMetadata_v2 createMetaFilesRecursively(final String path) throws IOException { + List metaDataList = Lists.newArrayList(); List directoryList = Lists.newArrayList(); - ConcurrentHashMap columnTypeInfoSet = new ConcurrentHashMap<>(); + ConcurrentHashMap columnTypeInfoSet = + new ConcurrentHashMap<>(); Path p = new Path(path); FileStatus fileStatus = fs.getFileStatus(p); assert fileStatus.isDirectory() : "Expected directory"; @@ -145,7 +153,7 @@ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) th for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) { if (file.isDirectory()) { - ParquetTableMetadata_v1 subTableMetadata = createMetaFilesRecursively(file.getPath().toString()); + ParquetTableMetadata_v2 subTableMetadata = createMetaFilesRecursively(file.getPath().toString()); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); directoryList.add(file.getPath().toString()); @@ -156,10 +164,10 @@ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) th childFiles.add(file); } } - ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(); + ParquetTableMetadata_v2 parquetTableMetadata = new ParquetTableMetadata_v2(); if (childFiles.size() > 0) { - List childFilesMetadata = - getParquetFileMetadata(parquetTableMetadata, childFiles); + List childFilesMetadata = + getParquetFileMetadata_v2(parquetTableMetadata, childFiles); metaDataList.addAll(childFilesMetadata); // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added // to the parquetTableMetadata. 
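// ---------------------------------------------------------------------------------------------
// Illustrative sketch (not part of the patch itself). The surrounding hunks change
// readBlockMeta() to return the abstract ParquetTableMetadataBase and let Jackson's
// @JsonTypeInfo/@JsonSubTypes dispatch on the "metadata_version" property choose the v1 or v2
// concrete class. The self-contained example below only demonstrates that Jackson mechanism;
// the class names CacheBase, CacheV1, CacheV2 and VersionedCacheDemo are hypothetical
// stand-ins, not the Drill classes.
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class VersionedCacheDemo {

  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY,
      property = "metadata_version")
  @JsonSubTypes({
      @JsonSubTypes.Type(value = CacheV1.class, name = "v1"),
      @JsonSubTypes.Type(value = CacheV2.class, name = "v2")})
  public static abstract class CacheBase {
    public List<String> directories;   // field shared by both versions
  }

  public static class CacheV1 extends CacheBase {
  }

  public static class CacheV2 extends CacheBase {
    // a real v2 cache would also carry the table-wide column type map; omitted for brevity
  }

  public static void main(String[] args) throws Exception {
    String json = "{\"metadata_version\":\"v2\",\"directories\":[\"/tmp/t\"]}";
    // The "metadata_version" tag selects the concrete subclass during deserialization.
    CacheBase cache = new ObjectMapper().readValue(json, CacheBase.class);
    System.out.println(cache.getClass().getSimpleName());   // prints: CacheV2
  }
}
// ---------------------------------------------------------------------------------------------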
@@ -173,8 +181,7 @@ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) th } parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet); - // delete old files - for(String oldname: OLD_METADATA_FILENAMES){ + for (String oldname : OLD_METADATA_FILENAMES) { fs.delete(new Path(p, oldname), false); } writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME)); @@ -183,11 +190,12 @@ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) th /** * Get the parquet metadata for the parquet files in a directory + * * @param path the path of the directory * @return * @throws IOException */ - private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOException { + private ParquetTableMetadata_v2 getParquetTableMetadata(String path) throws IOException { Path p = new Path(path); FileStatus fileStatus = fs.getFileStatus(p); Stopwatch watch = new Stopwatch(); @@ -196,21 +204,22 @@ private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOEx logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS)); watch.reset(); watch.start(); - ParquetTableMetadata_v1 metadata_v1 = getParquetTableMetadata(fileStatuses); + ParquetTableMetadata_v2 metadata_v1 = getParquetTableMetadata(fileStatuses); logger.info("Took {} ms to read file metadata", watch.elapsed(TimeUnit.MILLISECONDS)); return metadata_v1; } /** * Get the parquet metadata for a list of parquet files + * * @param fileStatuses * @return * @throws IOException */ - private ParquetTableMetadata_v1 getParquetTableMetadata(List fileStatuses) + private ParquetTableMetadata_v2 getParquetTableMetadata(List fileStatuses) throws IOException { - ParquetTableMetadata_v1 tableMetadata = new ParquetTableMetadata_v1(); - List fileMetadataList = getParquetFileMetadata(tableMetadata, fileStatuses); + ParquetTableMetadata_v2 tableMetadata = new ParquetTableMetadata_v2(); + List fileMetadataList = getParquetFileMetadata_v2(tableMetadata, fileStatuses); tableMetadata.files = fileMetadataList; tableMetadata.directories = new ArrayList(); return tableMetadata; @@ -218,24 +227,26 @@ private ParquetTableMetadata_v1 getParquetTableMetadata(List fileSta /** * Get a list of file metadata for a list of parquet files + * * @param fileStatuses * @return * @throws IOException */ - private List getParquetFileMetadata(ParquetTableMetadata_v1 parquetTableMetadata_v1, - List fileStatuses) throws IOException { - List> gatherers = Lists.newArrayList(); + private List getParquetFileMetadata_v2( + ParquetTableMetadata_v2 parquetTableMetadata_v1, List fileStatuses) throws IOException { + List> gatherers = Lists.newArrayList(); for (FileStatus file : fileStatuses) { gatherers.add(new MetadataGatherer(parquetTableMetadata_v1, file)); } - List metaDataList = Lists.newArrayList(); + List metaDataList = Lists.newArrayList(); metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16)); return metaDataList; } /** * Recursively get a list of files + * * @param fileStatus * @return * @throws IOException @@ -255,19 +266,19 @@ private List getFileStatuses(FileStatus fileStatus) throws IOExcepti /** * TimedRunnable that reads the footer from parquet and collects file metadata */ - private class MetadataGatherer extends TimedRunnable { + private class MetadataGatherer extends TimedRunnable { private FileStatus fileStatus; - private ParquetTableMetadata_v1 parquetTableMetadata; + private ParquetTableMetadata_v2 parquetTableMetadata; - public 
MetadataGatherer(ParquetTableMetadata_v1 parquetTableMetadata, FileStatus fileStatus) { + public MetadataGatherer(ParquetTableMetadata_v2 parquetTableMetadata, FileStatus fileStatus) { this.fileStatus = fileStatus; this.parquetTableMetadata = parquetTableMetadata; } @Override - protected ParquetFileMetadata runInner() throws Exception { - return getParquetFileMetadata(parquetTableMetadata, fileStatus); + protected ParquetFileMetadata_v2 runInner() throws Exception { + return getParquetFileMetadata_v2(parquetTableMetadata, fileStatus); } @Override @@ -290,42 +301,43 @@ private OriginalType getOriginalType(Type type, String[] path, int depth) { /** * Get the metadata for a single file + * * @param file * @return * @throws IOException */ - private ParquetFileMetadata getParquetFileMetadata(ParquetTableMetadata_v1 parquetTableMetadata, + private ParquetFileMetadata_v2 getParquetFileMetadata_v2(ParquetTableMetadata_v2 parquetTableMetadata, FileStatus file) throws IOException { ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file); MessageType schema = metadata.getFileMetaData().getSchema(); - Map originalTypeMap = Maps.newHashMap(); + Map originalTypeMap = Maps.newHashMap(); schema.getPaths(); for (String[] path : schema.getPaths()) { originalTypeMap.put(SchemaPath.getCompoundPath(path), getOriginalType(schema, path, 0)); } - List rowGroupMetadataList = Lists.newArrayList(); + List rowGroupMetadataList = Lists.newArrayList(); for (BlockMetaData rowGroup : metadata.getBlocks()) { - List columnMetadataList = Lists.newArrayList(); + List columnMetadataList = Lists.newArrayList(); long length = 0; for (ColumnChunkMetaData col : rowGroup.getColumns()) { - ColumnMetadata columnMetadata; + ColumnMetadata_v2 columnMetadata; boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty()); Statistics stats = col.getStatistics(); - String[] columnName = col.getPath().toArray() ; + String[] columnName = col.getPath().toArray(); SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName); - ColumnTypeMetadata columnTypeMetadata = - new ColumnTypeMetadata(columnName, col.getType(), originalTypeMap.get(columnSchemaName)); + ColumnTypeMetadata_v2 columnTypeMetadata = + new ColumnTypeMetadata_v2(columnName, col.getType(), originalTypeMap.get(columnSchemaName)); if (parquetTableMetadata.columnTypeInfo == null) { parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); } // Save the column schema info. We'll merge it into one list parquetTableMetadata.columnTypeInfo - .put(new ColumnTypeMetadata.Key(columnTypeMetadata.name), columnTypeMetadata); + .put(new ColumnTypeMetadata_v2.Key(columnTypeMetadata.name), columnTypeMetadata); if (statsAvailable) { // Write stats only if minVal==maxVal. 
Also, we then store only maxVal Object mxValue = null; @@ -334,35 +346,38 @@ private ParquetFileMetadata getParquetFileMetadata(ParquetTableMetadata_v1 parqu mxValue = stats.genericGetMax(); } columnMetadata = - new ColumnMetadata(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls()); + new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), mxValue, stats.getNumNulls()); } else { - columnMetadata = new ColumnMetadata(columnTypeMetadata.name, col.getType(), null, null); + columnMetadata = new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), null, null); } columnMetadataList.add(columnMetadata); length += col.getTotalSize(); } - RowGroupMetadata rowGroupMeta = new RowGroupMetadata(rowGroup.getStartingPos(), length, rowGroup.getRowCount(), + RowGroupMetadata_v2 rowGroupMeta = + new RowGroupMetadata_v2(rowGroup.getStartingPos(), length, rowGroup.getRowCount(), getHostAffinity(file, rowGroup.getStartingPos(), length), columnMetadataList); rowGroupMetadataList.add(rowGroupMeta); } String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString(); - return new ParquetFileMetadata(path, file.getLen(), rowGroupMetadataList); + return new ParquetFileMetadata_v2(path, file.getLen(), rowGroupMetadataList); } /** * Get the host affinity for a row group + * * @param fileStatus the parquet file - * @param start the start of the row group - * @param length the length of the row group + * @param start the start of the row group + * @param length the length of the row group * @return * @throws IOException */ - private Map getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException { + private Map getHostAffinity(FileStatus fileStatus, long start, long length) + throws IOException { BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length); - Map hostAffinityMap = Maps.newHashMap(); + Map hostAffinityMap = Maps.newHashMap(); for (BlockLocation blockLocation : blockLocations) { for (String host : blockLocation.getHosts()) { Float currentAffinity = hostAffinityMap.get(host); @@ -370,7 +385,7 @@ private Map getHostAffinity(FileStatus fileStatus, long start, lon float blockEnd = blockStart + blockLocation.getLength(); float rowGroupEnd = start + length; Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) - - (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length; + (blockEnd > rowGroupEnd ? 
blockEnd - rowGroupEnd : 0)) / length; if (currentAffinity != null) { hostAffinityMap.put(host, currentAffinity + newAffinity); } else { @@ -383,17 +398,18 @@ private Map getHostAffinity(FileStatus fileStatus, long start, lon /** * Serialize parquet metadata to json and write to a file + * * @param parquetTableMetadata * @param p * @throws IOException */ - private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) throws IOException { + private void writeFile(ParquetTableMetadata_v2 parquetTableMetadata, Path p) throws IOException { JsonFactory jsonFactory = new JsonFactory(); jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false); jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); - module.addSerializer(ColumnMetadata.class, new ColumnMetadata.Serializer()); + module.addSerializer(ColumnMetadata_v2.class, new ColumnMetadata_v2.Serializer()); mapper.registerModule(module); FSDataOutputStream os = fs.create(p); mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata); @@ -403,22 +419,24 @@ private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) thr /** * Read the parquet metadata from a file + * * @param path * @return * @throws IOException */ - private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException { + private ParquetTableMetadataBase readBlockMeta(String path) throws IOException { Stopwatch timer = new Stopwatch(); timer.start(); Path p = new Path(path); ObjectMapper mapper = new ObjectMapper(); AfterburnerModule module = new AfterburnerModule(); - module.addKeyDeserializer(ColumnTypeMetadata.Key.class, new ColumnTypeMetadata.Key.DeSerializer()); + module.addDeserializer(SchemaPath.class, new SchemaPath.De()); + module.addKeyDeserializer(ColumnTypeMetadata_v2.Key.class, new ColumnTypeMetadata_v2.Key.DeSerializer()); mapper.registerModule(module); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); FSDataInputStream is = fs.open(p); - ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class); + ParquetTableMetadataBase parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class); logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); if (tableModified(parquetTableMetadata, p)) { @@ -431,18 +449,20 @@ private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException { /** * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with * the modification time of the metadata file + * * @param tableMetadata * @param metaFilePath * @return * @throws IOException */ - private boolean tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFilePath) throws IOException { + private boolean tableModified(ParquetTableMetadataBase tableMetadata, Path metaFilePath) + throws IOException { long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime(); FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent()); if (directoryStatus.getModificationTime() > metaFileModifyTime) { return true; } - for (String directory : tableMetadata.directories) { + for (String directory : tableMetadata.getDirectories()) { directoryStatus = fs.getFileStatus(new Path(directory)); if (directoryStatus.getModificationTime() > metaFileModifyTime) { return true; @@ -452,60 +472,120 @@ private boolean 
tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFi } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version") - public static class ParquetTableMetadataBase { + @JsonSubTypes({ + @JsonSubTypes.Type(value = ParquetTableMetadata_v1.class, name="v1"), + @JsonSubTypes.Type(value = ParquetTableMetadata_v2.class, name="v2") + }) + public static abstract class ParquetTableMetadataBase { + + @JsonIgnore public abstract List getDirectories(); + @JsonIgnore public abstract List getFiles(); + + @JsonIgnore public abstract void assignFiles(List newFiles); + + public abstract boolean hasColumnMetadata(); + + @JsonIgnore public abstract PrimitiveTypeName getPrimitiveType(String[] columnName); + + @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName); } - /** - * Struct which contains the metadata for an entire parquet directory structure - */ - @JsonTypeName("v2") + public static abstract class ParquetFileMetadata { + @JsonIgnore public abstract String getPath(); + + @JsonIgnore public abstract Long getLength(); + + @JsonIgnore public abstract List getRowGroups(); + } + + + public static abstract class RowGroupMetadata { + @JsonIgnore public abstract Long getStart(); + + @JsonIgnore public abstract Long getLength(); + + @JsonIgnore public abstract Long getRowCount(); + + @JsonIgnore public abstract Map getHostAffinity(); + + @JsonIgnore public abstract List getColumns(); + } + + + public static abstract class ColumnMetadata { + public abstract String[] getName(); + + public abstract Long getNulls(); + + public abstract boolean hasSingleValue(); + + public abstract Object getMaxValue(); + + public abstract PrimitiveTypeName getPrimitiveType(); + + public abstract OriginalType getOriginalType(); + } + + + + @JsonTypeName("v1") public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase { - /* - ColumnTypeInfo is schema information from all the files and row groups, merged into - one. To get this info, we pass the ParquetTableMetadata object all the way dow to the - RowGroup and the column type is built there as it is read from the footer. 
- */ - @JsonProperty - public ConcurrentHashMap columnTypeInfo; - @JsonProperty - List files; - @JsonProperty - List directories; + @JsonProperty List files; + @JsonProperty List directories; public ParquetTableMetadata_v1() { super(); } - public ParquetTableMetadata_v1(ConcurrentHashMap columnTypeInfo, - List files, List directories) { + public ParquetTableMetadata_v1(ParquetTableMetadataBase p, List files, + List directories) { this.files = files; this.directories = directories; - this.columnTypeInfo = columnTypeInfo; } - public ColumnTypeMetadata getColumnTypeInfo(String[] name) { - return columnTypeInfo.get(new ColumnTypeMetadata.Key(name)); + @JsonIgnore @Override public List getDirectories() { + return directories; + } + + @JsonIgnore @Override public List getFiles() { + return files; + } + + @JsonIgnore @Override public void assignFiles(List newFiles) { + this.files = (List) newFiles; + } + + @Override public boolean hasColumnMetadata() { + return false; } + @JsonIgnore @Override public PrimitiveTypeName getPrimitiveType(String[] columnName) { + return null; + } + + @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) { + return null; + } } + /** * Struct which contains the metadata for a single parquet file */ - public static class ParquetFileMetadata { + public static class ParquetFileMetadata_v1 extends ParquetFileMetadata { @JsonProperty public String path; @JsonProperty public Long length; @JsonProperty - public List rowGroups; + public List rowGroups; - public ParquetFileMetadata() { + public ParquetFileMetadata_v1() { super(); } - public ParquetFileMetadata(String path, Long length, List rowGroups) { + public ParquetFileMetadata_v1(String path, Long length, List rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -515,12 +595,25 @@ public ParquetFileMetadata(String path, Long length, List rowG public String toString() { return String.format("path: %s rowGroups: %s", path, rowGroups); } + + @JsonIgnore @Override public String getPath() { + return path; + } + + @JsonIgnore @Override public Long getLength() { + return length; + } + + @JsonIgnore @Override public List getRowGroups() { + return rowGroups; + } } + /** * A struct that contains the metadata for a parquet row group */ - public static class RowGroupMetadata { + public static class RowGroupMetadata_v1 extends RowGroupMetadata { @JsonProperty public Long start; @JsonProperty @@ -530,38 +623,291 @@ public static class RowGroupMetadata { @JsonProperty public Map hostAffinity; @JsonProperty - public List columns; + public List columns; + + public RowGroupMetadata_v1() { + super(); + } + + public RowGroupMetadata_v1(Long start, Long length, Long rowCount, Map hostAffinity, + List columns) { + this.start = start; + this.length = length; + this.rowCount = rowCount; + this.hostAffinity = hostAffinity; + this.columns = columns; + } + + @Override public Long getStart() { + return start; + } + + @Override public Long getLength() { + return length; + } + + @Override public Long getRowCount() { + return rowCount; + } + + @Override public Map getHostAffinity() { + return hostAffinity; + } + + @Override public List getColumns() { + return columns; + } + } + + + /** + * A struct that contains the metadata for a column in a parquet file + */ + public static class ColumnMetadata_v1 extends ColumnMetadata { + @JsonProperty + public SchemaPath name; + @JsonProperty + public PrimitiveTypeName primitiveType; + @JsonProperty + public OriginalType originalType; + @JsonProperty + public 
Long nulls; + + // JsonProperty for these are associated with the getters and setters + public Object max; + public Object min; + + + public ColumnMetadata_v1() { + super(); + } + + public ColumnMetadata_v1(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType, + Object max, Object min, Long nulls) { + this.name = name; + this.primitiveType = primitiveType; + this.originalType = originalType; + this.max = max; + this.min = min; + this.nulls = nulls; + } + + @JsonProperty(value = "min") + public Object getMin() { + if (primitiveType == PrimitiveTypeName.BINARY && min != null) { + return new String(((Binary) min).getBytes()); + } + return min; + } - public RowGroupMetadata() { + @JsonProperty(value = "max") + public Object getMax() { + if (primitiveType == PrimitiveTypeName.BINARY && max != null) { + return new String(((Binary) max).getBytes()); + } + return max; + } + + @Override public PrimitiveTypeName getPrimitiveType() { + return primitiveType; + } + + @Override public OriginalType getOriginalType() { + return originalType; + } + + /** + * setter used during deserialization of the 'min' field of the metadata cache file. + * + * @param min + */ + @JsonProperty(value = "min") + public void setMin(Object min) { + this.min = min; + } + + /** + * setter used during deserialization of the 'max' field of the metadata cache file. + * + * @param max + */ + @JsonProperty(value = "max") + public void setMax(Object max) { + this.max = max; + } + + @Override public String[] getName() { + String[] s = new String[1]; + String nameString = name.toString(); + // Strip out the surrounding backticks. + s[0]=nameString.substring(1, nameString.length()-1); + return s; + } + + @Override public Long getNulls() { + return nulls; + } + + @Override public boolean hasSingleValue() { + return (max != null && min != null && max.equals(min)); + } + + @Override public Object getMaxValue() { + return max; + } + + + } + + /** + * Struct which contains the metadata for an entire parquet directory structure + */ + @JsonTypeName("v2") public static class ParquetTableMetadata_v2 extends ParquetTableMetadataBase { + /* + ColumnTypeInfo is schema information from all the files and row groups, merged into + one. To get this info, we pass the ParquetTableMetadata object all the way dow to the + RowGroup and the column type is built there as it is read from the footer. 
+ */ + @JsonProperty public ConcurrentHashMap columnTypeInfo; + @JsonProperty List files; + @JsonProperty List directories; + + public ParquetTableMetadata_v2() { + super(); + } + + public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable, + List files, List directories) { + this.files = files; + this.directories = directories; + this.columnTypeInfo = ((ParquetTableMetadata_v2) parquetTable).columnTypeInfo; + } + + public ColumnTypeMetadata_v2 getColumnTypeInfo(String[] name) { + return columnTypeInfo.get(new ColumnTypeMetadata_v2.Key(name)); + } + + @JsonIgnore @Override public List getDirectories() { + return directories; + } + + @JsonIgnore @Override public List getFiles() { + return files; + } + + @JsonIgnore @Override public void assignFiles(List newFiles) { + this.files = (List) newFiles; + } + + @Override public boolean hasColumnMetadata() { + return true; + } + + @JsonIgnore @Override public PrimitiveTypeName getPrimitiveType(String[] columnName) { + return getColumnTypeInfo(columnName).primitiveType; + } + + @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) { + return getColumnTypeInfo(columnName).originalType; + } + + } + + + /** + * Struct which contains the metadata for a single parquet file + */ + public static class ParquetFileMetadata_v2 extends ParquetFileMetadata { + @JsonProperty public String path; + @JsonProperty public Long length; + @JsonProperty public List rowGroups; + + public ParquetFileMetadata_v2() { + super(); + } + + public ParquetFileMetadata_v2(String path, Long length, List rowGroups) { + this.path = path; + this.length = length; + this.rowGroups = rowGroups; + } + + @Override public String toString() { + return String.format("path: %s rowGroups: %s", path, rowGroups); + } + + @JsonIgnore @Override public String getPath() { + return path; + } + + @JsonIgnore @Override public Long getLength() { + return length; + } + + @JsonIgnore @Override public List getRowGroups() { + return rowGroups; + } + } + + + /** + * A struct that contains the metadata for a parquet row group + */ + public static class RowGroupMetadata_v2 extends RowGroupMetadata { + @JsonProperty public Long start; + @JsonProperty public Long length; + @JsonProperty public Long rowCount; + @JsonProperty public Map hostAffinity; + @JsonProperty public List columns; + + public RowGroupMetadata_v2() { super(); } - public RowGroupMetadata(Long start, Long length, Long rowCount, - Map hostAffinity, List columns) { + public RowGroupMetadata_v2(Long start, Long length, Long rowCount, Map hostAffinity, + List columns) { this.start = start; this.length = length; this.rowCount = rowCount; this.hostAffinity = hostAffinity; this.columns = columns; } + + @Override public Long getStart() { + return start; + } + + @Override public Long getLength() { + return length; + } + + @Override public Long getRowCount() { + return rowCount; + } + + @Override public Map getHostAffinity() { + return hostAffinity; + } + + @Override public List getColumns() { + return columns; + } } - public static class ColumnTypeMetadata { + public static class ColumnTypeMetadata_v2 { @JsonProperty public String[] name; @JsonProperty public PrimitiveTypeName primitiveType; @JsonProperty public OriginalType originalType; - //@JsonIgnore private int hashCode = 0; - // Key to find by name only @JsonIgnore private Key key; - public ColumnTypeMetadata() { + public ColumnTypeMetadata_v2() { super(); } - public ColumnTypeMetadata(String[] name, PrimitiveTypeName primitiveType, OriginalType originalType) { + 
public ColumnTypeMetadata_v2(String[] name, PrimitiveTypeName primitiveType, OriginalType originalType) { this.name = name; this.primitiveType = primitiveType; this.originalType = originalType; @@ -630,42 +976,66 @@ public Object deserializeKey(String key, com.fasterxml.jackson.databind.Deserial /** * A struct that contains the metadata for a column in a parquet file */ - public static class ColumnMetadata { + public static class ColumnMetadata_v2 extends ColumnMetadata { // Use a string array for name instead of Schema Path to make serialization easier @JsonProperty public String[] name; - @JsonProperty - public Long nulls; + @JsonProperty public Long nulls; public Object mxValue; @JsonIgnore private PrimitiveTypeName primitiveType; - public ColumnMetadata() { + public ColumnMetadata_v2() { super(); } - public ColumnMetadata(String[] name, PrimitiveTypeName primitiveType, Object mxValue, Long nulls) { + public ColumnMetadata_v2(String[] name, PrimitiveTypeName primitiveType, Object mxValue, Long nulls) { this.name = name; this.mxValue = mxValue; this.nulls = nulls; - this.primitiveType=primitiveType; + this.primitiveType = primitiveType; } - @JsonProperty(value = "mxValue") - public void setMax(Object mxValue) { + @JsonProperty(value = "mxValue") public void setMax(Object mxValue) { this.mxValue = mxValue; } - public static class DeSerializer extends JsonDeserializer { - @Override public ColumnMetadata deserialize(JsonParser jp, DeserializationContext ctxt) + @Override public String[] getName() { + return name; + } + + @Override public Long getNulls() { + return nulls; + } + + public boolean hasSingleValue() { + return (mxValue != null); + } + + @Override public Object getMaxValue() { + return mxValue; + } + + @Override public PrimitiveTypeName getPrimitiveType() { + return null; + } + + @Override public OriginalType getOriginalType() { + return null; + } + + public static class DeSerializer extends JsonDeserializer { + @Override public ColumnMetadata_v2 deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { return null; } } - // WE use a custom serializer and write only non null values. - public static class Serializer extends JsonSerializer { - @Override public void serialize(ColumnMetadata value, JsonGenerator jgen, SerializerProvider provider) + + // We use a custom serializer and write only non null values. 
+ public static class Serializer extends JsonSerializer { + @Override + public void serialize(ColumnMetadata_v2 value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { jgen.writeStartObject(); jgen.writeArrayFieldStart("name"); @@ -690,5 +1060,6 @@ public static class Serializer extends JsonSerializer { } } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java index 26ebfc5df01..33dccd6896e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java @@ -19,17 +19,17 @@ import com.google.common.base.Preconditions; import org.apache.drill.exec.store.dfs.FileSelection; -import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1; +import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase; /** - * Parquet specific {@link FileSelection selection} that carries out {@link ParquetTableMetadata_v1 metadata} along. + * Parquet specific {@link FileSelection selection} that carries out {@link ParquetTableMetadataBase metadata} along. */ public class ParquetFileSelection extends FileSelection { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFileSelection.class); - private final ParquetTableMetadata_v1 metadata; + private final ParquetTableMetadataBase metadata; - protected ParquetFileSelection(final FileSelection delegate, final ParquetTableMetadata_v1 metadata) { + protected ParquetFileSelection(final FileSelection delegate, final ParquetTableMetadataBase metadata) { super(delegate); this.metadata = Preconditions.checkNotNull(metadata, "Parquet metadata cannot be null"); } @@ -40,7 +40,7 @@ protected ParquetFileSelection(final FileSelection delegate, final ParquetTableM * It will always be null for non-parquet files and null for cases * where no metadata cache was created. 
*/ - public ParquetTableMetadata_v1 getParquetMetadata() { + public ParquetTableMetadataBase getParquetMetadata() { return metadata; } @@ -52,7 +52,7 @@ public ParquetTableMetadata_v1 getParquetMetadata() { * @return null if selection is null * otherwise a new selection */ - public static ParquetFileSelection create(final FileSelection selection, final ParquetTableMetadata_v1 metadata) { + public static ParquetFileSelection create(final FileSelection selection, final ParquetTableMetadataBase metadata) { if (selection == null) { return null; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 4932aafb498..e2cc6708dc9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -50,8 +50,6 @@ import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MagicString; import org.apache.drill.exec.store.mock.MockStorageEngine; -import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata; -import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -225,10 +223,10 @@ private FileSelection expandSelection(DrillFileSystem fs, FileSelection selectio Path metaFilePath = getMetadataPath(metaRootDir); // get the metadata for the directory by reading the metadata file - ParquetTableMetadata_v1 metadata = Metadata.readBlockMeta(fs, metaFilePath.toString()); + Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaFilePath.toString()); List fileNames = Lists.newArrayList(); - for (ParquetFileMetadata file : metadata.files) { - fileNames.add(file.path); + for (Metadata.ParquetFileMetadata file : metadata.getFiles()) { + fileNames.add(file.getPath()); } // when creating the file selection, set the selection root in the form /a/b instead of // file:/a/b. The reason is that the file names above have been created in the form diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index bf40002961f..1677f8b45e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -58,7 +58,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata; import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata; -import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1; +import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase; import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata; import org.apache.drill.exec.store.schedule.AffinityCreator; import org.apache.drill.exec.store.schedule.AssignmentCreator; @@ -124,7 +124,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { * from a metadata cache file earlier; we can re-use during * the ParquetGroupScan and avoid extra loading time. */ - private ParquetTableMetadata_v1 parquetTableMetadata = null; + private Metadata.ParquetTableMetadataBase parquetTableMetadata = null; /* * total number of rows (obtained from parquet footer) @@ -136,16 +136,15 @@ public class ParquetGroupScan extends AbstractFileGroupScan { */ private Map columnValueCounts; - @JsonCreator - public ParquetGroupScan( // + @JsonCreator public ParquetGroupScan( // @JsonProperty("userName") String userName, - @JsonProperty("entries") List entries, // + @JsonProperty("entries") List entries,// @JsonProperty("storage") StoragePluginConfig storageConfig, // @JsonProperty("format") FormatPluginConfig formatConfig, // @JacksonInject StoragePluginRegistry engineRegistry, // @JsonProperty("columns") List columns, // @JsonProperty("selectionRoot") String selectionRoot // - ) throws IOException, ExecutionSetupException { + ) throws IOException, ExecutionSetupException { super(ImpersonationUtil.resolveUserName(userName)); this.columns = columns; if (formatConfig == null) { @@ -169,7 +168,7 @@ public ParquetGroupScan( // ParquetFormatPlugin formatPlugin, // String selectionRoot, List columns) // - throws IOException { + throws IOException { super(userName); this.formatPlugin = formatPlugin; this.columns = columns; @@ -240,23 +239,29 @@ public Set getFileSet() { private Set fileSet; @JsonIgnore - private Map columnTypeMap = Maps.newHashMap(); + private Map columnTypeMap = Maps.newHashMap(); /** - * When reading the very first footer, any column is a potential partition column. So for the first footer, we check - * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the - * remaining footers, we will not find any new partition columns, but we may discover that what was previously a - * potential partition column now no longer qualifies, so it needs to be removed from the list. - * @return whether column is a potential partition column - */ + * When reading the very first footer, any column is a potential partition column. So for the first footer, we check + * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the + * remaining footers, we will not find any new partition columns, but we may discover that what was previously a + * potential partition column now no longer qualifies, so it needs to be removed from the list. 
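The elimination rule described in this comment is easy to lose in the surrounding re-indentation, so here is a condensed, illustrative restatement. This helper is written for this description only, not taken from the patch; the patch's checkForPartitionColumn additionally resolves the column's type from the table-level metadata when hasColumnMetadata() reports that types are stored once per table.

    private boolean updatePartitionCandidates(ColumnMetadata column, boolean firstFooter,
        Map<SchemaPath, MajorType> candidates, MajorType columnType) {
      SchemaPath path = SchemaPath.getCompoundPath(column.getName());
      if (firstFooter) {
        // First footer: every single-valued column starts out as a potential partition column.
        if (column.hasSingleValue()) {
          candidates.put(path, columnType);
          return true;
        }
        return false;
      }
      // Later footers can only disqualify candidates, never add new ones.
      if (!candidates.containsKey(path)) {
        return false;
      }
      if (!column.hasSingleValue() || !columnType.equals(candidates.get(path))) {
        candidates.remove(path);   // stopped being single-valued, or its type no longer matches
        return false;
      }
      return true;
    }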
+ * @return whether column is a potential partition column + */ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) { - SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.name); - Metadata.ColumnTypeMetadata columnTypeMetadata = - this.parquetTableMetadata.getColumnTypeInfo(columnMetadata.name); + SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName()); + final PrimitiveTypeName primitiveType; + final OriginalType originalType; + if (this.parquetTableMetadata.hasColumnMetadata()) { + primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName()); + originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName()); + } else { + primitiveType = columnMetadata.getPrimitiveType(); + originalType = columnMetadata.getOriginalType(); + } if (first) { if (hasSingleValue(columnMetadata)) { - columnTypeMap.put(schemaPath, getType(columnTypeMetadata.primitiveType, - columnTypeMetadata.originalType)); + columnTypeMap.put(schemaPath, getType(primitiveType, originalType)); return true; } else { return false; @@ -269,8 +274,7 @@ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean f columnTypeMap.remove(schemaPath); return false; } - if (!getType(columnTypeMetadata.primitiveType, - columnTypeMetadata.originalType).equals(columnTypeMap.get(schemaPath))) { + if (!getType(primitiveType, originalType).equals(columnTypeMap.get(schemaPath))) { columnTypeMap.remove(schemaPath); return false; } @@ -282,60 +286,59 @@ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean f private MajorType getType(PrimitiveTypeName type, OriginalType originalType) { if (originalType != null) { switch (originalType) { - case DECIMAL: - return Types.optional(MinorType.DECIMAL18); - case DATE: - return Types.optional(MinorType.DATE); - case TIME_MILLIS: - return Types.optional(MinorType.TIME); - case TIMESTAMP_MILLIS: - return Types.optional(MinorType.TIMESTAMP); - case UTF8: - return Types.optional(MinorType.VARCHAR); - case UINT_8: - return Types.optional(MinorType.UINT1); - case UINT_16: - return Types.optional(MinorType.UINT2); - case UINT_32: - return Types.optional(MinorType.UINT4); - case UINT_64: - return Types.optional(MinorType.UINT8); - case INT_8: - return Types.optional(MinorType.TINYINT); - case INT_16: - return Types.optional(MinorType.SMALLINT); + case DECIMAL: + return Types.optional(MinorType.DECIMAL18); + case DATE: + return Types.optional(MinorType.DATE); + case TIME_MILLIS: + return Types.optional(MinorType.TIME); + case TIMESTAMP_MILLIS: + return Types.optional(MinorType.TIMESTAMP); + case UTF8: + return Types.optional(MinorType.VARCHAR); + case UINT_8: + return Types.optional(MinorType.UINT1); + case UINT_16: + return Types.optional(MinorType.UINT2); + case UINT_32: + return Types.optional(MinorType.UINT4); + case UINT_64: + return Types.optional(MinorType.UINT8); + case INT_8: + return Types.optional(MinorType.TINYINT); + case INT_16: + return Types.optional(MinorType.SMALLINT); } } switch (type) { - case BOOLEAN: - return Types.optional(MinorType.BIT); - case INT32: - return Types.optional(MinorType.INT); - case INT64: - return Types.optional(MinorType.BIGINT); - case FLOAT: - return Types.optional(MinorType.FLOAT4); - case DOUBLE: - return Types.optional(MinorType.FLOAT8); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - case INT96: - return Types.optional(MinorType.VARBINARY); - default: - // Should never hit this - throw new 
UnsupportedOperationException("Unsupported type:" + type); + case BOOLEAN: + return Types.optional(MinorType.BIT); + case INT32: + return Types.optional(MinorType.INT); + case INT64: + return Types.optional(MinorType.BIGINT); + case FLOAT: + return Types.optional(MinorType.FLOAT4); + case DOUBLE: + return Types.optional(MinorType.FLOAT8); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + case INT96: + return Types.optional(MinorType.VARBINARY); + default: + // Should never hit this + throw new UnsupportedOperationException("Unsupported type:" + type); } } private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) { // ColumnMetadata will have a non-null value iff the minValue and the maxValue for the // rowgroup are the same - return (columnChunkMetaData != null) && (columnChunkMetaData.mxValue != null); + return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue()); } - @Override - public void modifyFileSelection(FileSelection selection) { + @Override public void modifyFileSelection(FileSelection selection) { entries.clear(); fileSet = Sets.newHashSet(); for (String fileName : selection.getFiles()) { @@ -356,124 +359,124 @@ public MajorType getTypeForColumn(SchemaPath schemaPath) { return columnTypeMap.get(schemaPath); } - private Map> partitionValueMap = Maps.newHashMap(); + private Map> partitionValueMap = Maps.newHashMap(); public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) { String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString(); MinorType type = getTypeForColumn(column).getMinorType(); switch (type) { - case INT: { - NullableIntVector intVector = (NullableIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - intVector.getMutator().setSafe(index, value); - return; - } - case SMALLINT: { - NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - smallIntVector.getMutator().setSafe(index, value.shortValue()); - return; - } - case TINYINT: { - NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - tinyIntVector.getMutator().setSafe(index, value.byteValue()); - return; - } - case UINT1: { - NullableUInt1Vector intVector = (NullableUInt1Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - intVector.getMutator().setSafe(index, value.byteValue()); - return; - } - case UINT2: { - NullableUInt2Vector intVector = (NullableUInt2Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - intVector.getMutator().setSafe(index, (char) value.shortValue()); - return; - } - case UINT4: { - NullableUInt4Vector intVector = (NullableUInt4Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - intVector.getMutator().setSafe(index, value); - return; - } - case BIGINT: { - NullableBigIntVector bigIntVector = (NullableBigIntVector) v; - Long value = (Long) partitionValueMap.get(f).get(column); - bigIntVector.getMutator().setSafe(index, value); - return; - } - case FLOAT4: { - NullableFloat4Vector float4Vector = (NullableFloat4Vector) v; - Float value = (Float) partitionValueMap.get(f).get(column); - float4Vector.getMutator().setSafe(index, value); - return; - } - case FLOAT8: { - NullableFloat8Vector float8Vector = (NullableFloat8Vector) v; - Double value = (Double) partitionValueMap.get(f).get(column); - float8Vector.getMutator().setSafe(index, value); - return; - } - case 
VARBINARY: { - NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s instanceof Binary) { - bytes = ((Binary) s).getBytes(); - } else if (s instanceof String) { - bytes = ((String) s).getBytes(); - } else if (s instanceof byte[]) { - bytes = (byte[])s; - } else { - throw new UnsupportedOperationException("Unable to create column data for type: " + type); + case INT: { + NullableIntVector intVector = (NullableIntVector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + intVector.getMutator().setSafe(index, value); + return; } - varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length); - return; - } - case DECIMAL18: { - NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v; - Long value = (Long) partitionValueMap.get(f).get(column); - decimalVector.getMutator().setSafe(index, value); - return; - } - case DATE: { - NullableDateVector dateVector = (NullableDateVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); - return; - } - case TIME: { - NullableTimeVector timeVector = (NullableTimeVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - timeVector.getMutator().setSafe(index, value); - return; - } - case TIMESTAMP: { - NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v; - Long value = (Long) partitionValueMap.get(f).get(column); - timeStampVector.getMutator().setSafe(index, value); - return; - } - case VARCHAR: { - NullableVarCharVector varCharVector = (NullableVarCharVector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s instanceof String) { // if the metadata was read from a JSON cache file it maybe a string type - bytes = ((String) s).getBytes(); - } else if (s instanceof Binary) { - bytes = ((Binary) s).getBytes(); - } else if (s instanceof byte[]) { - bytes = (byte[])s; - } else { - throw new UnsupportedOperationException("Unable to create column data for type: " + type); + case SMALLINT: { + NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + smallIntVector.getMutator().setSafe(index, value.shortValue()); + return; } - varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length); - return; - } - default: - throw new UnsupportedOperationException("Unsupported type: " + type); + case TINYINT: { + NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + tinyIntVector.getMutator().setSafe(index, value.byteValue()); + return; + } + case UINT1: { + NullableUInt1Vector intVector = (NullableUInt1Vector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + intVector.getMutator().setSafe(index, value.byteValue()); + return; + } + case UINT2: { + NullableUInt2Vector intVector = (NullableUInt2Vector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + intVector.getMutator().setSafe(index, (char) value.shortValue()); + return; + } + case UINT4: { + NullableUInt4Vector intVector = (NullableUInt4Vector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + intVector.getMutator().setSafe(index, value); + return; + } + case BIGINT: { + NullableBigIntVector bigIntVector = (NullableBigIntVector) v; + Long value = (Long) 
partitionValueMap.get(f).get(column); + bigIntVector.getMutator().setSafe(index, value); + return; + } + case FLOAT4: { + NullableFloat4Vector float4Vector = (NullableFloat4Vector) v; + Float value = (Float) partitionValueMap.get(f).get(column); + float4Vector.getMutator().setSafe(index, value); + return; + } + case FLOAT8: { + NullableFloat8Vector float8Vector = (NullableFloat8Vector) v; + Double value = (Double) partitionValueMap.get(f).get(column); + float8Vector.getMutator().setSafe(index, value); + return; + } + case VARBINARY: { + NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v; + Object s = partitionValueMap.get(f).get(column); + byte[] bytes; + if (s instanceof Binary) { + bytes = ((Binary) s).getBytes(); + } else if (s instanceof String) { + bytes = ((String) s).getBytes(); + } else if (s instanceof byte[]) { + bytes = (byte[]) s; + } else { + throw new UnsupportedOperationException("Unable to create column data for type: " + type); + } + varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length); + return; + } + case DECIMAL18: { + NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v; + Long value = (Long) partitionValueMap.get(f).get(column); + decimalVector.getMutator().setSafe(index, value); + return; + } + case DATE: { + NullableDateVector dateVector = (NullableDateVector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + return; + } + case TIME: { + NullableTimeVector timeVector = (NullableTimeVector) v; + Integer value = (Integer) partitionValueMap.get(f).get(column); + timeVector.getMutator().setSafe(index, value); + return; + } + case TIMESTAMP: { + NullableTimeStampVector timeStampVector = (NullableTimeStampVector) v; + Long value = (Long) partitionValueMap.get(f).get(column); + timeStampVector.getMutator().setSafe(index, value); + return; + } + case VARCHAR: { + NullableVarCharVector varCharVector = (NullableVarCharVector) v; + Object s = partitionValueMap.get(f).get(column); + byte[] bytes; + if (s instanceof String) { // if the metadata was read from a JSON cache file it maybe a string type + bytes = ((String) s).getBytes(); + } else if (s instanceof Binary) { + bytes = ((Binary) s).getBytes(); + } else if (s instanceof byte[]) { + bytes = (byte[]) s; + } else { + throw new UnsupportedOperationException("Unable to create column data for type: " + type); + } + varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length); + return; + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type); } } @@ -563,26 +566,28 @@ private void init() throws IOException { if (fileSet == null) { fileSet = Sets.newHashSet(); - for (ParquetFileMetadata file : parquetTableMetadata.files) { - fileSet.add(file.path); + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + fileSet.add(file.getPath()); } } - Map hostEndpointMap = Maps.newHashMap(); + Map hostEndpointMap = Maps.newHashMap(); for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) { hostEndpointMap.put(endpoint.getAddress(), endpoint); } rowGroupInfos = Lists.newArrayList(); - for (ParquetFileMetadata file : parquetTableMetadata.files) { + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { int rgIndex = 0; - for (RowGroupMetadata rg : file.rowGroups) { - RowGroupInfo rowGroupInfo = new RowGroupInfo(file.path, rg.start, rg.length, rgIndex); + for 
(RowGroupMetadata rg : file.getRowGroups()) { + RowGroupInfo rowGroupInfo = + new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex); EndpointByteMap endpointByteMap = new EndpointByteMapImpl(); - for (String host : rg.hostAffinity.keySet()) { + for (String host : rg.getHostAffinity().keySet()) { if (hostEndpointMap.containsKey(host)) { - endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.hostAffinity.get(host) * rg.length)); + endpointByteMap + .add(hostEndpointMap.get(host), (long) (rg.getHostAffinity().get(host) * rg.getLength())); } } rowGroupInfo.setEndpointByteMap(endpointByteMap); @@ -596,24 +601,24 @@ private void init() throws IOException { columnValueCounts = Maps.newHashMap(); this.rowCount = 0; boolean first = true; - for (ParquetFileMetadata file : parquetTableMetadata.files) { - for (RowGroupMetadata rowGroup : file.rowGroups) { - long rowCount = rowGroup.rowCount; - for (ColumnMetadata column : rowGroup.columns) { - SchemaPath schemaPath = SchemaPath.getCompoundPath(column.name); + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + for (RowGroupMetadata rowGroup : file.getRowGroups()) { + long rowCount = rowGroup.getRowCount(); + for (ColumnMetadata column : rowGroup.getColumns()) { + SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName()); Long previousCount = columnValueCounts.get(schemaPath); if (previousCount != null) { if (previousCount != GroupScan.NO_COLUMN_STATS) { - if (column.nulls != null) { - Long newCount = rowCount - column.nulls; + if (column.getNulls() != null) { + Long newCount = rowCount - column.getNulls(); columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount); } else { } } } else { - if (column.nulls != null) { - Long newCount = rowCount - column.nulls; + if (column.getNulls() != null) { + Long newCount = rowCount - column.getNulls(); columnValueCounts.put(schemaPath, newCount); } else { columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS); @@ -621,14 +626,13 @@ private void init() throws IOException { } boolean partitionColumn = checkForPartitionColumn(column, first); if (partitionColumn) { - Map map = partitionValueMap.get(file.path); + Map map = partitionValueMap.get(file.getPath()); if (map == null) { map = Maps.newHashMap(); - partitionValueMap.put(file.path, map); + partitionValueMap.put(file.getPath(), map); } Object value = map.get(schemaPath); - Object currentValue = column.mxValue; -// Object currentValue = column.getMax(); + Object currentValue = column.getMaxValue(); if (value != null) { if (value != currentValue) { columnTypeMap.remove(schemaPath); @@ -640,21 +644,21 @@ private void init() throws IOException { columnTypeMap.remove(schemaPath); } } - this.rowCount += rowGroup.rowCount; + this.rowCount += rowGroup.getRowCount(); first = false; } } } - private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 parquetTableMetadata) { + private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) { List newFileMetadataList = Lists.newArrayList(); - for (ParquetFileMetadata file : parquetTableMetadata.files) { - if (fileSet.contains(file.path)) { + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + if (fileSet.contains(file.getPath())) { newFileMetadataList.add(file); } } - return new ParquetTableMetadata_v1(parquetTableMetadata.columnTypeInfo, newFileMetadataList, - new ArrayList()); + parquetTableMetadata.assignFiles(newFileMetadataList); + return parquetTableMetadata; } /** 
@@ -699,7 +703,9 @@ protected Void runInner() throws Exception { @Override protected IOException convertToIOException(Exception e) { - return new IOException(String.format("Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(), rgi.getStart())); + return new IOException(String.format( + "Failure while trying to get block locations for file %s starting at %d.", rgi.getPath(), + rgi.getStart())); } } @@ -710,11 +716,10 @@ public void applyAssignments(List incomingEndpoints) throws Ph this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos, formatPlugin.getContext()); } - @Override - public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { - assert minorFragmentId < mappings.size() : String.format( - "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), - minorFragmentId); + @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { + assert minorFragmentId < mappings.size() : String + .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", + mappings.size(), minorFragmentId); List rowGroupsForMinor = mappings.get(minorFragmentId);
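The space saving behind hasSingleValue() above rests on one convention: a column's mxValue is kept in the cache only when the row group's min and max statistics are equal, so a null check is enough and getMaxValue() doubles as the partition value consumed by init() and populatePruningVector(). A rough sketch of that rule at cache-creation time, assuming a Parquet ColumnChunkMetaData named columnChunkMetaData from the footer; this is not the patch's exact code.

    // Remember a single value per row-group column, and only when the row group is single-valued.
    Object mxValue = null;
    Statistics<?> stats = columnChunkMetaData.getStatistics();
    if (stats != null && !stats.isEmpty()
        && stats.genericGetMax() != null
        && stats.genericGetMax().equals(stats.genericGetMin())) {
      mxValue = stats.genericGetMax();   // min == max: keep the one value for partition pruning
    }
    // Otherwise mxValue stays null, which is exactly what hasSingleValue() tests for.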