DRILL-7125: REFRESH TABLE METADATA fails after upgrade from Drill 1.13.0 to Drill 1.15.0
sohami committed Mar 21, 2019
1 parent 189ebb4 commit bd3ee4c
Showing 2 changed files with 72 additions and 49 deletions.
File 1 of 2: RefreshMetadataHandler.java (package org.apache.drill.exec.planner.sql.handlers)

@@ -17,18 +17,14 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
-import java.util.HashSet;
-import java.util.Set;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.drill.common.expression.SchemaPath;
-import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
-
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.logical.DrillTable;
@@ -37,15 +33,20 @@
 import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
 import org.apache.hadoop.fs.Path;
 
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
 public class RefreshMetadataHandler extends DefaultSqlHandler {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
 
@@ -107,18 +108,20 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
       return notSupported(tableName);
     }
 
-    FormatSelection formatSelection = (FormatSelection) selection;
+    final FormatSelection formatSelection = (FormatSelection) selection;
 
     FormatPluginConfig formatConfig = formatSelection.getFormat();
     if (!((formatConfig instanceof ParquetFormatConfig) ||
-        ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+        ((formatConfig instanceof NamedFormatPluginConfig) &&
+          ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
       return notSupported(tableName);
     }
 
-    FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
-    DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+    // Always create filesystem object using process user, since it owns the metadata file
+    final DrillFileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+        drillTable.getPlugin().getFormatPlugin(formatConfig).getFsConf());
 
-    Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+    final Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
     if (!fs.getFileStatus(selectionRoot).isDirectory()) {
       return notSupported(tableName);
     }
@@ -127,7 +130,7 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException {
       formatConfig = new ParquetFormatConfig();
     }
 
-    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+    final ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig((ParquetFormatConfig) formatConfig)
        .withOptions(context.getOptions())
        .build();
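
The substance of the handler change is in the last two hunks: instead of building a DrillFileSystem from the query user's plugin, the handler now asks ImpersonationUtil for a filesystem bound to the Drill process user, because that user owns the metadata cache file about to be rewritten. A minimal sketch of that pattern, assuming only that a Hadoop Configuration for the target storage is at hand (the helper class and method names are made up for illustration):

import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;

// Illustrative helper, not part of the commit.
public final class ProcessUserFs {
  private ProcessUserFs() { }

  // Returns a DrillFileSystem that acts as the drillbit process user rather than the
  // impersonated query user, so files it writes are owned by the process user.
  public static DrillFileSystem forProcessUser(Configuration fsConf) {
    return ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf);
  }
}

The handler applies the same two calls inline, passing the format plugin's filesystem configuration.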
File 2 of 2: Metadata.java (package org.apache.drill.exec.store.parquet.metadata)

@@ -24,23 +24,21 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
-import org.apache.drill.exec.serialization.PathSerDe;
-import java.util.Set;
-import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.collections.Collectors;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.serialization.PathSerDe;
 import org.apache.drill.exec.store.TimedCallable;
 import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,6 +67,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -85,7 +84,9 @@
 import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
 
 /**
- * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}
+ * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig}. All the creation of
+ * parquet metadata cache using create api's are forced to happen using the process user since only that user will have
+ * write permission for the cache file
  */
 public class Metadata {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -114,7 +115,7 @@ private Metadata(ParquetReaderConfig readerConfig) {
    */
   public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig, boolean allColumns, Set<String> columnSet) throws IOException {
     Metadata metadata = new Metadata(readerConfig);
-    metadata.createMetaFilesRecursively(path, fs, allColumns, columnSet);
+    metadata.createMetaFilesRecursivelyAsProcessUser(path, fs, allColumns, columnSet);
   }
 
   /**
@@ -203,6 +204,26 @@ private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path p
     return false;
   }
 
+  /**
+   * Wrapper which makes sure that in all cases metadata file is created as a process user no matter what the caller
+   * is passing.
+   * @param path to the directory of the parquet table
+   * @param fs file system
+   * @param allColumns if set, store column metadata for all the columns
+   * @param columnSet Set of columns for which column metadata has to be stored
+   * @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is
+   *         a metadata for all subdirectories (if they are present and there are no any parquet files in the
+   *         {@code path} directory).
+   * @throws IOException if parquet metadata can't be serialized and written to the json file
+   */
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+  createMetaFilesRecursivelyAsProcessUser(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+      throws IOException {
+    final FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(),
+      fs.getConf());
+    return createMetaFilesRecursively(path, processUserFileSystem, allColumns, columnSet);
+  }
+
   /**
    * Create the parquet metadata files for the directory at the given path and for any subdirectories.
    * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths.
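
For context, this wrapper means the public entry point now writes the cache as the process user regardless of who owns the FileSystem handed to it. A caller-side sketch of that behaviour, with a made-up user name and table path; only the Metadata, ParquetReaderConfig, ParquetFormatConfig and ImpersonationUtil calls are taken from the code above:

import java.io.IOException;
import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical caller, not part of the commit; the user name and table path are illustrative.
public class CreateMetaSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // A filesystem bound to the impersonated query user.
    FileSystem queryUserFs = ImpersonationUtil.createFileSystem("query_user", conf);

    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
        .withFormatConfig(new ParquetFormatConfig())
        .build();

    // After this commit, createMeta() delegates to createMetaFilesRecursivelyAsProcessUser(),
    // so the METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME cache files under the table
    // directory are written as the drillbit process user, not as "query_user".
    Metadata.createMeta(queryUserFs, new Path("/data/parquet_table"), readerConfig, true, null);
  }
}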
@@ -216,21 +237,23 @@ private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path p
    *         {@code path} directory).
    * @throws IOException if parquet metadata can't be serialized and written to the json file
    */
-  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException {
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs>
+  createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet)
+      throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
     List<Path> directoryList = Lists.newArrayList();
     ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
         new ConcurrentHashMap<>();
-    Path p = path;
-    FileStatus fileStatus = fs.getFileStatus(p);
+    FileStatus fileStatus = fs.getFileStatus(path);
     assert fileStatus.isDirectory() : "Expected directory";
 
     final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>();
 
-    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
+    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, path, false)) {
       if (file.isDirectory()) {
-        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, columnSet)).getLeft();
+        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns,
+          columnSet)).getLeft();
         metaDataList.addAll(subTableMetadata.files);
         directoryList.addAll(subTableMetadata.directories);
         directoryList.add(file.getPath());
@@ -259,17 +282,17 @@ private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesR
     parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet);
 
     for (String oldName : OLD_METADATA_FILENAMES) {
-      fs.delete(new Path(p, oldName), false);
+      fs.delete(new Path(path, oldName), false);
     }
     // relative paths in the metadata are only necessary for meta cache files.
     ParquetTableMetadata_v3 metadataTableWithRelativePaths =
         MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path);
-    writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs);
+    writeFile(metadataTableWithRelativePaths, new Path(path, METADATA_FILENAME), fs);
 
     if (directoryList.size() > 0 && childFiles.size() == 0) {
       ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths =
           new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories);
-      writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs);
+      writeFile(parquetTableMetadataDirsRelativePaths, new Path(path, METADATA_DIRECTORIES_FILENAME), fs);
       if (timer != null) {
         logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       }
@@ -610,7 +633,7 @@ private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaCont
       parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
       if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
         parquetTableMetadataDirs =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
+            (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight();
         newMetadata = true;
       }
     } else {
@@ -625,7 +648,7 @@ private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaCont
       if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
         // TODO change with current columns in existing metadata (auto refresh feature)
         parquetTableMetadata =
-            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
+            (createMetaFilesRecursivelyAsProcessUser(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft();
         newMetadata = true;
       }
 
@@ -664,32 +687,29 @@ private boolean tableModified(List<Path> directories, Path metaFilePath, Path pa
     FileStatus directoryStatus = fs.getFileStatus(parentDir);
     int numDirs = 1;
     if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-      if (timer != null) {
-        logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
-            directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
-        timer.stop();
-      }
-      return true;
+      return logAndStopTimer(true, directoryStatus.getPath().toString(), timer, numDirs);
     }
+    boolean isModified = false;
     for (Path directory : directories) {
       numDirs++;
       metaContext.setStatus(directory);
       directoryStatus = fs.getFileStatus(directory);
       if (directoryStatus.getModificationTime() > metaFileModifyTime) {
-        if (timer != null) {
-          logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
-              directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
-          timer.stop();
-        }
-        return true;
+        isModified = true;
+        break;
       }
     }
+    return logAndStopTimer(isModified, directoryStatus.getPath().toString(), timer, numDirs);
+  }
+
+  private boolean logAndStopTimer(boolean isModified, String directoryName,
+                                  Stopwatch timer, int numDirectories) {
     if (timer != null) {
-      logger.debug("No directories were modified. Took {} ms to check modification time of {} directories",
-          timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+      logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
+          isModified ? directoryName : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirectories);
       timer.stop();
     }
-    return false;
+    return isModified;
   }
 
 }
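
The tableModified() hunk above also folds three copies of the "log elapsed time and stop the stopwatch" block into a single logAndStopTimer() helper. A self-contained sketch of the same shape, using the Guava Stopwatch and slf4j logger that Metadata already imports (the class and method names here are illustrative, not from the commit):

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative class showing the consolidated "log once, stop the timer once,
// return the flag" structure used by tableModified().
public class DirScanner {
  private static final Logger logger = LoggerFactory.getLogger(DirScanner.class);

  public boolean anyModified(List<Long> dirModTimes, long metaFileModifyTime) {
    // Create the stopwatch only when debug logging is enabled, as Metadata does.
    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
    int numDirs = 0;
    boolean isModified = false;
    for (long modTime : dirModTimes) {
      numDirs++;
      if (modTime > metaFileModifyTime) {
        isModified = true;
        break;                       // stop scanning; logging happens exactly once below
      }
    }
    return logAndStopTimer(isModified, timer, numDirs);
  }

  // Single exit point for the debug message and the timer shutdown.
  private boolean logAndStopTimer(boolean isModified, Stopwatch timer, int numDirs) {
    if (timer != null) {
      logger.debug("{} directory was modified. Took {} ms to check modification time of {} directories",
          isModified ? "A" : "No", timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
      timer.stop();
    }
    return isModified;
  }
}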
