From c42799be63bcc3331ad9dc8a4a72bfdba9259d0c Mon Sep 17 00:00:00 2001
From: Marton Bod
Date: Wed, 27 Jan 2021 15:54:15 +0100
Subject: [PATCH 1/3] Hive: enable writing with Tez

---
 .../mr/hive/HiveIcebergOutputCommitter.java   |  26 ++--
 .../mr/hive/HiveIcebergOutputFormat.java      |   4 +-
 .../org/apache/iceberg/mr/hive/TezUtil.java   | 121 ++++++++++++++++++
 .../hive/TestHiveIcebergOutputCommitter.java  |   3 +-
 ...stHiveIcebergStorageHandlerWithEngine.java |   6 +
 5 files changed, 147 insertions(+), 13 deletions(-)
 create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index eed0fdc575d1..40d0f4376988 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -82,11 +82,13 @@ public boolean needsTaskCommit(TaskAttemptContext context) {
 
   /**
    * Collects the generated data files and creates a commit file storing the data file list.
-   * @param context The job context
+   * @param ctx The task attempt context
    * @throws IOException Thrown if there is an error writing the commit file
    */
   @Override
-  public void commitTask(TaskAttemptContext context) throws IOException {
+  public void commitTask(TaskAttemptContext ctx) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(ctx);
+
     TaskAttemptID attemptID = context.getTaskAttemptID();
     String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
         attemptID.getJobID(), attemptID.getTaskID().getId());
@@ -108,11 +110,13 @@ public void commitTask(TaskAttemptContext context) throws IOException {
 
   /**
    * Removes files generated by this task.
-   * @param context The task context
+   * @param ctx The task attempt context
    * @throws IOException Thrown if there is an error closing the writer
    */
   @Override
-  public void abortTask(TaskAttemptContext context) throws IOException {
+  public void abortTask(TaskAttemptContext ctx) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(ctx);
+
     // Clean up writer data from the local store
     HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
 
@@ -125,11 +129,13 @@ public void abortTask(TaskAttemptContext context) throws IOException {
 
   /**
    * Reads the commit files stored in the temp directory and collects the generated committed data files.
    * Appends the data files to the table. At the end removes the temporary directory.
-   * @param jobContext The job context
+   * @param jobCtx The job context
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void commitJob(JobContext jobContext) throws IOException {
+  public void commitJob(JobContext jobCtx) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(jobCtx);
+
     JobConf conf = jobContext.getJobConf();
     Table table = Catalogs.loadTable(conf);
 
@@ -158,12 +164,14 @@ public void commitJob(JobContext jobContext) throws IOException {
 
   /**
    * Removes the generated data files, if there is a commit file already generated for them.
    * The cleanup at the end removes the temporary directory as well.
-   * @param jobContext The job context
+   * @param jobCtx The job context
    * @param status The status of the job
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void abortJob(JobContext jobContext, int status) throws IOException {
+  public void abortJob(JobContext jobCtx, int status) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(jobCtx);
+
     String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
     LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
 
@@ -215,7 +223,7 @@ private static List<DataFile> dataFiles(JobContext jobContext, FileIO io, boolea
     JobConf conf = jobContext.getJobConf();
     // If there are reducers, then every reducer will generate a result file.
     // If this is a map only task, then every mapper will generate a result file.
-    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
 
     ExecutorService executor = null;
     try {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 823490e57264..dc592cebb616 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -45,8 +45,6 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container<Record>>,
     HiveOutputFormat<NullWritable, Container<Record>> {
 
-  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
-
   @Override
   public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
       boolean isCompressed, Properties tableAndSerDeProperties, Progressable progress) {
@@ -65,7 +63,7 @@ public void checkOutputSpecs(FileSystem ignored, JobConf job) {
   }
 
   private static HiveIcebergRecordWriter writer(JobConf jc) {
-    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+    TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     Schema schema = HiveIcebergStorageHandler.schema(jc);
     PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
     FileFormat fileFormat = FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
new file mode 100644
index 000000000000..a69d183fcebe
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.mr.hive; + +import java.util.Objects; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobID; + +public class TezUtil { + + private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id"; + // TezProcessor (Hive) propagates the vertex id under this key - available during Task commit phase + private static final String TEZ_VERTEX_ID_HIVE = "hive.tez.vertex.index"; + // MROutputCommitter (Tez) propagates the vertex id under this key - available during DAG/Vertex commit phase + private static final String TEZ_VERTEX_ID_DAG = "mapreduce.task.vertex.id"; + + /** + * If the Tez vertex id is present in config, creates a new jobContext by appending the Tez vertex id to the jobID. + * For the rationale behind this enrichment, please refer to point #1 in the docs of {@link TaskAttemptWrapper}. + * @param jobContext original jobContext to be enriched + * @return enriched jobContext + */ + public static JobContext enrichContextWithVertexId(JobContext jobContext) { + String vertexId = jobContext.getJobConf().get(TEZ_VERTEX_ID_DAG); + if (vertexId != null) { + JobID jobID = getJobIDWithVertexAppended(jobContext.getJobID(), vertexId); + return new JobContextImpl(jobContext.getJobConf(), jobID, jobContext.getProgressible()); + } else { + return jobContext; + } + } + + /** + * Creates a new taskAttemptContext by replacing the taskAttemptID with a wrapped object. + * For the rationale behind this enrichment, please refer to point #2 in the docs of {@link TaskAttemptWrapper}. + * @param taskAttemptContext original taskAttemptContext to be enriched + * @return enriched taskAttemptContext + */ + public static TaskAttemptContext enrichContextWithAttemptWrapper(TaskAttemptContext taskAttemptContext) { + TaskAttemptID wrapped = TezUtil.taskAttemptWrapper(taskAttemptContext.getTaskAttemptID()); + return new TaskAttemptContextImpl(taskAttemptContext.getJobConf(), wrapped); + } + + public static TaskAttemptID taskAttemptWrapper(TaskAttemptID attemptID) { + return new TaskAttemptWrapper(attemptID, ""); + } + + public static TaskAttemptID taskAttemptWrapper(JobConf jc) { + return new TaskAttemptWrapper(TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY)), jc.get(TEZ_VERTEX_ID_HIVE)); + } + + private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) { + if (vertexId != null && !vertexId.isEmpty()) { + return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId()); + } else { + return jobID; + } + } + + private TezUtil() { + } + + /** + * Subclasses {@link org.apache.hadoop.mapred.TaskAttemptID}. It has two main purposes: + * 1. Provide a way to append an optional vertex id to the Job ID. This is needed because there is a discrepancy + * between how the attempt ID is constructed in the {@link org.apache.tez.mapreduce.output.MROutput} (with vertex ID + * appended to the end of the Job ID) and how it's available in the mapper (without vertex ID) which creates and + * caches the HiveIcebergRecordWriter object. + * 2. Redefine the equals/hashcode provided by TaskAttemptID so that task type (map or reduce) does not count, and + * therefore the mapper and reducer threads can use the same attempt ID-based key to retrieve the cached + * HiveIcebergRecordWriter object. 
+ */ + private static class TaskAttemptWrapper extends TaskAttemptID { + + TaskAttemptWrapper(TaskAttemptID attemptID, String vertexId) { + super(getJobIDWithVertexAppended(attemptID.getJobID(), vertexId).getJtIdentifier(), + attemptID.getJobID().getId(), attemptID.getTaskType(), attemptID.getTaskID().getId(), attemptID.getId()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskAttemptWrapper that = (TaskAttemptWrapper) o; + return getId() == that.getId() && + getTaskID().getId() == that.getTaskID().getId() && + Objects.equals(getJobID(), that.getJobID()); + } + + @Override + public int hashCode() { + return Objects.hash(getId(), getTaskID().getId(), getJobID()); + } + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index 73f9786042ef..0743f9a73889 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -264,7 +264,8 @@ private List writeRecords(int taskNum, int attemptNum, boolean commitTas new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(), attemptNum, QUERY_ID + "-" + JOB_ID); HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET, - new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, taskId); + new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, + TezUtil.taskAttemptWrapper(taskId)); Container container = new Container<>(); diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java index 8ff378621cbf..f74039bc895c 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -155,6 +156,11 @@ public void after() throws Exception { // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the // HiveServer2 is stopped. Only Finalizer closes the HMS connections. System.gc(); + // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false + // at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should + // proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test + // case, any tez job that follows a completed mr job will erroneously read done=true and will not proceed. 
+    ExecMapper.setDone(false);
   }
 
   @Test

From 2e4cb6cc3e88d9faf32369286d85a7689709eb3e Mon Sep 17 00:00:00 2001
From: Marton Bod
Date: Wed, 3 Feb 2021 22:38:28 +0100
Subject: [PATCH 2/3] Rename input context parameters - 3

---
 .../mr/hive/HiveIcebergOutputCommitter.java | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 40d0f4376988..47cbf7db85b2 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -82,12 +82,12 @@ public boolean needsTaskCommit(TaskAttemptContext context) {
 
   /**
    * Collects the generated data files and creates a commit file storing the data file list.
-   * @param ctx The task attempt context
+   * @param originalContext The task attempt context
    * @throws IOException Thrown if there is an error writing the commit file
    */
   @Override
-  public void commitTask(TaskAttemptContext ctx) throws IOException {
-    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(ctx);
+  public void commitTask(TaskAttemptContext originalContext) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
     String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
@@ -110,12 +110,12 @@ public void commitTask(TaskAttemptContext ctx) throws IOException {
 
   /**
    * Removes files generated by this task.
-   * @param ctx The task attempt context
+   * @param originalContext The task attempt context
    * @throws IOException Thrown if there is an error closing the writer
    */
   @Override
-  public void abortTask(TaskAttemptContext ctx) throws IOException {
-    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(ctx);
+  public void abortTask(TaskAttemptContext originalContext) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
     HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
@@ -129,12 +129,12 @@ public void abortTask(TaskAttemptContext ctx) throws IOException {
   /**
    * Reads the commit files stored in the temp directory and collects the generated committed data files.
    * Appends the data files to the table. At the end removes the temporary directory.
-   * @param jobCtx The job context
+   * @param originalContext The job context
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void commitJob(JobContext jobCtx) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(jobCtx);
+  public void commitJob(JobContext originalContext) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
 
     JobConf conf = jobContext.getJobConf();
     Table table = Catalogs.loadTable(conf);
@@ -164,13 +164,13 @@ public void commitJob(JobContext jobCtx) throws IOException {
   /**
    * Removes the generated data files, if there is a commit file already generated for them.
    * The cleanup at the end removes the temporary directory as well.
-   * @param jobCtx The job context
+   * @param originalContext The job context
    * @param status The status of the job
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void abortJob(JobContext jobCtx, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(jobCtx);
+  public void abortJob(JobContext originalContext, int status) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
 
     String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
     LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);

From c4d4fa078d19acbd04bf138023a569bffe6bc6dc Mon Sep 17 00:00:00 2001
From: Marton Bod
Date: Thu, 4 Feb 2021 16:20:42 +0100
Subject: [PATCH 3/3] Wrap capturedId in test

---
 .../apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 0743f9a73889..d6202f943fe4 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -205,7 +205,7 @@ public void writerIsClosedAfterTaskCommitFailure() throws IOException {
     }
 
     Assert.assertEquals(1, argumentCaptor.getAllValues().size());
-    TaskAttemptID capturedId = argumentCaptor.getValue().getTaskAttemptID();
+    TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
     // writer is still in the map after commitTask failure
     Assert.assertNotNull(getWriter(capturedId));
    failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
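
---
Editorial note, not part of the patches above: a minimal sketch of the TaskAttemptWrapper semantics that patch 1 introduces. A plain TaskAttemptID includes the task type in equals(), so a mapper attempt and a reducer attempt of the same task have different identities; the wrapper drops the type from equals/hashCode, so both threads look up the same cached HiveIcebergRecordWriter. The "jt" identifier and the numeric IDs below are made-up example values, and the demo class itself is hypothetical.

import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.mr.hive.TezUtil;

public class TaskAttemptWrapperDemo {
  public static void main(String[] args) {
    // Same job, task number and attempt number; only the task type differs.
    TaskAttemptID mapAttempt = new TaskAttemptID("jt", 1, TaskType.MAP, 3, 0);
    TaskAttemptID reduceAttempt = new TaskAttemptID("jt", 1, TaskType.REDUCE, 3, 0);

    // Plain TaskAttemptID compares the task type, so these are not equal.
    System.out.println(mapAttempt.equals(reduceAttempt)); // false

    // The wrapped IDs ignore the task type, so the mapper and the reducer
    // resolve to the same writer-cache key.
    System.out.println(TezUtil.taskAttemptWrapper(mapAttempt)
        .equals(TezUtil.taskAttemptWrapper(reduceAttempt))); // true
  }
}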