Skip to content

Commit

Permalink
DRILL-4720: Fix SchemaPartitionExplorer.getSubPartitions method imple…
Browse files Browse the repository at this point in the history
…mentations to return only Drill file system directories

1. Added file system util helper classes to standardize list directory and file statuses usage in Drill with appropriate unit tests.
2. Fixed SchemaPartitionExplorer.getSubPartitions method implementations to return only directories that can be partitions according to Drill file system rules
(excluded all files and directories that start with dot or underscore).
3. Added unit test for directory explorers UDFs with and without metadata cache presence.
4. Minor refactoring.

closes #864
  • Loading branch information
arina-ielchiieva committed Jul 21, 2017
1 parent 368bc38 commit a0c178b
Show file tree
Hide file tree
Showing 17 changed files with 942 additions and 166 deletions.
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -33,6 +33,7 @@
import org.apache.drill.exec.planner.sql.parser.SqlShowFiles;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.util.FileSystemUtil;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
Expand All @@ -50,8 +51,8 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv

SqlIdentifier from = ((SqlShowFiles) sqlNode).getDb();

DrillFileSystem fs = null;
String defaultLocation = null;
DrillFileSystem fs;
String defaultLocation;
String fromDir = "./";

SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
Expand Down Expand Up @@ -93,9 +94,9 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConv

List<ShowFilesCommandResult> rows = new ArrayList<>();

for (FileStatus fileStatus : fs.list(false, new Path(defaultLocation, fromDir))) {
ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDir(),
!fileStatus.isDir(), fileStatus.getLen(),
for (FileStatus fileStatus : FileSystemUtil.listAll(fs, new Path(defaultLocation, fromDir), false)) {
ShowFilesCommandResult result = new ShowFilesCommandResult(fileStatus.getPath().getName(), fileStatus.isDirectory(),
fileStatus.isFile(), fileStatus.getLen(),
fileStatus.getOwner(), fileStatus.getGroup(),
fileStatus.getPermission().toString(),
fileStatus.getAccessTime(), fileStatus.getModificationTime());
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -62,7 +62,6 @@
import org.apache.hadoop.util.Progressable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
Expand All @@ -75,8 +74,8 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();

public static final String HIDDEN_FILE_PREFIX = "_";
public static final String DOT_FILE_PREFIX = ".";
public static final String UNDERSCORE_PREFIX = "_";
public static final String DOT_PREFIX = ".";

private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();

Expand Down Expand Up @@ -747,35 +746,6 @@ public void removeXAttr(final Path path, final String name) throws IOException {
underlyingFs.removeXAttr(path, name);
}

/**
 * Returns the file statuses under the given paths.
 *
 * @param recursive if true, walks each path's directory tree and collects the
 *                  leaf files; if false, returns only the direct children
 * @param paths one or more paths on the underlying file system to list
 * @return list of file statuses found under the given paths
 * @throws IOException if the underlying file system cannot be accessed
 */
public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
  // Non-recursive case: a single listStatus call over all paths is sufficient.
  if (!recursive) {
    return Lists.newArrayList(underlyingFs.listStatus(paths));
  }
  List<FileStatus> result = Lists.newArrayList();
  for (Path path : paths) {
    // Depth-first walk starting from each path's own status.
    addRecursiveStatus(underlyingFs.getFileStatus(path), result);
  }
  return result;
}

/**
 * Recursively collects file (non-directory) statuses under the given status.
 * Directories are descended into (children matched via a "*" glob filtered by
 * {@link DrillPathFilter}); only files are added to the result list.
 *
 * @param parent status to start from; added directly if it is a file
 * @param listToFill accumulator that receives the discovered file statuses
 * @throws IOException if the underlying file system cannot be accessed
 */
private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
  // Plain file: record it and stop descending.
  if (!parent.isDir()) {
    listToFill.add(parent);
    return;
  }
  // Directory: expand its children, skipping hidden entries via DrillPathFilter.
  FileStatus[] children = underlyingFs.globStatus(new Path(parent.getPath(), "*"), new DrillPathFilter());
  for (FileStatus child : children) {
    if (child.isDir()) {
      addRecursiveStatus(child, listToFill);
    } else {
      listToFill.add(child);
    }
  }
}

