Skip to content

Commit

Permalink
DRILL-3918: During expansion save the metadata for future use.
Browse files Browse the repository at this point in the history
close #196
  • Loading branch information
Aman Sinha committed Oct 11, 2015
1 parent 8197ba8 commit b4d47c5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
Expand Up @@ -24,6 +24,7 @@

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

Expand All @@ -44,6 +45,10 @@ public class FileSelection {
public List<String> files;
public String selectionRoot;

// Temporary home for the reference to the Parquet table metadata. It is set
// by the constructor that takes a metadata argument and defaults to null.
// TODO: ideally this should be in a Parquet specific derived class.
private ParquetTableMetadata_v1 parquetMeta = null;

// No-argument constructor: performs no initialization of its own.
// NOTE(review): presumably kept for deserialization/framework use — confirm against callers.
public FileSelection() {
}

Expand All @@ -60,6 +65,13 @@ public FileSelection(List<FileStatus> statuses) {
this(statuses, null);
}

/**
 * Creates a selection from an explicit list of file names and keeps a
 * reference to the Parquet table metadata handed in by the caller.
 * Only the three fields below are assigned by this constructor.
 *
 * @param files         file names that make up this selection
 * @param selectionRoot root path of the selection
 * @param meta          Parquet table metadata to retain; stored as-is
 */
public FileSelection(List<String> files, String selectionRoot, ParquetTableMetadata_v1 meta) {
  this.parquetMeta = meta;
  this.selectionRoot = selectionRoot;
  this.files = files;
}

public FileSelection(List<FileStatus> statuses, String selectionRoot) {
this.statuses = statuses;
this.files = Lists.newArrayList();
Expand Down Expand Up @@ -128,6 +140,16 @@ public List<FileStatus> getFileStatusList(DrillFileSystem fs) throws IOException
return statuses;
}

/**
 * Returns the Parquet table metadata held by this file selection.
 * The metadata may have been read from a metadata cache file while the
 * selection was being created.
 *
 * @return the retained Parquet table metadata, or null for non-parquet
 *         files and for cases where no metadata cache was created
 */
public ParquetTableMetadata_v1 getParquetMetadata() {
  return this.parquetMeta;
}

private static String commonPath(FileStatus... paths) {
String commonPath = "";
String[][] folders = new String[paths.length][];
Expand Down
Expand Up @@ -230,7 +230,7 @@ private FileSelection expandSelection(DrillFileSystem fs, FileSelection selectio
// /a/b/c.parquet and the format of the selection root must match that of the file names
// otherwise downstream operations such as partition pruning can break.
Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(metaRootDir.getPath());
return new FileSelection(fileNames, metaRootPath.toString(), true);
return new FileSelection(fileNames, metaRootPath.toString(), metadata /* save metadata for future use */);
} else {
// don't expand yet; ParquetGroupScan's metadata gathering operation
// does that.
Expand Down
Expand Up @@ -119,6 +119,12 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private List<SchemaPath> columns;
private ListMultimap<Integer, RowGroupInfo> mappings;
private List<RowGroupInfo> rowGroupInfos;
/**
 * Parquet table metadata for this scan. It may already have been read
 * from a metadata cache file earlier (it is taken from the incoming file
 * selection); when present, it is re-used by this ParquetGroupScan to
 * avoid the extra loading time of reading the cache file again.
 */
private ParquetTableMetadata_v1 parquetTableMetadata = null;

/*
* total number of rows (obtained from parquet footer)
Expand Down Expand Up @@ -177,6 +183,7 @@ public ParquetGroupScan( //
}

this.selectionRoot = selectionRoot;
this.parquetTableMetadata = selection.getParquetMetadata();

init();
}
Expand All @@ -201,6 +208,7 @@ private ParquetGroupScan(ParquetGroupScan that) {
this.partitionValueMap = that.partitionValueMap == null ? null : new HashMap(that.partitionValueMap);
this.fileSet = that.fileSet == null ? null : new HashSet(that.fileSet);
this.usedMetadataCache = that.usedMetadataCache;
this.parquetTableMetadata = that.parquetTableMetadata;
}


Expand Down Expand Up @@ -486,7 +494,6 @@ public void setEndpointByteMap(EndpointByteMap byteMap) {
}

private void init() throws IOException {
ParquetTableMetadata_v1 parquetTableMetadata;
List<FileStatus> fileStatuses = null;
if (entries.size() == 1) {
Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
Expand All @@ -498,7 +505,9 @@ private void init() throws IOException {
}
if (metaPath != null && fs.exists(metaPath)) {
usedMetadataCache = true;
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
if (parquetTableMetadata == null) {
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
}
} else {
parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
}
Expand All @@ -508,9 +517,15 @@ private void init() throws IOException {
if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
usedMetadataCache = true;
if (fileSet != null) {
parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
if (parquetTableMetadata == null) {
parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
} else {
parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
}
} else {
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
if (parquetTableMetadata == null) {
parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
}
}
} else {
fileStatuses = Lists.newArrayList();
Expand Down

0 comments on commit b4d47c5

Please sign in to comment.