Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -75,12 +76,15 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;

Expand Down Expand Up @@ -119,6 +123,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
private static final String CONF_MR_JOB_NAME = "mapreduce.job.name";
private static final String CONF_INPUT_FILE_GROUPER_CLASS =
"snapshot.export.input.file.grouper.class";
private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS =
"snapshot.export.input.file.location.resolver.class";
protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp";
private static final String CONF_COPY_MANIFEST_THREADS =
"snapshot.export.copy.references.threads";
Expand Down Expand Up @@ -157,13 +165,21 @@ static final class Options {
static final Option CHMOD =
new Option(null, "chmod", true, "Change the permission of the files to the specified one.");
static final Option MAPPERS = new Option(null, "mappers", true,
"Number of mappers to use during the copy (mapreduce.job.maps).");
"Number of mappers to use during the copy (mapreduce.job.maps). "
+ "If you provide a --custom-file-grouper, "
+ "then --mappers is interpreted as the number of mappers per group.");
static final Option BANDWIDTH =
new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
static final Option RESET_TTL =
new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
"Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD");
static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true,
"Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. "
+ "See JavaDoc on that class for more information.");
static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true,
"Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. "
+ "See JavaDoc on that class for more information.");
}

// Export Map-Reduce Counters, to keep track of the progress
Expand All @@ -186,6 +202,54 @@ public enum ChecksumComparison {
INCOMPATIBLE, // checksum comparison is not compatible.
}

/**
 * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot
 * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation
 * must return a data structure that contains each input file exactly once. Files that appear in
 * separate entries in the top-level returned Collection are guaranteed to not be placed in the
 * same InputSplit. This can be used to segregate your input files by the rack or host on which
 * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve
 * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper
 * argument with the fully qualified class name of an implementation of CustomFileGrouper that's
 * on the classpath. If this argument is not used, no particular grouping logic will be applied.
 * <p>
 * NOTE(review): implementations are instantiated via Hadoop's {@code ReflectionUtils.newInstance},
 * so they should provide a no-argument constructor — confirm against ReflectionUtils semantics.
 */
@InterfaceAudience.Public
public interface CustomFileGrouper {
  /**
   * Partitions the snapshot's input files into groups; files placed in different top-level
   * entries of the returned Collection will never share an InputSplit.
   * @param snapshotFiles all input files of the snapshot, each paired with its size
   * @return the same files, each appearing exactly once, arranged into groups
   */
  Collection<Collection<Pair<SnapshotFileInfo, Long>>>
    getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles);
}

/**
 * Default {@link CustomFileGrouper}: places every input file into a single group, so no
 * grouping constraint is imposed on InputSplit construction.
 */
private static class NoopCustomFileGrouper implements CustomFileGrouper {
  @Override
  public Collection<Collection<Pair<SnapshotFileInfo, Long>>>
    getGroupedInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
    // One immutable top-level entry containing all files.
    return Collections.singletonList(snapshotFiles);
  }
}

/**
 * If desired, you may implement a FileLocationResolver in order to influence the _location_
 * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The
 * {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit
 * being constructed. Whatever is returned will ultimately be reported by that split's
 * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the
 * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the
 * --file-location-resolver argument with the fully qualified class name of an implementation of
 * FileLocationResolver that's on the classpath. If this argument is not used, no locations will
 * be attached to the InputSplits.
 * <p>
 * NOTE(review): implementations are instantiated via Hadoop's {@code ReflectionUtils.newInstance},
 * so they should provide a no-argument constructor — confirm against ReflectionUtils semantics.
 */
@InterfaceAudience.Public
public interface FileLocationResolver {
  /**
   * Computes location hints for one InputSplit.
   * @param files the input files (each paired with its size) making up a single InputSplit
   * @return location names to report from that split's {@link InputSplit#getLocations()}
   */
  Set<String> getLocationsForInputFiles(final Collection<Pair<SnapshotFileInfo, Long>> files);
}

/**
 * Default {@link FileLocationResolver}: attaches no location hints, leaving YARN free to
 * schedule mappers anywhere.
 */
static class NoopFileLocationResolver implements FileLocationResolver {
  @Override
  public Set<String> getLocationsForInputFiles(Collection<Pair<SnapshotFileInfo, Long>> files) {
    // No hints: report an empty, immutable location set.
    return Collections.emptySet();
  }
}

private static class ExportMapper
extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class);
Expand Down Expand Up @@ -724,8 +788,9 @@ private static Pair<SnapshotFileInfo, Long> getSnapshotFileAndSize(FileSystem fs
* The algorithm used is pretty straightforward; the file list is sorted by size, and then each
* group fetch the bigger file available, iterating through groups alternating the direction.
*/
static List<List<Pair<SnapshotFileInfo, Long>>>
getBalancedSplits(final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) {
static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits(
final Collection<Pair<SnapshotFileInfo, Long>> unsortedFiles, final int ngroups) {
List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(unsortedFiles);
// Sort files by size, from small to big
Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() {
public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) {
Expand All @@ -736,7 +801,6 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>

// create balanced groups
List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>();
long[] sizeGroups = new long[ngroups];
int hi = files.size() - 1;
int lo = 0;

Expand All @@ -755,7 +819,6 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>
Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--);

// add the hi one
sizeGroups[g] += fileInfo.getSecond();
group.add(fileInfo);

// change direction when at the end or the beginning
Expand All @@ -769,16 +832,10 @@ public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long>
}
}

if (LOG.isDebugEnabled()) {
for (int i = 0; i < sizeGroups.length; ++i) {
LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i]));
}
}

Review comment on removed lines 772-777 (charlesconnell, Jun 27, 2025): "I took this log out
because it makes less sense now that getBalancedSplits() can be run multiple times, plus I added
new logging that supplants it."