public InputStream openPossiblyCompressedStream(Path path) throws IOException {
CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
if (codec != null) {
Expand Down

This file was deleted.

Expand Up @@ -20,20 +20,15 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -166,23 +161,16 @@ public FileSelection minusDirectories(DrillFileSystem fs) throws IOException {
return this;
}
Stopwatch timer = Stopwatch.createStarted();
final List<FileStatus> statuses = getStatuses(fs);
final int total = statuses.size();
final Path[] paths = new Path[total];
for (int i=0; i<total; i++) {
paths[i] = statuses.get(i).getPath();
List<FileStatus> statuses = getStatuses(fs);

List<FileStatus> nonDirectories = Lists.newArrayList();
for (FileStatus status : statuses) {
nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
}
final List<FileStatus> allStats = fs.list(true, paths);
final List<FileStatus> nonDirectories = Lists.newArrayList(Iterables.filter(allStats, new Predicate<FileStatus>() {
@Override
public boolean apply(@Nullable FileStatus status) {
return !status.isDirectory();
}
}));

final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}",
timer.elapsed(TimeUnit.MILLISECONDS), total);
timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());

// fileSel will be null if we query an empty folder
if (fileSel != null) {
Expand Down Expand Up @@ -425,7 +413,7 @@ public MetadataContext getMetaContext() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("root=" + this.selectionRoot);
sb.append("root=").append(this.selectionRoot);

sb.append("files=[");
boolean isFirst = true;
Expand Down
Expand Up @@ -37,6 +37,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -95,7 +96,7 @@ public Iterable<String> getSubPartitions(String table,
) throws PartitionNotFoundException {
List<FileStatus> fileStatuses;
try {
fileStatuses = defaultSchema.getFS().list(false, new Path(defaultSchema.getDefaultLocation(), table));
fileStatuses = DrillFileSystemUtil.listDirectories(defaultSchema.getFS(), new Path(defaultSchema.getDefaultLocation(), table), false);
} catch (IOException e) {
throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
}
Expand Down
Expand Up @@ -64,6 +64,7 @@
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.PartitionNotFoundException;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -404,7 +405,7 @@ public Iterable<String> getSubPartitions(String table,

List<FileStatus> fileStatuses;
try {
fileStatuses = getFS().list(false, new Path(getDefaultLocation(), table));
fileStatuses = DrillFileSystemUtil.listDirectories(getFS(), new Path(getDefaultLocation(), table), false);
} catch (IOException e) {
throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
}
Expand Down Expand Up @@ -639,12 +640,12 @@ public void destroy(DrillTable value) {
}

/**
* Check if the table contains homogenenous files that can be read by Drill. Eg: parquet, json csv etc.
* Check if the table contains homogeneous files that can be read by Drill. Eg: parquet, json csv etc.
* However if it contains more than one of these formats or a totally different file format that Drill cannot
* understand then we will raise an exception.
* @param tableName - name of the table to be checked for homogeneous property
* @return
* @throws IOException
* @param tableName name of the table to be checked for homogeneous property
* @return true if table contains homogeneous files, false otherwise
* @throws IOException in case of problems accessing table files
*/
private boolean isHomogeneous(String tableName) throws IOException {
FileSelection fileSelection = FileSelection.create(fs, config.getLocation(), tableName);
Expand All @@ -663,7 +664,7 @@ private boolean isHomogeneous(String tableName) throws IOException {
while (!listOfFiles.isEmpty()) {
FileStatus currentFile = listOfFiles.poll();
if (currentFile.isDirectory()) {
listOfFiles.addAll(fs.list(true, currentFile.getPath()));
listOfFiles.addAll(DrillFileSystemUtil.listFiles(fs, currentFile.getPath(), true));
} else {
if (matcher != null) {
if (!matcher.isFileReadable(fs, currentFile)) {
Expand Down Expand Up @@ -709,7 +710,7 @@ public void dropTable(String table) {
long time = (System.currentTimeMillis()/1000);
Long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
Long p2 = r.nextLong();
final String fileNameDelimiter = DrillFileSystem.HIDDEN_FILE_PREFIX;
final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
String[] pathSplit = table.split(Path.SEPARATOR);
/*
* Builds the string for the renamed table
Expand All @@ -718,7 +719,7 @@ public void dropTable(String table) {
* separated by underscores
*/
tableRenameBuilder
.append(DrillFileSystem.HIDDEN_FILE_PREFIX)
.append(DrillFileSystem.UNDERSCORE_PREFIX)
.append(pathSplit[pathSplit.length - 1])
.append(fileNameDelimiter)
.append(p1.toString())
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -24,7 +24,7 @@

import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -68,10 +68,10 @@ private static void checkMagicBytes(FileStatus status, byte[] data, int offset)
public static List<Footer> getFooters(final Configuration conf, List<FileStatus> statuses, int parallelism) throws IOException {
final List<TimedRunnable<Footer>> readers = Lists.newArrayList();
List<Footer> foundFooters = Lists.newArrayList();
for(FileStatus status : statuses){
for (FileStatus status : statuses) {


if(status.isDirectory()){
if (status.isDirectory()){
// first we check for summary file.
FileSystem fs = status.getPath().getFileSystem(conf);

Expand All @@ -83,10 +83,10 @@ public static List<Footer> getFooters(final Configuration conf, List<FileStatus>
}

// else we handle as normal file.
for(FileStatus inStatus : fs.listStatus(status.getPath(), new DrillPathFilter())){
for (FileStatus inStatus : DrillFileSystemUtil.listFiles(fs, status.getPath(), false)){
readers.add(new FooterReader(conf, inStatus));
}
}else{
} else {
readers.add(new FooterReader(conf, status));
}

Expand Down
Expand Up @@ -31,7 +31,7 @@
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.BlockLocation;
Expand Down Expand Up @@ -179,7 +179,7 @@ private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {

final List<FileStatus> childFiles = Lists.newArrayList();

for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
if (file.isDirectory()) {
ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString())).getLeft();
metaDataList.addAll(subTableMetadata.files);
Expand Down Expand Up @@ -233,17 +233,22 @@ private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {
}

/**
* Get the parquet metadata for the parquet files in a directory
* Get the parquet metadata for the parquet files in a directory.
*
* @param path the path of the directory
* @return
* @throws IOException
* @return metadata object for an entire parquet directory structure
* @throws IOException in case of problems accessing files
*/
private ParquetTableMetadata_v3 getParquetTableMetadata(String path) throws IOException {
Path p = new Path(path);
FileStatus fileStatus = fs.getFileStatus(p);
final Stopwatch watch = Stopwatch.createStarted();
List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
List<FileStatus> fileStatuses = new ArrayList<>();
if (fileStatus.isFile()) {
fileStatuses.add(fileStatus);
} else {
fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, p, true));
}
logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
watch.reset();
watch.start();
Expand Down Expand Up @@ -289,25 +294,6 @@ private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
return metaDataList;
}

/**
 * Recursively gathers the statuses of all files reachable from the given status.
 * Directories are expanded via {@code fs.listStatus} with {@link DrillPathFilter}
 * (which excludes hidden entries); a plain file yields a single-element list.
 *
 * @param fileStatus starting status (file or directory)
 * @return flat list of file statuses found under {@code fileStatus}
 * @throws IOException if the file system cannot be accessed
 */
private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
  List<FileStatus> result = Lists.newArrayList();
  // Base case: a plain file is its own (only) result.
  if (!fileStatus.isDirectory()) {
    result.add(fileStatus);
    return result;
  }
  // Directory: descend into each non-hidden child and merge the results.
  for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
    result.addAll(getFileStatuses(child));
  }
  return result;
}

/**
* TimedRunnable that reads the footer from parquet and collects file metadata
*/
Expand Down

0 comments on commit a0c178b

Please sign in to comment.