Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAPREDUCE-7403. manifest-committer dynamic partitioning support. #4728

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
Expand Down Expand Up @@ -60,7 +64,8 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class BindingPathOutputCommitter extends PathOutputCommitter {
public class BindingPathOutputCommitter extends PathOutputCommitter
implements IOStatisticsSource, StreamCapabilities {

/**
* The classname for use in configurations.
Expand Down Expand Up @@ -181,4 +186,22 @@ public String toString() {
public PathOutputCommitter getCommitter() {
return committer;
}

/**
* Pass through if the inner committer supports StreamCapabilities.
* {@inheritDoc}
*/
@Override
public boolean hasCapability(final String capability) {
if (committer instanceof StreamCapabilities) {
return ((StreamCapabilities) committer).hasCapability(capability);
} else {
return false;
}
}

@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(committer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.JobContext;
Expand All @@ -55,6 +56,7 @@

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
Expand Down Expand Up @@ -84,7 +86,7 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ManifestCommitter extends PathOutputCommitter implements
IOStatisticsSource, StageEventCallbacks {
IOStatisticsSource, StageEventCallbacks, StreamCapabilities {

public static final Logger LOG = LoggerFactory.getLogger(
ManifestCommitter.class);
Expand Down Expand Up @@ -758,4 +760,15 @@ private static Path maybeSaveSummary(
public IOStatisticsStore getIOStatistics() {
return iostatistics;
}

/**
* The committer is compatible with spark's dynamic partitioning
* algorithm.
* @param capability string to query the stream support for.
* @return true if the requested capability is supported.
*/
@Override
public boolean hasCapability(final String capability) {
return CAPABILITY_DYNAMIC_PARTITIONING.equals(capability);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ public final class ManifestCommitterConstants {
*/
public static final String CONTEXT_ATTR_TASK_ATTEMPT_ID = "ta";

/**
* Stream Capabilities probe for spark dynamic partitioning compatibility.
*/
public static final String CAPABILITY_DYNAMIC_PARTITIONING =
"mapreduce.job.committer.dynamic.partitioning";

private ManifestCommitterConstants() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,76 @@ appending data are creating and writing into new partitions.
job to create unique files. This is foundational for
any job to generate correct data.

# <a name="dynamic"></a> Spark Dynamic Partition overwriting

Spark has a feature called "Dynamic Partition Overwrites",

This can be initiated in SQL
```SQL
INSERT OVERWRITE TABLE ...
```
Or through DataSet writes where the mode is `overwrite` and the partitioning matches
that of the existing table
```scala
sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// followed by an overwrite of a Dataset into an existing partitioned table.
eventData2
.write
.mode("overwrite")
.partitionBy("year", "month")
.format("parquet")
.save(existingDir)
```

This feature is implemented in Spark, which
1. Directs the job to write its new data to a temporary directory
1. After job commit completes, scans the output to identify the leaf directories "partitions" into which data was written.
1. Deletes the content of those directories in the destination table
1. Renames the new files into the partitions.

This is all done in spark, which takes over the tasks of scanning
the intermediate output tree, deleting partitions and of
renaming the new files.

This feature also adds the ability for a job to write data entirely outside
the destination table, which is done by
1. writing new files into the working directory
1. spark moving them to the final destination in job commit


The manifest committer is compatible with dynamic partition overwrites
on Azure and Google cloud storage as together they meet the core requirements of
the extension:
1. The working directory returned in `getWorkPath()` is in the same filesystem
as the final output.
2. `rename()` is an `O(1)` operation which is safe and fast to use when committing a job.

None of the S3A committers support this. Condition (1) is not met by
the staging committers, while (2) is not met by S3 itself.

To use the manifest committer with dynamic partition overwrites, the
spark version must contain
[SPARK-40034](https://issues.apache.org/jira/browse/SPARK-40034)
_PathOutputCommitters to work with dynamic partition overwrite_.

Be aware that the rename phase of the operation will be slow
if many files are renamed -this is done sequentially.
Parallel renaming would speed this up, *but could trigger the abfs overload
problems the manifest committer is designed to both minimize the risk
of and support recovery from*

The spark side of the commit operation will be listing/treewalking
the temporary output directory (some overhead), followed by
the file promotion, done with a classic filesystem `rename()`
call. There will be no explicit rate limiting here.

*What does this mean?*

It means that _dynamic partitioning should not be used on Azure Storage
for SQL queries/Spark DataSet operations where many thousands of files are created.
The fact that these will suffer from performance problems before
throttling scale issues surface, should be considered a warning.

# <a name="SUCCESS"></a> Job Summaries in `_SUCCESS` files

The original hadoop committer creates a zero byte `_SUCCESS` file in the root of the output directory
Expand Down Expand Up @@ -585,7 +655,7 @@ There is no need to alter these values, except when writing new implementations
something which is only needed if the store provides extra integration support for the
committer.

## <a name="concurrent"></a> Support for concurrent test runs.
## <a name="concurrent"></a> Support for concurrent jobs to the same directory

It *may* be possible to run multiple jobs targeting the same directory tree.

Expand All @@ -600,6 +670,8 @@ For this to work, a number of conditions must be met:
`mapreduce.fileoutputcommitter.cleanup.skipped` to `true`.
* All jobs/tasks must create files with unique filenames.
* All jobs must create output with the same directory partition structure.
* The job/queries MUST NOT be using Spark Dynamic Partitioning "INSERT OVERWRITE TABLE"; data may be lost.
This holds for *all* committers, not just the manifest committer.
* Remember to delete the `_temporary` directory later!

This has *NOT BEEN TESTED*
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
Expand Down Expand Up @@ -1549,6 +1550,23 @@ public void testOutputFormatIntegration() throws Throwable {
ManifestCommitter committer = (ManifestCommitter)
outputFormat.getOutputCommitter(tContext);

// check path capabilities directly
Assertions.assertThat(committer.hasCapability(
ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
.describedAs("dynamic partitioning capability in committer %s",
committer)
.isTrue();
// and through a binding committer -passthrough is critical
// for the spark binding.
BindingPathOutputCommitter bindingCommitter =
new BindingPathOutputCommitter(outputDir, tContext);
Assertions.assertThat(bindingCommitter.hasCapability(
ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING))
.describedAs("dynamic partitioning capability in committer %s",
bindingCommitter)
.isTrue();


// setup
JobData jobData = new JobData(job, jContext, tContext, committer);
setupJob(jobData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* <li>Nothing else got through either.</li>
* </ol>
*/
public class AWSStatus500Exception extends AWSServiceIOException {
public class jAWSStatus500Exception extends AWSServiceIOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a typo in Intellij.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think Yetus failed beacuse of this only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah

public AWSStatus500Exception(String operation,
AmazonServiceException cause) {
super(operation, cause);
Expand Down