return fileGroups;
}

private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> {
@Override
public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split,
TaskAttemptContext tac) throws IOException, InterruptedException {
Expand All @@ -792,37 +849,78 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);

Collection<List<Pair<SnapshotFileInfo, Long>>> balancedGroups =
groupFilesForSplits(conf, snapshotFiles);

Class<? extends FileLocationResolver> fileLocationResolverClass =
conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class,
FileLocationResolver.class);
FileLocationResolver fileLocationResolver =
ReflectionUtils.newInstance(fileLocationResolverClass, conf);
LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit",
fileLocationResolverClass);

List<InputSplit> splits = new ArrayList<>(balancedGroups.size());
for (Collection<Pair<SnapshotFileInfo, Long>> files : balancedGroups) {
splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver));
}
return splits;
}

Collection<List<Pair<SnapshotFileInfo, Long>>> groupFilesForSplits(Configuration conf,
List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
if (mappers == 0 && snapshotFiles.size() > 0) {
if (mappers == 0 && !snapshotFiles.isEmpty()) {
mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, snapshotFiles.size());
conf.setInt(CONF_NUM_SPLITS, mappers);
conf.setInt(MR_NUM_MAPS, mappers);
}

List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
List<InputSplit> splits = new ArrayList(groups.size());
for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
splits.add(new ExportSnapshotInputSplit(files));
}
return splits;
Class<? extends CustomFileGrouper> inputFileGrouperClass = conf.getClass(
CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class);
CustomFileGrouper customFileGrouper =
ReflectionUtils.newInstance(inputFileGrouperClass, conf);
Collection<Collection<Pair<SnapshotFileInfo, Long>>> groups =
customFileGrouper.getGroupedInputFiles(snapshotFiles);

LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass,
groups.size());
int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1);
LOG.info(
"Splitting each group into {} InputSplits, "
+ "to achieve closest possible amount of mappers to target of {}",
mappersPerGroup, mappers);

// Within each group, create splits of equal size. Groups are not mixed together.
return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup))
.flatMap(Collection::stream).toList();
}

private static class ExportSnapshotInputSplit extends InputSplit implements Writable {
static class ExportSnapshotInputSplit extends InputSplit implements Writable {

private List<Pair<BytesWritable, Long>> files;
private String[] locations;
private long length;

public ExportSnapshotInputSplit() {
this.files = null;
this.locations = null;
}

public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) {
this.files = new ArrayList(snapshotFiles.size());
public ExportSnapshotInputSplit(final Collection<Pair<SnapshotFileInfo, Long>> snapshotFiles,
FileLocationResolver fileLocationResolver) {
this.files = new ArrayList<>(snapshotFiles.size());
for (Pair<SnapshotFileInfo, Long> fileInfo : snapshotFiles) {
this.files.add(
new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond()));
this.length += fileInfo.getSecond();
}
this.locations =
fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]);
LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, "
+ "with location hints: {}", files, length, locations);
}

private List<Pair<BytesWritable, Long>> getSplitKeys() {
Expand All @@ -836,7 +934,7 @@ public long getLength() throws IOException, InterruptedException {

@Override
public String[] getLocations() throws IOException, InterruptedException {
return new String[] {};
return locations;
}

@Override
Expand All @@ -851,6 +949,12 @@ public void readFields(DataInput in) throws IOException {
files.add(new Pair<>(fileInfo, size));
length += size;
}
int locationCount = in.readInt();
List<String> locations = new ArrayList<>(locationCount);
for (int i = 0; i < locationCount; ++i) {
locations.add(in.readUTF());
}
this.locations = locations.toArray(new String[0]);
}

@Override
Expand All @@ -860,6 +964,10 @@ public void write(DataOutput out) throws IOException {
fileInfo.getFirst().write(out);
out.writeLong(fileInfo.getSecond());
}
out.writeInt(locations.length);
for (String location : locations) {
out.writeUTF(location);
}
}
}

Expand Down Expand Up @@ -920,7 +1028,8 @@ public boolean nextKeyValue() {
private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException {
final String storagePolicy, final String customFileGrouper, final String fileLocationResolver)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
Expand All @@ -940,6 +1049,12 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot, final Strin
conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
}
}
if (customFileGrouper != null) {
conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper);
}
if (fileLocationResolver != null) {
conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver);
}

String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
Job job = new Job(conf);
Expand Down Expand Up @@ -1058,6 +1173,8 @@ private static String generateFamilyStoragePolicyKey(String family) {
private int mappers = 0;
private boolean resetTtl = false;
private String storagePolicy = null;
private String customFileGrouper = null;
private String fileLocationResolver = null;

@Override
protected void processOptions(CommandLine cmd) {
Expand All @@ -1083,6 +1200,12 @@ protected void processOptions(CommandLine cmd) {
if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
}
if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) {
customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt());
}
if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) {
fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt());
}
}

/**
Expand Down Expand Up @@ -1251,7 +1374,8 @@ public int doWork() throws IOException {
// by the HFileArchiver, since they have no references.
try {
runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
filesGroup, filesMode, mappers, bandwidthMB, storagePolicy);
filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper,
fileLocationResolver);

LOG.info("Finalize the Snapshot Export");
if (!skipTmp) {
Expand Down Expand Up @@ -1309,6 +1433,8 @@ protected void addOptions() {
addOption(Options.MAPPERS);
addOption(Options.BANDWIDTH);
addOption(Options.RESET_TTL);
addOption(Options.CUSTOM_FILE_GROUPER);
addOption(Options.FILE_LOCATION_RESOLVER);
}

public static void main(String[] args) {
Expand Down
Loading