From de494e1aedece311ba5a53ba89895aa0d8fe29ee Mon Sep 17 00:00:00 2001 From: Sourabh Badhya Date: Mon, 29 Apr 2024 10:15:28 +0530 Subject: [PATCH] HIVE-28258: Use Iceberg semantics for Merge task --- .../iceberg/mr/hive/FilesForCommit.java | 16 +- .../mr/hive/HiveIcebergInputFormat.java | 12 +- .../mr/hive/HiveIcebergOutputCommitter.java | 177 +++++++-- .../mr/hive/HiveIcebergStorageHandler.java | 43 +-- .../mr/hive/IcebergMergeTaskProperties.java | 36 +- .../iceberg/mr/hive/IcebergTableUtil.java | 1 + .../AbstractMapredIcebergRecordReader.java | 4 +- .../mr/mapred/MapredIcebergInputFormat.java | 14 +- .../AbstractIcebergRecordReader.java | 156 ++++++++ .../mr/mapreduce/IcebergInputFormat.java | 353 +----------------- .../mapreduce/IcebergMergeRecordReader.java | 169 +++++++++ .../mr/mapreduce/IcebergMergeSplit.java | 79 ++++ .../mr/mapreduce/IcebergRecordReader.java | 308 +++++++++++++++ .../iceberg/mr/mapreduce/IcebergSplit.java | 2 +- .../mr/mapreduce/IcebergSplitContainer.java | 4 +- .../hive/ql/io/CombineHiveInputFormat.java | 18 +- .../hive/ql/io/CombineHiveRecordReader.java | 31 +- .../plan/ConditionalResolverMergeFiles.java | 23 +- .../apache/hadoop/hive/ql/plan/MapWork.java | 22 +- .../hive/ql/plan/MergeTaskProperties.java | 8 +- 20 files changed, 985 insertions(+), 491 deletions(-) create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java index 2e25f5a8c2e6..1ca3872b424e 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -35,27 +36,31 @@ public class FilesForCommit implements Serializable { private final Collection deleteFiles; private final Collection replacedDataFiles; private final Collection referencedDataFiles; + private final Collection mergedAndDeletedFiles; public FilesForCommit(Collection dataFiles, Collection deleteFiles) { this(dataFiles, deleteFiles, Collections.emptyList()); } public FilesForCommit(Collection dataFiles, Collection deleteFiles, - Collection replacedDataFiles, Collection referencedDataFiles) { + Collection replacedDataFiles, Collection referencedDataFiles, + Collection mergedAndDeletedFiles) { this.dataFiles = dataFiles; this.deleteFiles = deleteFiles; this.replacedDataFiles = replacedDataFiles; this.referencedDataFiles = referencedDataFiles; + this.mergedAndDeletedFiles = mergedAndDeletedFiles; } public FilesForCommit(Collection dataFiles, Collection deleteFiles, Collection replacedDataFiles) { - this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet()); + this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet(), Collections.emptySet()); } 
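+ // Illustrative only, not part of this patch: shows how a merge commit task is expected to use
+ // the new fifth argument. The generic type parameters are stripped in this copy of the diff;
+ // dataFiles is assumed to be a Collection<DataFile> and mergedAndDeletedFiles a Collection<Path>
+ // (the org.apache.hadoop.fs.Path import added above and the path-based filtering in
+ // HiveIcebergOutputCommitter#commitTable both point at Path).
+ public static FilesForCommit forMergeTask(Collection dataFiles, Collection mergedAndDeletedFiles) {
+ return new FilesForCommit(dataFiles, Collections.emptyList(), Collections.emptyList(),
+ Collections.emptySet(), mergedAndDeletedFiles);
+ }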
public static FilesForCommit onlyDelete(Collection deleteFiles, Collection referencedDataFiles) { - return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles); + return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), + referencedDataFiles, Collections.emptySet()); } public static FilesForCommit onlyData(Collection dataFiles) { @@ -86,6 +91,10 @@ public Collection referencedDataFiles() { return referencedDataFiles; } + public Collection mergedAndDeletedFiles() { + return mergedAndDeletedFiles; + } + public Collection allFiles() { return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList()); } @@ -101,6 +110,7 @@ public String toString() { .add("deleteFiles", deleteFiles.toString()) .add("replacedDataFiles", replacedDataFiles.toString()) .add("referencedDataFiles", referencedDataFiles.toString()) + .add("mergedAndDeletedFiles", mergedAndDeletedFiles.toString()) .toString(); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java index 1ea78eeba54e..98fd4b30163f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.TableName; @@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -56,6 +58,7 @@ import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; import org.apache.iceberg.mr.mapreduce.IcebergInputFormat; +import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit; import org.apache.iceberg.mr.mapreduce.IcebergSplit; import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -64,7 +67,7 @@ import org.slf4j.LoggerFactory; public class HiveIcebergInputFormat extends MapredIcebergInputFormat - implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface, + implements CombineHiveInputFormat.MergeSplits, VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface.VectorizedOnly { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class); @@ -225,4 +228,11 @@ public static String getVectorizationConfName(String tableName) { String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable(); return ICEBERG_DISABLE_VECTORIZATION_PREFIX + dbAndTableName; } + + @Override + public FileSplit createMergeSplit(Configuration conf, + CombineHiveInputFormat.CombineHiveInputSplit split, + Integer partition, Properties properties) throws IOException { + return new IcebergMergeSplit(conf, split, partition, properties); + } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index d9f3116ff848..cb61d545b3df 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -52,12 +52,14 @@ import org.apache.hadoop.hive.ql.session.SessionStateUtil; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; import org.apache.hadoop.mapred.OutputCommitter; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; @@ -83,6 +85,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.util.Tasks; @@ -97,6 +100,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter { private static final String FOR_COMMIT_EXTENSION = ".forCommit"; private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); + private static final HiveIcebergOutputCommitter OUTPUT_COMMITTER = new HiveIcebergOutputCommitter(); + + public static HiveIcebergOutputCommitter getInstance() { + return OUTPUT_COMMITTER; + } @Override public void setupJob(JobContext jobContext) { @@ -126,6 +134,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException { TaskAttemptID attemptID = context.getTaskAttemptID(); JobConf jobConf = context.getJobConf(); + Set mergedPaths = getCombinedLocations(jobConf); Set outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf()); Map> writers = Optional.ofNullable(WriterRegistry.writers(attemptID)) .orElseGet(() -> { @@ -158,7 +167,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException { replacedDataFiles.addAll(files.replacedDataFiles()); referencedDataFiles.addAll(files.referencedDataFiles()); } - createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles), + createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, + mergedPaths), fileForCommitLocation, table.io()); } else { LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID); @@ -170,8 +180,6 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException { LOG.info("CommitTask found no serialized table in config for table: {}.", output); } }, IOException.class); - - cleanMergeTaskInputFiles(jobConf, tableExecutor, context); } finally { if (tableExecutor != null) { tableExecutor.shutdown(); @@ -281,6 +289,9 @@ public void commitJobs(List originalContextList, Operation operation .collect(Collectors.toList())); commitTable(table.io(), fileExecutor, output, operation); }); + + // Cleanup any merge input files. 
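+ // Deleting them here, after the commit loop above, rather than in commitTask keeps the
+ // pre-merge files on disk until the merged files have been committed.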
+ cleanMergeTaskInputFiles(jobContextList, tableExecutor); } finally { fileExecutor.shutdown(); if (tableExecutor != null) { @@ -423,6 +434,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output List deleteFiles = Lists.newArrayList(); List replacedDataFiles = Lists.newArrayList(); Set referencedDataFiles = Sets.newHashSet(); + Set mergedAndDeletedFiles = Sets.newHashSet(); Table table = null; String branchName = null; @@ -459,9 +471,14 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output deleteFiles.addAll(writeResults.deleteFiles()); replacedDataFiles.addAll(writeResults.replacedDataFiles()); referencedDataFiles.addAll(writeResults.referencedDataFiles()); + mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles()); } - FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles); + dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path())))); + deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path())))); + + FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, + Collections.emptySet()); long startTime = System.currentTimeMillis(); if (Operation.IOW != operation) { @@ -687,6 +704,7 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu Collection deleteFiles = new ConcurrentLinkedQueue<>(); Collection replacedDataFiles = new ConcurrentLinkedQueue<>(); Collection referencedDataFiles = new ConcurrentLinkedQueue<>(); + Collection mergedAndDeletedFiles = new ConcurrentLinkedQueue<>(); Tasks.range(numTasks) .throwFailureWhenFinished(throwOnFailure) .executeWith(executor) @@ -698,9 +716,10 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu deleteFiles.addAll(files.deleteFiles()); replacedDataFiles.addAll(files.replacedDataFiles()); referencedDataFiles.addAll(files.referencedDataFiles()); + mergedAndDeletedFiles.addAll(files.mergedAndDeletedFiles()); }); - return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles); + return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, mergedAndDeletedFiles); } /** @@ -747,11 +766,18 @@ private static FilesForCommit readFileForCommit(String fileForCommitLocation, Fi } } + /** + * Generates a list of file statuses of the output files in the jobContexts. 
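+ * Both data files and delete files written by the job are included; they are grouped by parent
+ * directory internally and then flattened into a single list.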
+ * @param jobContexts List of jobContexts + * @return Returns the list of file statuses of the output files in the jobContexts + * @throws IOException Throws IOException + */ public List getOutputFiles(List jobContexts) throws IOException { List outputs = collectOutputs(jobContexts); ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf()); ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size()); - Collection dataFiles = new ConcurrentLinkedQueue<>(); + Map> parentDirToDataFile = Maps.newConcurrentMap(); + Map> parentDirToDeleteFile = Maps.newConcurrentMap(); try { Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream() .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext)))) @@ -773,8 +799,15 @@ public List getOutputFiles(List jobContexts) throws IOEx FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, table.io(), false); for (DataFile dataFile : results.dataFiles()) { - FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString())); - dataFiles.add(fileStatus); + Path filePath = new Path(dataFile.path().toString()); + FileStatus fileStatus = fileSystem.getFileStatus(filePath); + parentDirToDataFile.computeIfAbsent(filePath.getParent(), k -> Lists.newArrayList()).add(fileStatus); + } + for (DeleteFile deleteFile : results.deleteFiles()) { + Path filePath = new Path(deleteFile.path().toString()); + FileStatus fileStatus = fileSystem.getFileStatus(filePath); + parentDirToDeleteFile.computeIfAbsent(filePath.getParent(), + k -> Lists.newArrayList()).add(fileStatus); } }, IOException.class); } finally { @@ -783,28 +816,126 @@ public List getOutputFiles(List jobContexts) throws IOEx tableExecutor.shutdown(); } } - return Lists.newArrayList(dataFiles); + List dataFiles = Lists.newArrayList(); + dataFiles.addAll(parentDirToDataFile.values().stream() + .flatMap(List::stream).collect(Collectors.toList())); + dataFiles.addAll(parentDirToDeleteFile.values().stream() + .flatMap(List::stream).collect(Collectors.toList())); + return dataFiles; } - private void cleanMergeTaskInputFiles(JobConf jobConf, - ExecutorService tableExecutor, - TaskAttemptContext context) throws IOException { + /** + * Generates a list of ContentFile objects of the output files in the jobContexts. 
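+ * Unlike getOutputFiles, the Iceberg ContentFile objects (both data and delete files) are
+ * returned instead of FileStatus entries; IcebergMergeTaskProperties#getSplitProperties uses
+ * this to map each output path back to its content file.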
+ * @param jobContexts List of jobContexts + * @return Returns the list of file statuses of the output files in the jobContexts + * @throws IOException Throws IOException + */ + public List getOutputContentFiles(List jobContexts) throws IOException { + List outputs = collectOutputs(jobContexts); + ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf()); + ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size()); + Collection files = new ConcurrentLinkedQueue<>(); + try { + Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream() + .map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext)))) + .suppressFailureWhenFinished() + .executeWith(tableExecutor) + .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc)) + .run(output -> { + JobContext jobContext = output.getValue(); + JobConf jobConf = jobContext.getJobConf(); + LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output); + + Table table = output.getKey(); + FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf); + String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID()); + // list jobLocation to get number of forCommit files + // we do this because map/reduce num in jobConf is unreliable + // and we have no access to vertex status info + int numTasks = listForCommits(jobConf, jobLocation).size(); + FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext, + table.io(), false); + files.addAll(results.dataFiles()); + files.addAll(results.deleteFiles()); + }, IOException.class); + } finally { + fileExecutor.shutdown(); + if (tableExecutor != null) { + tableExecutor.shutdown(); + } + } + return Lists.newArrayList(files); + } + + private void cleanMergeTaskInputFiles(List jobContexts, + ExecutorService tableExecutor) throws IOException { // Merge task has merged several files into one. Hence we need to remove the stale files. // At this stage the file is written and task-committed, but the old files are still present. + for (JobContext jobContext : jobContexts) { + JobConf jobConf = jobContext.getJobConf(); + if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) { + MapWork mrwork = Utilities.getMapWork(jobConf); + if (mrwork != null) { + List mergedPaths = mrwork.getInputPaths(); + if (mergedPaths != null) { + Tasks.foreach(mergedPaths) + .retry(3) + .executeWith(tableExecutor) + .run(path -> { + FileSystem fs = path.getFileSystem(jobConf); + if (fs.exists(path)) { + fs.delete(path, true); + } + }, IOException.class); + } + } + } + } + } + + /** + * Generates {@link JobContext}s for the OutputCommitter for the specific table. + * @param configuration The configuration used for as a base of the JobConf + * @param tableName The name of the table we are planning to commit + * @param branchName the name of the branch + * @return The generated Optional JobContext list or empty if not presents. 
+ */ + static List generateJobContext(Configuration configuration, String tableName, + String branchName) { + JobConf jobConf = new JobConf(configuration); + Optional> commitInfoMap = + SessionStateUtil.getCommitInfo(jobConf, tableName); + if (commitInfoMap.isPresent()) { + List jobContextList = Lists.newLinkedList(); + for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) { + org.apache.hadoop.mapred.JobID jobID = org.apache.hadoop.mapred.JobID.forName(commitInfo.getJobIdStr()); + commitInfo.getProps().forEach(jobConf::set); + + // we should only commit this current table because + // for multi-table inserts, this hook method will be called sequentially for each target table + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); + if (branchName != null) { + jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName); + } + + jobContextList.add(new JobContextImpl(jobConf, jobID, null)); + } + return jobContextList; + } else { + // most likely empty write scenario + LOG.debug("Unable to find commit information in query state for table: {}", tableName); + return Collections.emptyList(); + } + } + + private Set getCombinedLocations(JobConf jobConf) { + Set mergedPaths = Sets.newHashSet(); if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) { MapWork mrwork = Utilities.getMapWork(jobConf); - if (mrwork != null) { - List mergedPaths = mrwork.getInputPaths(); - if (mergedPaths != null) { - Tasks.foreach(mergedPaths) - .retry(3) - .executeWith(tableExecutor) - .run(path -> { - FileSystem fs = path.getFileSystem(context.getJobConf()); - fs.delete(path, true); - }, IOException.class); - } + if (mrwork != null && mrwork.getInputPaths() != null) { + mergedPaths.addAll(mrwork.getInputPaths()); } } + return mergedPaths; } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 3157fbb04110..7ccb4bfb3bfa 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -132,8 +132,6 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; -import org.apache.hadoop.mapred.JobContextImpl; -import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; @@ -763,7 +761,8 @@ public void storageHandlerCommit(Properties commitProperties, Operation operatio if (location != null) { HiveTableUtil.cleanupTableObjectFile(location, configuration); } - List jobContextList = generateJobContext(configuration, tableName, snapshotRef); + List jobContextList = HiveIcebergOutputCommitter + .generateJobContext(configuration, tableName, snapshotRef); if (jobContextList.isEmpty()) { return; } @@ -1569,41 +1568,6 @@ private static boolean hasParquetNestedTypeWithinListOrMap(Properties tableProps return true; } - /** - * Generates {@link JobContext}s for the OutputCommitter for the specific table. - * @param configuration The configuration used for as a base of the JobConf - * @param tableName The name of the table we are planning to commit - * @param branchName the name of the branch - * @return The generated Optional JobContext list or empty if not presents. 
- */ - private List generateJobContext(Configuration configuration, String tableName, - String branchName) { - JobConf jobConf = new JobConf(configuration); - Optional> commitInfoMap = - SessionStateUtil.getCommitInfo(jobConf, tableName); - if (commitInfoMap.isPresent()) { - List jobContextList = Lists.newLinkedList(); - for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) { - JobID jobID = JobID.forName(commitInfo.getJobIdStr()); - commitInfo.getProps().forEach(jobConf::set); - - // we should only commit this current table because - // for multi-table inserts, this hook method will be called sequentially for each target table - jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); - if (branchName != null) { - jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName); - } - - jobContextList.add(new JobContextImpl(jobConf, jobID, null)); - } - return jobContextList; - } else { - // most likely empty write scenario - LOG.debug("Unable to find commit information in query state for table: {}", tableName); - return Collections.emptyList(); - } - } - private String getOperationType() { return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName()) .orElse(Operation.OTHER.name()); @@ -2172,7 +2136,8 @@ public List getMergeTaskInputFiles(Properties properties) throws IOE String tableName = properties.getProperty(Catalogs.NAME); String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF); Configuration configuration = SessionState.getSessionConf(); - List originalContextList = generateJobContext(configuration, tableName, snapshotRef); + List originalContextList = HiveIcebergOutputCommitter + .generateJobContext(configuration, tableName, snapshotRef); List jobContextList = originalContextList.stream() .map(TezUtil::enrichContextWithVertexId) .collect(Collectors.toList()); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java index ff47a801a5b5..e3e6df6ceba1 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java @@ -19,19 +19,20 @@ package org.apache.iceberg.mr.hive; import java.io.IOException; +import java.util.List; import java.util.Properties; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; -import org.apache.hadoop.hive.ql.io.StorageFormatFactory; import org.apache.hadoop.hive.ql.plan.MergeTaskProperties; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.TableProperties; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.mapred.JobContext; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.mr.Catalogs; public class IcebergMergeTaskProperties implements MergeTaskProperties { private final Properties properties; - private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory(); IcebergMergeTaskProperties(Properties properties) { this.properties = properties; @@ -42,14 +43,23 @@ public Path getTmpLocation() { return new Path(location + "/data/"); } - public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException { - FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT, - TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)); - StorageFormatDescriptor descriptor = storageFormatFactory.get(fileFormat.name()); - if (descriptor == null) { - throw new IOException("Unsupported storage format descriptor"); + @Override + public Properties getSplitProperties() throws IOException { + String tableName = properties.getProperty(Catalogs.NAME); + String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF); + Configuration configuration = SessionState.getSessionConf(); + List originalContextList = HiveIcebergOutputCommitter + .generateJobContext(configuration, tableName, snapshotRef); + List jobContextList = originalContextList.stream() + .map(TezUtil::enrichContextWithVertexId) + .collect(Collectors.toList()); + if (jobContextList.isEmpty()) { + return null; } - return descriptor; + List contentFiles = HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContextList); + Properties pathToContentFile = new Properties(); + contentFiles.forEach(contentFile -> + pathToContentFile.put(new Path(String.valueOf(contentFile.path())), contentFile)); + return pathToContentFile; } - } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 79454f45ee87..653372d7988f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -344,4 +344,5 @@ public static PartitionData toPartitionData(StructLike key, Types.StructType key } return data; } + } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java index 92e9c4a688dc..2e892900d029 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java @@ -23,8 +23,8 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.iceberg.mr.mapreduce.IcebergSplit; @SuppressWarnings("checkstyle:VisibilityModifier") public abstract class AbstractMapredIcebergRecordReader implements RecordReader { @@ -32,7 +32,7 @@ public abstract class AbstractMapredIcebergRecordReader implements RecordRead protected final org.apache.hadoop.mapreduce.RecordReader innerReader; public AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat mapreduceInputFormat, - IcebergSplit split, JobConf job, Reporter reporter) throws IOException { + InputSplit split, JobConf job, Reporter reporter) throws IOException { TaskAttemptContext context = MapredIcebergInputFormat.newTaskAttemptContext(job, reporter); try { diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java index 822ce52a7e46..473e8fe1486e 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java +++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java @@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.iceberg.data.Record; import org.apache.iceberg.mr.InputFormatConfig; -import org.apache.iceberg.mr.mapreduce.IcebergSplit; +import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit; import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer; /** @@ -75,8 +75,13 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @Override public RecordReader> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { - IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit(); - return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter); + try { + org.apache.hadoop.mapreduce.InputSplit inputSplit = split instanceof IcebergMergeSplit ? + (IcebergMergeSplit) split : ((IcebergSplitContainer) split).icebergSplit(); + return new MapredIcebergRecordReader<>(innerInputFormat, inputSplit, job, reporter); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @@ -85,7 +90,8 @@ private static final class MapredIcebergRecordReader extends AbstractMapredIc private final long splitLength; // for getPos() MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat mapreduceInputFormat, - IcebergSplit split, JobConf job, Reporter reporter) throws IOException { + org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter reporter) + throws IOException, InterruptedException { super(mapreduceInputFormat, split, job, reporter); splitLength = split.getLength(); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java new file mode 100644 index 000000000000..53ee0d870f8b --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler; +import org.apache.iceberg.mr.hive.IcebergAcidUtil; + +public abstract class AbstractIcebergRecordReader extends RecordReader { + + private TaskAttemptContext context; + private Configuration conf; + private Table table; + private Schema expectedSchema; + private String nameMapping; + private boolean reuseContainers; + private boolean caseSensitive; + private InputFormatConfig.InMemoryDataModel inMemoryDataModel; + private boolean fetchVirtualColumns; + + @Override + public void initialize(InputSplit split, TaskAttemptContext newContext) { + // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances + this.context = newContext; + this.conf = newContext.getConfiguration(); + this.table = HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)); + HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table); + this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT); + this.expectedSchema = readSchema(conf, table, caseSensitive); + this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false); + this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, + InputFormatConfig.InMemoryDataModel.GENERIC); + this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf); + } + + private static Schema readSchema(Configuration conf, Table table, boolean caseSensitive) { + Schema readSchema = InputFormatConfig.readSchema(conf); + + if (readSchema != null) { + return readSchema; + } + + String[] selectedColumns = InputFormatConfig.selectedColumns(conf); + readSchema = table.schema(); + + if (selectedColumns != null) { + readSchema = + caseSensitive ? readSchema.select(selectedColumns) : readSchema.caseInsensitiveSelect(selectedColumns); + } + + if (InputFormatConfig.fetchVirtualColumns(conf)) { + return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table); + } + + return readSchema; + } + + CloseableIterable applyResidualFiltering(CloseableIterable iter, Expression residual, + Schema readSchema) { + boolean applyResidual = !getContext().getConfiguration() + .getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false); + + if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { + // Date and timestamp values are not the correct type for Evaluator. + // Wrapping to return the expected type. 
+ InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct()); + Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive); + return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record))); + } else { + return iter; + } + } + + public TaskAttemptContext getContext() { + return context; + } + + public Configuration getConf() { + return conf; + } + + public boolean isReuseContainers() { + return reuseContainers; + } + + public Schema getExpectedSchema() { + return expectedSchema; + } + + public String getNameMapping() { + return nameMapping; + } + + public Table getTable() { + return table; + } + + public InputFormatConfig.InMemoryDataModel getInMemoryDataModel() { + return inMemoryDataModel; + } + + public boolean isFetchVirtualColumns() { + return fetchVirtualColumns; + } + + public boolean isCaseSensitive() { + return caseSensitive; + } + + @Override + public Void getCurrentKey() { + return null; + } + + @Override + public float getProgress() { + // TODO: We could give a more accurate progress based on records read from the file. Context.getProgress does not + // have enough information to give an accurate progress value. This isn't that easy, since we don't know how much + // of the input split has been processed and we are pushing filters into Parquet and ORC. But we do know when a + // file is opened and could count the number of rows returned, so we can estimate. And we could also add a row + // count to the readers so that we can get an accurate count of rows that have been either returned or filtered + // out. + return getContext().getProgress(); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index efb3988688d0..00e63740e28c 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -21,23 +21,15 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.function.BiFunction; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.LlapHiveUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.AuthorizationException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -49,54 +41,23 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTableScan; -import org.apache.iceberg.DataTask; -import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IncrementalAppendScan; -import org.apache.iceberg.MetadataColumns; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Partitioning; import org.apache.iceberg.Scan; 
import org.apache.iceberg.Schema; import org.apache.iceberg.SnapshotRef; -import org.apache.iceberg.StructLike; import org.apache.iceberg.SystemConfigs; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.common.DynMethods; -import org.apache.iceberg.data.DeleteFilter; -import org.apache.iceberg.data.GenericDeleteFilter; -import org.apache.iceberg.data.IdentityPartitionConverters; -import org.apache.iceberg.data.InternalRecordWrapper; -import org.apache.iceberg.data.avro.DataReader; -import org.apache.iceberg.data.orc.GenericOrcReader; -import org.apache.iceberg.data.parquet.GenericParquetReaders; -import org.apache.iceberg.encryption.EncryptedFiles; -import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hive.HiveVersion; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; -import org.apache.iceberg.mr.hive.HiveIcebergInputFormat; import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler; -import org.apache.iceberg.mr.hive.IcebergAcidUtil; -import org.apache.iceberg.orc.ORC; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.SerializationUtil; import org.apache.iceberg.util.ThreadPools; @@ -293,319 +254,7 @@ private static void checkResiduals(CombinedScanTask task) { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - return new IcebergRecordReader<>(); + return split instanceof IcebergMergeSplit ? 
new IcebergMergeRecordReader<>() : new IcebergRecordReader<>(); } - private static final class IcebergRecordReader extends RecordReader { - - private static final String HIVE_VECTORIZED_READER_CLASS = "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader"; - private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER; - - static { - if (HiveVersion.min(HiveVersion.HIVE_3)) { - HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader") - .impl(HIVE_VECTORIZED_READER_CLASS, - Table.class, - Path.class, - FileScanTask.class, - Map.class, - TaskAttemptContext.class, - Expression.class, - Schema.class) - .buildStatic(); - } else { - HIVE_VECTORIZED_READER_BUILDER = null; - } - } - - private TaskAttemptContext context; - private Configuration conf; - private Schema expectedSchema; - private String nameMapping; - private boolean reuseContainers; - private boolean caseSensitive; - private InputFormatConfig.InMemoryDataModel inMemoryDataModel; - private Iterator tasks; - private T current; - private CloseableIterator currentIterator; - private Table table; - private boolean fetchVirtualColumns; - - @Override - public void initialize(InputSplit split, TaskAttemptContext newContext) { - // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances - CombinedScanTask task = ((IcebergSplit) split).task(); - this.context = newContext; - this.conf = newContext.getConfiguration(); - this.table = SerializationUtil.deserializeFromBase64( - conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER))); - HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table); - this.tasks = task.files().iterator(); - this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); - this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT); - this.expectedSchema = readSchema(conf, table, caseSensitive); - this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false); - this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, - InputFormatConfig.InMemoryDataModel.GENERIC); - this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf); - this.currentIterator = nextTask(); - } - - private CloseableIterator nextTask() { - CloseableIterator closeableIterator = open(tasks.next(), expectedSchema).iterator(); - if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) { - return closeableIterator; - } - return new IcebergAcidUtil.VirtualColumnAwareIterator(closeableIterator, expectedSchema, conf); - } - - @Override - public boolean nextKeyValue() throws IOException { - while (true) { - if (currentIterator.hasNext()) { - current = currentIterator.next(); - return true; - } else if (tasks.hasNext()) { - currentIterator.close(); - this.currentIterator = nextTask(); - } else { - currentIterator.close(); - return false; - } - } - } - - @Override - public Void getCurrentKey() { - return null; - } - - @Override - public T getCurrentValue() { - return current; - } - - @Override - public float getProgress() { - // TODO: We could give a more accurate progress based on records read from the file. Context.getProgress does not - // have enough information to give an accurate progress value. This isn't that easy, since we don't know how much - // of the input split has been processed and we are pushing filters into Parquet and ORC. 
But we do know when a - // file is opened and could count the number of rows returned, so we can estimate. And we could also add a row - // count to the readers so that we can get an accurate count of rows that have been either returned or filtered - // out. - return context.getProgress(); - } - - @Override - public void close() throws IOException { - currentIterator.close(); - } - - private CloseableIterable openVectorized(FileScanTask task, Schema readSchema) { - Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO), - "Vectorized execution is not yet supported for Iceberg avro tables. " + - "Please turn off vectorization and retry the query."); - Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3), - "Vectorized read is unsupported for Hive 2 integration."); - - Path path = new Path(task.file().path().toString()); - Map idToConstant = constantsMap(task, HiveIdentityPartitionConverters::convertConstant); - Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration()); - - // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used - CloseableIterable iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task, - idToConstant, context, residual, readSchema); - - return applyResidualFiltering(iterator, residual, readSchema); - } - - private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) { - if (task.isDataTask()) { - // When querying metadata tables, the currentTask is a DataTask and the data has to - // be fetched from the task instead of reading it from files. - IcebergInternalRecordWrapper wrapper = - new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct()); - return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row)); - } - - DataFile file = task.file(); - InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput( - table.io().newInputFile(file.path().toString()), - file.keyMetadata())); - - CloseableIterable iterable; - switch (file.format()) { - case AVRO: - iterable = newAvroIterable(inputFile, task, readSchema); - break; - case ORC: - iterable = newOrcIterable(inputFile, task, readSchema); - break; - case PARQUET: - iterable = newParquetIterable(inputFile, task, readSchema); - break; - default: - throw new UnsupportedOperationException( - String.format("Cannot read %s file: %s", file.format().name(), file.path())); - } - - return iterable; - } - - @SuppressWarnings("unchecked") - private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { - switch (inMemoryDataModel) { - case PIG: - // TODO: Support Pig and Hive object models for IcebergInputFormat - throw new UnsupportedOperationException("Pig and Hive object models are not supported."); - case HIVE: - return openVectorized(currentTask, readSchema); - case GENERIC: - DeleteFilter deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema); - Schema requiredSchema = deletes.requiredSchema(); - return deletes.filter(openGeneric(currentTask, requiredSchema)); - default: - throw new UnsupportedOperationException("Unsupported memory model"); - } - } - - private CloseableIterable applyResidualFiltering(CloseableIterable iter, Expression residual, - Schema readSchema) { - boolean applyResidual = !context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false); - - if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { - // Date 
and timestamp values are not the correct type for Evaluator. - // Wrapping to return the expected type. - InternalRecordWrapper wrapper = new InternalRecordWrapper(readSchema.asStruct()); - Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive); - return CloseableIterable.filter(iter, record -> filter.eval(wrapper.wrap((StructLike) record))); - } else { - return iter; - } - } - - private CloseableIterable newAvroIterable( - InputFile inputFile, FileScanTask task, Schema readSchema) { - Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration()); - Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile) - .project(readSchema) - .split(task.start(), task.length()); - - if (reuseContainers) { - avroReadBuilder.reuseContainers(); - } - - if (nameMapping != null) { - avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - avroReadBuilder.createReaderFunc( - (expIcebergSchema, expAvroSchema) -> - DataReader.create(expIcebergSchema, expAvroSchema, - constantsMap(task, IdentityPartitionConverters::convertConstant))); - - return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema); - } - - private CloseableIterable newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { - Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration()); - - Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile) - .project(readSchema) - .filter(residual) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - - if (reuseContainers) { - parquetReadBuilder.reuseContainers(); - } - - if (nameMapping != null) { - parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - parquetReadBuilder.createReaderFunc( - fileSchema -> GenericParquetReaders.buildReader( - readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant))); - - return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema); - } - - private CloseableIterable newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { - Map idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant); - Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant); - Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration()); - - ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) - .project(readSchemaWithoutConstantAndMetadataFields) - .filter(residual) - .caseSensitive(caseSensitive) - .split(task.start(), task.length()); - - if (nameMapping != null) { - orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping)); - } - - orcReadBuilder.createReaderFunc( - fileSchema -> GenericOrcReader.buildReader( - readSchema, fileSchema, idToConstant)); - - return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema); - } - - private Map constantsMap(FileScanTask task, BiFunction converter) { - PartitionSpec spec = task.spec(); - Set idColumns = spec.identitySourceIds(); - Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns); - boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); - if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { - Types.StructType partitionType = Partitioning.partitionType(table); - return PartitionUtil.constantsMap(task, partitionType, converter); - } else if (projectsIdentityPartitionColumns) { - 
Types.StructType partitionType = Partitioning.partitionType(table); - return PartitionUtil.constantsMap(task, partitionType, converter); - } else { - return Collections.emptyMap(); - } - } - - private static Schema readSchema(Configuration conf, Table table, boolean caseSensitive) { - Schema readSchema = InputFormatConfig.readSchema(conf); - - if (readSchema != null) { - return readSchema; - } - - String[] selectedColumns = InputFormatConfig.selectedColumns(conf); - readSchema = table.schema(); - - if (selectedColumns != null) { - readSchema = - caseSensitive ? readSchema.select(selectedColumns) : readSchema.caseInsensitiveSelect(selectedColumns); - } - - if (InputFormatConfig.fetchVirtualColumns(conf)) { - return IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), table); - } - - return readSchema; - } - - private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map idToConstant) { - // remove the nested fields of the partition struct - Set partitionFields = Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID)) - .map(Types.NestedField::type) - .map(Type::asStructType) - .map(Types.StructType::fields) - .map(fields -> fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet())) - .orElseGet(Collections::emptySet); - - // remove constants and meta columns too - Set collect = Stream.of(idToConstant.keySet(), MetadataColumns.metadataFieldIds(), partitionFields) - .flatMap(Set::stream) - .collect(Collectors.toSet()); - - return TypeUtil.selectNot(readSchema, collect); - } - } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java new file mode 100644 index 000000000000..60995a1988d6 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public final class IcebergMergeRecordReader extends AbstractIcebergRecordReader { + private IcebergMergeSplit mergeSplit; + private CloseableIterator currentIterator; + private T current; + + @Override + public void initialize(InputSplit split, TaskAttemptContext newContext) { + // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances + super.initialize(split, newContext); + mergeSplit = (IcebergMergeSplit) split; + this.currentIterator = nextTask(); + } + + private CloseableIterator nextTask() { + return openGeneric(mergeSplit.getContentFile(), getTable().schema()).iterator(); + } + + @Override + public boolean nextKeyValue() throws IOException { + while (true) { + if (currentIterator.hasNext()) { + current = currentIterator.next(); + return true; + } else { + currentIterator.close(); + return false; + } + } + } + + @Override + public T getCurrentValue() { + return current; + } + + @Override + public void close() throws IOException { + currentIterator.close(); + } + + private CloseableIterable openGeneric(ContentFile contentFile, Schema readSchema) { + InputFile inputFile = null; + Schema schema = null; + if (contentFile instanceof DataFile) { + DataFile dataFile = (DataFile) contentFile; + inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput( + getTable().io().newInputFile(dataFile.path().toString()), + dataFile.keyMetadata())); + schema = readSchema; + } + CloseableIterable iterable; + switch (contentFile.format()) { + case AVRO: + iterable = newAvroIterable(inputFile, schema); + break; + case ORC: + iterable = newOrcIterable(inputFile, schema); + break; + case PARQUET: + iterable = newParquetIterable(inputFile, schema); + break; + default: + throw new UnsupportedOperationException( + String.format("Cannot read %s file: %s", contentFile.format().name(), contentFile.path())); + } + + return iterable; + } + + private CloseableIterable newAvroIterable( + InputFile inputFile, Schema readSchema) { + Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile) + .project(readSchema) + .split(mergeSplit.getStart(), mergeSplit.getLength()); + + if (isReuseContainers()) { + avroReadBuilder.reuseContainers(); + } + + if (getNameMapping() != null) { + avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + avroReadBuilder.createReaderFunc( + (expIcebergSchema, expAvroSchema) -> + DataReader.create(expIcebergSchema, expAvroSchema, Maps.newHashMap())); + + return applyResidualFiltering(avroReadBuilder.build(), null, readSchema); + } + + private CloseableIterable newOrcIterable(InputFile inputFile, Schema readSchema) { + 
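+ // Read the merged content file as ORC. No residual expression is pushed down (the filter is
+ // passed as null to applyResidualFiltering below) because the merge task re-reads whole files
+ // rather than filtered row sets.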
ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) + .project(readSchema) + .caseSensitive(isCaseSensitive()) + .split(mergeSplit.getStart(), mergeSplit.getLength()); + + if (getNameMapping() != null) { + orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + orcReadBuilder.createReaderFunc( + fileSchema -> GenericOrcReader.buildReader( + readSchema, fileSchema, Maps.newHashMap())); + + return applyResidualFiltering(orcReadBuilder.build(), null, readSchema); + } + + private CloseableIterable newParquetIterable(InputFile inputFile, Schema readSchema) { + + Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile) + .project(readSchema) + .caseSensitive(isCaseSensitive()) + .split(mergeSplit.getStart(), mergeSplit.getLength()); + + if (isReuseContainers()) { + parquetReadBuilder.reuseContainers(); + } + + if (getNameMapping() != null) { + parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + parquetReadBuilder.createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader( + readSchema, fileSchema, Maps.newHashMap())); + + return applyResidualFiltering(parquetReadBuilder.build(), null, readSchema); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java new file mode 100644 index 000000000000..64a4b6737be0 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.util.SerializationUtil; + +public class IcebergMergeSplit extends FileSplit implements org.apache.hadoop.mapred.InputSplit { + private transient Configuration conf; + private ContentFile contentFile; + + // public no-argument constructor for deserialization + public IcebergMergeSplit() { + } + + public IcebergMergeSplit(Configuration conf, + CombineHiveInputFormat.CombineHiveInputSplit split, + Integer partition, Properties properties) throws IOException { + super(split.getPaths()[partition], split + .getStartOffsets()[partition], split.getLengths()[partition], split + .getLocations()); + this.conf = conf; + Path path = split.getPaths()[partition]; + contentFile = (ContentFile) properties.get(path); + } + + @Override + public long getLength() { + return contentFile.fileSizeInBytes(); + } + + @Override + public String[] getLocations() { + return new String[]{"*"}; + } + + @Override + public void write(DataOutput out) throws IOException { + byte[] data = SerializationUtil.serializeToBytes(this.contentFile); + out.writeInt(data.length); + out.write(data); + } + + @Override + public void readFields(DataInput in) throws IOException { + byte[] data = new byte[in.readInt()]; + in.readFully(data); + this.contentFile = SerializationUtil.deserializeFromBytes(data); + } + + public ContentFile getContentFile() { + return contentFile; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java new file mode 100644 index 000000000000..0c5b719aaa92 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapreduce; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Partitioning; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.GenericDeleteFilter; +import org.apache.iceberg.data.IdentityPartitionConverters; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.hive.HiveVersion; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.mr.hive.HiveIcebergInputFormat; +import org.apache.iceberg.mr.hive.IcebergAcidUtil; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; + +public final class IcebergRecordReader extends AbstractIcebergRecordReader { + + private static final String HIVE_VECTORIZED_READER_CLASS = "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader"; + private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER; + + static { + if (HiveVersion.min(HiveVersion.HIVE_3)) { + HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader") + .impl(HIVE_VECTORIZED_READER_CLASS, + Table.class, + Path.class, + FileScanTask.class, + Map.class, + TaskAttemptContext.class, + Expression.class, + Schema.class) + .buildStatic(); + } else { + HIVE_VECTORIZED_READER_BUILDER = null; + } + } + + private Iterator tasks; + private CloseableIterator currentIterator; + private T current; + + @Override + public void initialize(InputSplit split, TaskAttemptContext newContext) { + // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances + super.initialize(split, newContext); + CombinedScanTask task = ((IcebergSplit) split).task(); + this.tasks = task.files().iterator(); + this.currentIterator = nextTask(); + } + + private CloseableIterator nextTask() { + CloseableIterator closeableIterator = open(tasks.next(), getExpectedSchema()).iterator(); + if (!isFetchVirtualColumns() || Utilities.getIsVectorized(getConf())) { + return closeableIterator; + } + return new IcebergAcidUtil.VirtualColumnAwareIterator(closeableIterator, getExpectedSchema(), getConf()); + } 
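+
+  // Iteration contract: nextTask() opens a reader over the next FileScanTask of the combined scan
+  // task, wrapping it so the Iceberg ACID virtual columns can be served when they are requested on
+  // a non-vectorized read; nextKeyValue() below drains the current file and then rolls over to the
+  // next task until the whole CombinedScanTask is exhausted.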
+ + @Override + public boolean nextKeyValue() throws IOException { + while (true) { + if (currentIterator.hasNext()) { + current = currentIterator.next(); + return true; + } else if (tasks.hasNext()) { + currentIterator.close(); + this.currentIterator = nextTask(); + } else { + currentIterator.close(); + return false; + } + } + } + + @Override + public T getCurrentValue() { + return current; + } + + @Override + public void close() throws IOException { + currentIterator.close(); + } + + private CloseableIterable openVectorized(FileScanTask task, Schema readSchema) { + Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO), + "Vectorized execution is not yet supported for Iceberg avro tables. " + + "Please turn off vectorization and retry the query."); + Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3), + "Vectorized read is unsupported for Hive 2 integration."); + + Path path = new Path(task.file().path().toString()); + Map idToConstant = constantsMap(task, HiveIdentityPartitionConverters::convertConstant); + Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration()); + + // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used + CloseableIterable iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(getTable(), path, task, + idToConstant, getContext(), residual, readSchema); + + return applyResidualFiltering(iterator, residual, readSchema); + } + + private CloseableIterable openGeneric(FileScanTask task, Schema readSchema) { + if (task.isDataTask()) { + // When querying metadata tables, the currentTask is a DataTask and the data has to + // be fetched from the task instead of reading it from files. + IcebergInternalRecordWrapper wrapper = + new IcebergInternalRecordWrapper(getTable().schema().asStruct(), readSchema.asStruct()); + return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row)); + } + + DataFile file = task.file(); + InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput( + getTable().io().newInputFile(file.path().toString()), + file.keyMetadata())); + + CloseableIterable iterable; + switch (file.format()) { + case AVRO: + iterable = newAvroIterable(inputFile, task, readSchema); + break; + case ORC: + iterable = newOrcIterable(inputFile, task, readSchema); + break; + case PARQUET: + iterable = newParquetIterable(inputFile, task, readSchema); + break; + default: + throw new UnsupportedOperationException( + String.format("Cannot read %s file: %s", file.format().name(), file.path())); + } + + return iterable; + } + + @SuppressWarnings("unchecked") + private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { + switch (getInMemoryDataModel()) { + case PIG: + // TODO: Support Pig and Hive object models for IcebergInputFormat + throw new UnsupportedOperationException("Pig and Hive object models are not supported."); + case HIVE: + return openVectorized(currentTask, readSchema); + case GENERIC: + DeleteFilter deletes = new GenericDeleteFilter(getTable().io(), currentTask, getTable().schema(), readSchema); + Schema requiredSchema = deletes.requiredSchema(); + return deletes.filter(openGeneric(currentTask, requiredSchema)); + default: + throw new UnsupportedOperationException("Unsupported memory model"); + } + } + + private CloseableIterable newAvroIterable( + InputFile inputFile, FileScanTask task, Schema readSchema) { + Expression residual = HiveIcebergInputFormat.residualForTask(task, 
getContext().getConfiguration()); + Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile) + .project(readSchema) + .split(task.start(), task.length()); + + if (isReuseContainers()) { + avroReadBuilder.reuseContainers(); + } + + if (getNameMapping() != null) { + avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + avroReadBuilder.createReaderFunc( + (expIcebergSchema, expAvroSchema) -> + DataReader.create(expIcebergSchema, expAvroSchema, + constantsMap(task, IdentityPartitionConverters::convertConstant))); + + return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema); + } + + private CloseableIterable newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { + Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration()); + + Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile) + .project(readSchema) + .filter(residual) + .caseSensitive(isCaseSensitive()) + .split(task.start(), task.length()); + + if (isReuseContainers()) { + parquetReadBuilder.reuseContainers(); + } + + if (getNameMapping() != null) { + parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + parquetReadBuilder.createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader( + readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant))); + + return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema); + } + + private CloseableIterable newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) { + Map idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant); + Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant); + Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration()); + + ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile) + .project(readSchemaWithoutConstantAndMetadataFields) + .filter(residual) + .caseSensitive(isCaseSensitive()) + .split(task.start(), task.length()); + + if (getNameMapping() != null) { + orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping())); + } + + orcReadBuilder.createReaderFunc( + fileSchema -> GenericOrcReader.buildReader( + readSchema, fileSchema, idToConstant)); + + return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema); + } + + private Map constantsMap(FileScanTask task, BiFunction converter) { + PartitionSpec spec = task.spec(); + Set idColumns = spec.identitySourceIds(); + Schema partitionSchema = TypeUtil.select(getExpectedSchema(), idColumns); + boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); + if (getExpectedSchema().findField(MetadataColumns.PARTITION_COLUMN_ID) != null) { + Types.StructType partitionType = Partitioning.partitionType(getTable()); + return PartitionUtil.constantsMap(task, partitionType, converter); + } else if (projectsIdentityPartitionColumns) { + Types.StructType partitionType = Partitioning.partitionType(getTable()); + return PartitionUtil.constantsMap(task, partitionType, converter); + } else { + return Collections.emptyMap(); + } + } + + private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map idToConstant) { + // remove the nested fields of the partition struct + Set partitionFields = Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID)) + .map(Types.NestedField::type) + .map(Type::asStructType) + 
.map(Types.StructType::fields) + .map(fields -> fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet())) + .orElseGet(Collections::emptySet); + + // remove constants and meta columns too + Set collect = Stream.of(idToConstant.keySet(), MetadataColumns.metadataFieldIds(), partitionFields) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + return TypeUtil.selectNot(readSchema, collect); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java index 420bf9ea3b18..395ea35fe47f 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java @@ -32,7 +32,7 @@ // Since this class extends `mapreduce.InputSplit and implements `mapred.InputSplit`, it can be returned by both MR v1 // and v2 file formats. -public class IcebergSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer { +public class IcebergSplit extends InputSplit implements IcebergSplitContainer { public static final String[] ANYWHERE = new String[]{"*"}; diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java index c77543763b1a..b989444957c4 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java @@ -19,7 +19,9 @@ package org.apache.iceberg.mr.mapreduce; -public interface IcebergSplitContainer { +import org.apache.hadoop.mapred.InputSplit; + +public interface IcebergSplitContainer extends InputSplit { IcebergSplit icebergSplit(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index d057bf4f4603..e5f407ee15cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -56,6 +57,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -368,18 +370,14 @@ private InputSplit[] getCombineSplits(JobConf job, int numSplits, PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap()); TableDesc tableDesc = part.getTableDesc(); - if (tableDesc != null) { - boolean useDefaultFileFormat = part.getInputFileFormatClass() - .isAssignableFrom(tableDesc.getInputFileFormatClass()); - if (tableDesc.isNonNative() && useDefaultFileFormat) { - return super.getSplits(job, numSplits); - } - } - // Use HiveInputFormat if any of the paths is not splittable Class inputFormatClass = part.getInputFileFormatClass(); String inputFormatClassName = 
inputFormatClass.getName(); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); + if (tableDesc != null && tableDesc.isNonNative() && mrwork.getMergeSplitProperties() == null) { + return super.getSplits(job, numSplits); + } + String deserializerClassName = null; try { deserializerClassName = part.getDeserializer(job).getClass().getName(); @@ -765,4 +763,8 @@ public String toString() { public interface AvoidSplitCombination { boolean shouldSkipCombine(Path path, Configuration conf) throws IOException; } + + public interface MergeSplits extends AvoidSplitCombination { + FileSplit createMergeSplit(Configuration conf, CombineHiveInputSplit split, Integer partition, Properties splitProperties) throws IOException; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java index 7a687864cea4..c0118d5e3d81 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java @@ -70,14 +70,12 @@ public CombineHiveRecordReader(InputSplit split, Configuration conf, + inputFormatClassName); } InputFormat inputFormat = HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf); + MapWork mrwork = null; if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) { try { // TODO : refactor this out if (pathToPartInfo == null) { - MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf); - if (mrwork == null) { - mrwork = Utilities.getMapWork(jobConf); - } + mrwork = getMrWork(jobConf); pathToPartInfo = mrwork.getPathToPartitionInfo(); } @@ -88,14 +86,21 @@ public CombineHiveRecordReader(InputSplit split, Configuration conf, } } + FileSplit inputSplit; // create a split for the given partition - FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit - .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit - .getLocations()); + if (inputFormat instanceof CombineHiveInputFormat.MergeSplits) { + mrwork = getMrWork(jobConf); + inputSplit = ((CombineHiveInputFormat.MergeSplits) inputFormat).createMergeSplit(jobConf, hsplit, partition, + mrwork.getMergeSplitProperties()); + } else { + inputSplit = new FileSplit(hsplit.getPaths()[partition], hsplit + .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit + .getLocations()); + } - this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter)); + this.setRecordReader(inputFormat.getRecordReader(inputSplit, jobConf, reporter)); - this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader); + this.initIOContext(inputSplit, jobConf, inputFormatClass, this.recordReader); //If current split is from the same file as preceding split and the preceding split has footerbuffer, //the current split should use the preceding split's footerbuffer in order to skip footer correctly. 
@@ -127,6 +132,14 @@ private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws return part; } + private MapWork getMrWork(JobConf jobConf) { + MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf); + if (mrwork == null) { + mrwork = Utilities.getMapWork(jobConf); + } + return mrwork; + } + @Override public void doClose() throws IOException { recordReader.close(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 05e54798840a..4e6d4c3bcfec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -349,9 +349,6 @@ private void generateActualTasks(HiveConf conf, List> resTsks, Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " + status[i].getPath()); continue; } - if (useCustomStorageHandler) { - updatePartDescProperties(pDesc, mergeProperties); - } Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " + status[i].getPath()); work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, status[i].getPath(), tblDesc, aliases, pDesc); @@ -568,11 +565,11 @@ private void setupWorkWithCustomHandler(MapWork mapWork, Path dirPath, mapWork.setAliasToWork(aliasToWork); } if (partitionDesc != null) { - updatePartDescProperties(partitionDesc, mergeProperties); pathToPartitionInfo.remove(dirPath); pathToPartitionInfo.put(tmpDir, partitionDesc); mapWork.setPathToPartitionInfo(pathToPartitionInfo); } + mapWork.setMergeSplitProperties(mergeProperties.getSplitProperties()); mapWork.removePathToAlias(dirPath); mapWork.addPathToAlias(tmpDir, tmpDir.toString()); mapWork.setUseInputPathsDirectly(true); @@ -593,22 +590,4 @@ private Map> getParentDirToFileMap(FileSystem inpFs } return manifestDirsToPaths; } - - private void updatePartDescProperties(PartitionDesc partitionDesc, - MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException { - if (mergeProperties != null) { - String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat(); - String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat(); - String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde(); - if (inputFileFormatClassName != null) { - partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName)); - } - if (outputFileFormatClassName != null) { - partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName)); - } - if (serdeClassName != null) { - partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName); - } - } - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 076ef0a99b79..e705af31dea8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -22,18 +22,8 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator.ProbeDecodeContext; import org.apache.hadoop.hive.ql.exec.Utilities; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import 
java.util.Map.Entry; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -184,6 +174,8 @@ public enum LlapIODescriptor { private boolean useInputPathsDirectly; + private Properties mergeSplitProperties; + public MapWork() {} public MapWork(String name) { @@ -954,4 +946,12 @@ public void setUseInputPathsDirectly(boolean useInputPathsDirectly) { public boolean isUseInputPathsDirectly() { return useInputPathsDirectly; } + + public Properties getMergeSplitProperties() { + return mergeSplitProperties; + } + + public void setMergeSplitProperties(Properties mergeSplitProperties) { + this.mergeSplitProperties = mergeSplitProperties; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java index 6083b1b117f5..fdf8451d2738 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java @@ -19,12 +19,16 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; public interface MergeTaskProperties { public Path getTmpLocation(); - public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException; + default Properties getSplitProperties() throws IOException { + return null; + } }
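
For context, the sketch below illustrates how a storage handler is expected to wire these pieces together: MergeTaskProperties.getSplitProperties() publishes a Properties object keyed by file Path, ConditionalResolverMergeFiles stores it on the MapWork via setMergeSplitProperties(), CombineHiveRecordReader hands it to MergeSplits.createMergeSplit(), and the resulting IcebergMergeSplit resolves its ContentFile with properties.get(path). The class name ExampleMergeTaskProperties, its constructor arguments, and the file map used here are hypothetical placeholders, not part of this patch; the real IcebergMergeTaskProperties may differ.

// Hypothetical sketch only; not part of this patch.
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
import org.apache.iceberg.ContentFile;

public class ExampleMergeTaskProperties implements MergeTaskProperties {

  // Data files to be rewritten by the merge task, keyed by their on-disk Path (assumed input).
  private final Map<Path, ContentFile<?>> filesByPath;
  private final Path tmpLocation;

  public ExampleMergeTaskProperties(Map<Path, ContentFile<?>> filesByPath, Path tmpLocation) {
    this.filesByPath = filesByPath;
    this.tmpLocation = tmpLocation;
  }

  @Override
  public Path getTmpLocation() {
    return tmpLocation;
  }

  @Override
  public Properties getSplitProperties() {
    // Properties is keyed by the Path object itself, matching the lookup done in
    // IcebergMergeSplit: contentFile = (ContentFile) properties.get(path)
    Properties splitProperties = new Properties();
    filesByPath.forEach(splitProperties::put);
    return splitProperties;
  }
}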