DRILL-5127: Revert the fix for DRILL-4831
close #718
ppadma authored and Aman Sinha committed Jan 8, 2017
1 parent d375eeb commit 167f0ce
Showing 1 changed file with 8 additions and 59 deletions.
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
-import java.util.UUID;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -41,8 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Options;
 
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -213,17 +210,11 @@ private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {
     for (String oldname : OLD_METADATA_FILENAMES) {
       fs.delete(new Path(p, oldname), false);
     }
-    // writeFile creates and writes to a tmp file first and then renames it
-    // to final metadata cache file name. We want the UUID appended to tmp file
-    // to be same for METADATA_FILENAME and METADATA_DIRECTORIES_FILENAME
-    // so we can track/debug things better.
-    // Generate UUID used for tmp file creation here
-    UUID tmpUUID = UUID.randomUUID();
-    writeFile(parquetTableMetadata, path, tmpUUID);
+    writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
 
     if (directoryList.size() > 0 && childFiles.size() == 0) {
       ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
-      writeFile(parquetTableMetadataDirs, path, tmpUUID);
+      writeFile(parquetTableMetadataDirs, new Path(p, METADATA_DIRECTORIES_FILENAME));
       logger.info("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
@@ -500,76 +491,38 @@ private Map<String, Float> getHostAffinity(FileStatus fileStatus, long start, lo
     return hostAffinityMap;
   }
 
-  /**
-   * Renames Path srcPath to Path dstPath.
-   *
-   * @param srcPath
-   * @param dstPath
-   * @throws IOException
-   */
-  private void renameFile(Path srcPath, Path dstPath) throws IOException {
-    try {
-      // Use fileContext API as FileSystem rename is deprecated.
-      FileContext fileContext = FileContext.getFileContext(srcPath.toUri());
-      fileContext.rename(srcPath, dstPath, Options.Rename.OVERWRITE);
-    } catch (Exception e) {
-      logger.info("Metadata cache file rename from {} to {} failed", srcPath.toString(), dstPath.toString(), e);
-      throw new IOException("metadata cache file rename failed", e);
-    } finally {
-      if (fs.exists(srcPath)) {
-        fs.delete(srcPath, false);
-      }
-    }
-  }
-
   /**
    * Serialize parquet metadata to json and write to a file
    *
    * @param parquetTableMetadata
    * @param p
    * @throws IOException
    */
-  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, String path, UUID tmpUUID) throws IOException {
+  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
     jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
     SimpleModule module = new SimpleModule();
     module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
     mapper.registerModule(module);
-
-    // If multiple clients are updating metadata cache file concurrently, the cache file
-    // can get corrupted. To prevent this, write to a unique temporary file and then do
-    // atomic rename.
-    Path tmpPath = new Path(path, METADATA_FILENAME + "." + tmpUUID);
-    FSDataOutputStream os = fs.create(tmpPath);
+    FSDataOutputStream os = fs.create(p);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
     os.flush();
     os.close();
-
-    Path finalPath = new Path(path, METADATA_FILENAME);
-    renameFile(tmpPath, finalPath);
   }
 
-  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, String path, UUID tmpUUID) throws IOException {
+  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p) throws IOException {
     JsonFactory jsonFactory = new JsonFactory();
     jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
     jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
     ObjectMapper mapper = new ObjectMapper(jsonFactory);
     SimpleModule module = new SimpleModule();
     mapper.registerModule(module);
-
-    // If multiple clients are updating metadata cache file concurrently, the cache file
-    // can get corrupted. To prevent this, write to a unique temporary file and then do
-    // atomic rename.
-    Path tmpPath = new Path(path, METADATA_DIRECTORIES_FILENAME + "." + tmpUUID);
-    FSDataOutputStream os = fs.create(tmpPath);
+    FSDataOutputStream os = fs.create(p);
     mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
     os.flush();
     os.close();
-
-    Path finalPath = new Path(path, METADATA_DIRECTORIES_FILENAME);
-    renameFile(tmpPath, finalPath);
   }
 
   /**
@@ -612,21 +565,17 @@ private void readBlockMeta(String path,
       logger.info("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), p, parentDir, metaContext)) {
-        // Do not remove scheme and authority from the path passed to createMetaFilesRecursively
-        // as we need full path to obtain proper fileContext in writeFile
         parquetTableMetadataDirs =
-            (createMetaFilesRecursively(p.getParent().toString())).getRight();
+            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getRight();
         newMetadata = true;
       }
     } else {
       parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
       logger.info("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
       if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), p, parentDir, metaContext)) {
-        // Do not remove scheme and authority from the path passed to createMetaFilesRecursively
-        // as we need full path to obtain proper fileContext in writeFile
         parquetTableMetadata =
-            (createMetaFilesRecursively(p.getParent().toString())).getLeft();
+            (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString())).getLeft();
         newMetadata = true;
       }
 
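For reference, the mechanism this commit reverts wrote each metadata cache file to a UUID-suffixed temporary file and then atomically renamed it over the final name, so that concurrent writers could not leave a partially written cache file behind. Below is a minimal sketch of that pattern, reconstructed from the deleted lines above; the class and method names are illustrative, not Drill's own.

// Illustrative sketch only: the write-to-tmp-then-rename pattern removed by
// this commit, based on the deleted renameFile/writeFile code above.
import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;

public class AtomicCacheFileWriter {
  // Write payload to dir/fileName by creating dir/fileName.<uuid> first and
  // renaming it into place, so a reader never observes a half-written file.
  public static void write(FileSystem fs, Path dir, String fileName, byte[] payload)
      throws IOException {
    // Unique tmp name so concurrent writers never collide on the same file.
    Path tmpPath = new Path(dir, fileName + "." + UUID.randomUUID());
    try (FSDataOutputStream os = fs.create(tmpPath)) {
      os.write(payload);
    }
    // The deleted code used FileContext because FileSystem.rename is
    // deprecated for this purpose; OVERWRITE replaces the destination.
    FileContext fileContext = FileContext.getFileContext(tmpPath.toUri());
    fileContext.rename(tmpPath, new Path(dir, fileName), Options.Rename.OVERWRITE);
  }
}

The revert drops this machinery entirely and returns to writing the final cache file directly with fs.create(p).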
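The readBlockMeta hunks likewise restore Path.getPathWithoutSchemeAndAuthority on the path handed to createMetaFilesRecursively; the reverted change had kept the full URI only so renameFile could look up the matching FileContext. A small sketch of what that Hadoop helper does, with a hypothetical example path:

import org.apache.hadoop.fs.Path;

public class PathSchemeDemo {
  public static void main(String[] args) {
    // Hypothetical path; any scheme and authority are stripped the same way.
    Path p = new Path("hdfs://namenode:8020/tmp/drill/lineitem");
    // Prints "/tmp/drill/lineitem": scheme "hdfs" and authority
    // "namenode:8020" are removed, leaving only the bare path.
    System.out.println(Path.getPathWithoutSchemeAndAuthority(p));
  }
}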