From 712020f4b296115538889ed3e00631b02e894ab3 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 26 Jan 2016 03:03:21 +0900 Subject: [PATCH] TAJO-2063: Refactor FileTablespace::commitOutputData into well-defined methods. --- .../apache/tajo/storage/FileTablespace.java | 290 +++++++++--------- 1 file changed, 152 insertions(+), 138 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java index e50d587a2b..77e773d39f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java @@ -792,174 +792,188 @@ public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException { Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; + + Path finalOutputDir = null; if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) { finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI)); - try { - FileSystem fs = stagingResultDir.getFileSystem(conf); - if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO + boolean hasPartition = !queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() ? true : false; - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; + try { + if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - ContentSummary summary = fs.getContentSummary(stagingResultDir); // When inserting empty data into a partitioned table, check if keep existing data need to be remove or not. boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED); + if (hasPartition && !overwriteEnabled) { + commitInsertOverwriteWihProtectivePartition(stagingResultDir, finalOutputDir, oldTableDir); + } else { + commitInsertOverwrite(stagingResultDir, finalOutputDir, oldTableDir); + } + } else { + String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + Preconditions.checkNotNull(queryContext); + if (queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + commitInsert(stagingResultDir, finalOutputDir, hasPartition, changeFileSeq); + } else if (queryType.equals(NodeType.CREATE_TABLE.name())){ // CREATE TABLE AS SELECT (CTAS) + commitCreate(stagingResultDir, finalOutputDir); + } else { + throw new IOException("Cannot handle query type:" + queryType); + } + } + // remove the staging directory if the final output dir is given. + Path stagingDirRoot = stagingDir.getParent(); + fs.delete(stagingDirRoot, true); + } catch (Throwable t) { + LOG.error(t); + throw new IOException(t); + } + } else { + finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + } + return finalOutputDir; + } - // If existing data doesn't need to keep, check if there are some files. - if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) - && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount() > 0L))) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map renameDirs = new HashMap<>(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. - Map recoveryDirs = new HashMap<>(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } + private void commitInsertOverwriteWihProtectivePartition(Path stagingResultDir, Path finalOutputDir, + Path oldTableDir) throws IOException { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map renameDirs = new HashMap<>(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. + Map recoveryDirs = new HashMap<>(); - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } - // Recovery renamed dirs - for(Map.Entry entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); - throw new IOException(ioe.getMessage()); - } - } else { // no partition - try { + // Rename target partition directories + for(Map.Entry entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } - // if the final output dir exists, move all contents to the temporary table dir. - // Otherwise, just make the final output dir. As a result, the final output dir will be empty. - if (fs.exists(finalOutputDir)) { - fs.mkdirs(oldTableDir); + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.rename(status.getPath(), oldTableDir); - } + // Recovery renamed dirs + for(Map.Entry entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir); - } + throw new IOException(ioe.getMessage()); + } + } - // Move the results to the final output dir. - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } + private void commitInsertOverwrite(Path stagingResultDir, Path finalOutputDir, Path oldTableDir) throws IOException { + // It moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; - // Check the final output dir - committed = fs.exists(finalOutputDir); + try { + // if the final output dir exists, move all contents to the temporary table dir. + // Otherwise, just make the final output dir. As a result, the final output dir will be empty. + if (fs.exists(finalOutputDir)) { + fs.mkdirs(oldTableDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.rename(status.getPath(), oldTableDir); + } - // if commit is failed, recover the old data - for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { - fs.delete(status.getPath(), true); - } + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir); + } - for (FileStatus status : fs.listStatus(oldTableDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } + // Move the results to the final output dir. + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } - throw new IOException(ioe.getMessage()); - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); + // Check the final output dir + committed = fs.exists(finalOutputDir); - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); + // if commit is failed, recover the old data + for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) { + fs.delete(status.getPath(), true); + } - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - if (fs.exists(finalOutputDir)) { - for (FileStatus status : fs.listStatus(stagingResultDir)) { - fs.rename(status.getPath(), finalOutputDir); - } - } else { - fs.rename(stagingResultDir, finalOutputDir); - } - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } + for (FileStatus status : fs.listStatus(oldTableDir)) { + fs.rename(status.getPath(), finalOutputDir); } + } - // remove the staging directory if the final output dir is given. - Path stagingDirRoot = stagingDir.getParent(); - fs.delete(stagingDirRoot, true); - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); + throw new IOException(ioe.getMessage()); + } + } + + private void commitInsert(Path stagingResultDir, Path finalOutputDir, boolean hasPartition, boolean changeFileSeq) + throws IOException { + NumberFormat fmt = NumberFormat.getInstance(); + fmt.setGroupingUsed(false); + fmt.setMinimumIntegerDigits(3); + + if (hasPartition) { + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.isFile()) { + LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); } } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; + for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); + } + } + // checking all file moved and remove empty dir + verifyAllFileMoved(fs, stagingResultDir); + FileStatus[] files = fs.listStatus(stagingResultDir); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + } } + } - return finalOutputDir; + private void commitCreate(Path stagingResultDir, Path finalOutputDir) throws IOException { + if (fs.exists(finalOutputDir)) { + for (FileStatus status : fs.listStatus(stagingResultDir)) { + fs.rename(status.getPath(), finalOutputDir); + } + } else { + fs.rename(stagingResultDir, finalOutputDir); + } + LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); } /**