From de10df305952e2eb34860f5400735e3384f5b5d8 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Sat, 21 Jun 2025 10:30:21 -0400 Subject: [PATCH 1/4] HBASE-29432: Provide mechanism to plug in rack or host locality logic into ExportSnapshot --- .../hadoop/hbase/snapshot/ExportSnapshot.java | 153 ++++++++++++++++-- 1 file changed, 136 insertions(+), 17 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index 5e87075d70a5..ca7c9b1761d4 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -37,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -75,12 +77,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; @@ -119,6 +124,10 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool { private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; + private static final String CONF_INPUT_FILE_GROUPER_CLASS = + "snapshot.export.input.file.grouper.class"; + private static final String CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS = + "snapshot.export.input.file.location.resolver.class"; protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; private static final String CONF_COPY_MANIFEST_THREADS = "snapshot.export.copy.references.threads"; @@ -164,6 +173,10 @@ static final class Options { new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot"); static final Option STORAGE_POLICY = new Option(null, "storage-policy", true, "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD"); + static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true, + "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. See JavaDoc on that class for more information."); + static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true, + "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. See JavaDoc on that class for more information."); } // Export Map-Reduce Counters, to keep track of the progress @@ -186,6 +199,54 @@ public enum ChecksumComparison { INCOMPATIBLE, // checksum comparison is not compatible. } + /** + * If desired, you may implement a CustomFileGrouper in order to influence how ExportSnapshot + * chooses which input files go into the MapReduce job's {@link InputSplit}s. Your implementation + * must return a data structure that contains each input file exactly once. Files that appear in + * separate entries in the top-level returned Collection are guaranteed to not be placed in the + * same InputSplit. This can be used to segregate your input files by the rack or host on which + * they are available, which, used in conjunction with {@link FileLocationResolver}, can improve + * the performance of your ExportSnapshot runs. To use this, pass the --custom-file-grouper + * argument with the fully qualified class name of an implementation of CustomFileGrouper that's + * on the classpath. If this argument is not used, no particular grouping logic will be applied. + */ + @InterfaceAudience.Public + public interface CustomFileGrouper { + Collection>> + getGroupedInputFiles(final Collection> snapshotFiles); + } + + private static class NoopCustomFileGrouper implements CustomFileGrouper { + @Override + public Collection>> + getGroupedInputFiles(final Collection> snapshotFiles) { + return ImmutableList.of(snapshotFiles); + } + } + + /** + * If desired, you may implement a FileLocationResolver in order to influence the _location_ + * metadata attached to each {@link InputSplit} that ExportSnapshot will submit to YARN. The + * method {@link #getLocationsForInputFiles(Collection)} method is called once for each InputSplit + * being constructed. Whatever is returned will ultimately be reported by that split's + * {@link InputSplit#getLocations()} method. This can be used to encourage YARN to schedule the + * ExportSnapshot's mappers on rack-local or host-local NodeManagers. To use this, pass the + * --file-location-resolver argument with the fully qualified class name of an implementation of + * FileLocationResolver that's on the classpath. If this argument is not used, no locations will + * be attached to the InputSplits. + */ + @InterfaceAudience.Public + public interface FileLocationResolver { + Set getLocationsForInputFiles(final Collection> files); + } + + private static class NoopFileLocationResolver implements FileLocationResolver { + @Override + public Set getLocationsForInputFiles(Collection> files) { + return ImmutableSet.of(); + } + } + private static class ExportMapper extends Mapper { private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); @@ -724,8 +785,9 @@ private static Pair getSnapshotFileAndSize(FileSystem fs * The algorithm used is pretty straightforward; the file list is sorted by size, and then each * group fetch the bigger file available, iterating through groups alternating the direction. */ - static List>> - getBalancedSplits(final List> files, final int ngroups) { + static List>> getBalancedSplits( + final Collection> unsortedFiles, final int ngroups) { + List> files = new ArrayList<>(unsortedFiles); // Sort files by size, from small to big Collections.sort(files, new Comparator>() { public int compare(Pair a, Pair b) { @@ -769,12 +831,6 @@ public int compare(Pair a, Pair } } - if (LOG.isDebugEnabled()) { - for (int i = 0; i < sizeGroups.length; ++i) { - LOG.debug("export split=" + i + " size=" + Strings.humanReadableInt(sizeGroups[i])); - } - } - return fileGroups; } @@ -792,6 +848,7 @@ public List getSplits(JobContext context) throws IOException, Interr FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); List> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); + int mappers = conf.getInt(CONF_NUM_SPLITS, 0); if (mappers == 0 && snapshotFiles.size() > 0) { mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); @@ -800,29 +857,63 @@ public List getSplits(JobContext context) throws IOException, Interr conf.setInt(MR_NUM_MAPS, mappers); } - List>> groups = getBalancedSplits(snapshotFiles, mappers); - List splits = new ArrayList(groups.size()); - for (List> files : groups) { - splits.add(new ExportSnapshotInputSplit(files)); + Class inputFileGrouperClass = conf.getClass( + CONF_INPUT_FILE_GROUPER_CLASS, NoopCustomFileGrouper.class, CustomFileGrouper.class); + CustomFileGrouper customFileGrouper = + ReflectionUtils.newInstance(inputFileGrouperClass, conf); + Collection>> groups = + customFileGrouper.getGroupedInputFiles(snapshotFiles); + + LOG.info("CustomFileGrouper {} split input files into {} groups", inputFileGrouperClass, + groups.size()); + int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1); + LOG.info( + "Splitting each group into {} InputSplits, to achieve closest possible amount of mappers to target of {}", + mappersPerGroup, mappers); + + Collection>> balancedGroups = + groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup)).flatMap(Collection::stream) + .collect(Collectors.toList()); + + Class fileLocationResolverClass = + conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class, + FileLocationResolver.class); + FileLocationResolver fileLocationResolver = + ReflectionUtils.newInstance(fileLocationResolverClass, conf); + LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit", + fileLocationResolverClass); + + List splits = new ArrayList<>(balancedGroups.size()); + for (Collection> files : balancedGroups) { + splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver)); } return splits; } private static class ExportSnapshotInputSplit extends InputSplit implements Writable { + private List> files; + private String[] locations; private long length; public ExportSnapshotInputSplit() { this.files = null; + this.locations = null; } - public ExportSnapshotInputSplit(final List> snapshotFiles) { - this.files = new ArrayList(snapshotFiles.size()); + public ExportSnapshotInputSplit(final Collection> snapshotFiles, + FileLocationResolver fileLocationResolver) { + this.files = new ArrayList<>(snapshotFiles.size()); for (Pair fileInfo : snapshotFiles) { this.files.add( new Pair<>(new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); this.length += fileInfo.getSecond(); } + this.locations = + fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]); + LOG.trace( + "This ExportSnapshotInputSplit has files {} of collective size {}, with location hints: {}", + files, length, locations); } private List> getSplitKeys() { @@ -836,7 +927,7 @@ public long getLength() throws IOException, InterruptedException { @Override public String[] getLocations() throws IOException, InterruptedException { - return new String[] {}; + return locations; } @Override @@ -851,6 +942,12 @@ public void readFields(DataInput in) throws IOException { files.add(new Pair<>(fileInfo, size)); length += size; } + int locationCount = in.readInt(); + List locations = new ArrayList<>(locationCount); + for (int i = 0; i < locationCount; ++i) { + locations.add(in.readUTF()); + } + this.locations = locations.toArray(new String[0]); } @Override @@ -860,6 +957,10 @@ public void write(DataOutput out) throws IOException { fileInfo.getFirst().write(out); out.writeLong(fileInfo.getSecond()); } + out.writeInt(locations.length); + for (String location : locations) { + out.writeUTF(location); + } } } @@ -920,7 +1021,8 @@ public boolean nextKeyValue() { private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName, final Path snapshotDir, final boolean verifyChecksum, final String filesUser, final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB, - final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException { + final String storagePolicy, final String customFileGrouper, final String fileLocationResolver) + throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = getConf(); if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); @@ -940,6 +1042,12 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot, final Strin conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue()); } } + if (customFileGrouper != null) { + conf.set(CONF_INPUT_FILE_GROUPER_CLASS, customFileGrouper); + } + if (fileLocationResolver != null) { + conf.set(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, fileLocationResolver); + } String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); Job job = new Job(conf); @@ -1058,6 +1166,8 @@ private static String generateFamilyStoragePolicyKey(String family) { private int mappers = 0; private boolean resetTtl = false; private String storagePolicy = null; + private String customFileGrouper = null; + private String fileLocationResolver = null; @Override protected void processOptions(CommandLine cmd) { @@ -1083,6 +1193,12 @@ protected void processOptions(CommandLine cmd) { if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) { storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt()); } + if (cmd.hasOption(Options.CUSTOM_FILE_GROUPER.getLongOpt())) { + customFileGrouper = cmd.getOptionValue(Options.CUSTOM_FILE_GROUPER.getLongOpt()); + } + if (cmd.hasOption(Options.FILE_LOCATION_RESOLVER.getLongOpt())) { + fileLocationResolver = cmd.getOptionValue(Options.FILE_LOCATION_RESOLVER.getLongOpt()); + } } /** @@ -1251,7 +1367,8 @@ public int doWork() throws IOException { // by the HFileArchiver, since they have no references. try { runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser, - filesGroup, filesMode, mappers, bandwidthMB, storagePolicy); + filesGroup, filesMode, mappers, bandwidthMB, storagePolicy, customFileGrouper, + fileLocationResolver); LOG.info("Finalize the Snapshot Export"); if (!skipTmp) { @@ -1309,6 +1426,8 @@ protected void addOptions() { addOption(Options.MAPPERS); addOption(Options.BANDWIDTH); addOption(Options.RESET_TTL); + addOption(Options.CUSTOM_FILE_GROUPER); + addOption(Options.FILE_LOCATION_RESOLVER); } public static void main(String[] args) { From d0eb852aecd1511ace1b0a398e3189e6480bdfcf Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Fri, 27 Jun 2025 11:52:30 -0400 Subject: [PATCH 2/4] nits from the robots --- .../hadoop/hbase/snapshot/ExportSnapshot.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index ca7c9b1761d4..f15e9b00ae1c 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -174,9 +174,11 @@ static final class Options { static final Option STORAGE_POLICY = new Option(null, "storage-policy", true, "Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD"); static final Option CUSTOM_FILE_GROUPER = new Option(null, "custom-file-grouper", true, - "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. See JavaDoc on that class for more information."); + "Fully qualified class name of an implementation of ExportSnapshot.CustomFileGrouper. " + + "See JavaDoc on that class for more information."); static final Option FILE_LOCATION_RESOLVER = new Option(null, "file-location-resolver", true, - "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. See JavaDoc on that class for more information."); + "Fully qualified class name of an implementation of ExportSnapshot.FileLocationResolver. " + + "See JavaDoc on that class for more information."); } // Export Map-Reduce Counters, to keep track of the progress @@ -798,7 +800,6 @@ public int compare(Pair a, Pair // create balanced groups List>> fileGroups = new LinkedList<>(); - long[] sizeGroups = new long[ngroups]; int hi = files.size() - 1; int lo = 0; @@ -817,7 +818,6 @@ public int compare(Pair a, Pair Pair fileInfo = files.get(hi--); // add the hi one - sizeGroups[g] += fileInfo.getSecond(); group.add(fileInfo); // change direction when at the end or the beginning @@ -868,7 +868,8 @@ public List getSplits(JobContext context) throws IOException, Interr groups.size()); int mappersPerGroup = groups.isEmpty() ? 1 : Math.max(mappers / groups.size(), 1); LOG.info( - "Splitting each group into {} InputSplits, to achieve closest possible amount of mappers to target of {}", + "Splitting each group into {} InputSplits, " + + "to achieve closest possible amount of mappers to target of {}", mappersPerGroup, mappers); Collection>> balancedGroups = @@ -911,9 +912,8 @@ public ExportSnapshotInputSplit(final Collection> s } this.locations = fileLocationResolver.getLocationsForInputFiles(snapshotFiles).toArray(new String[0]); - LOG.trace( - "This ExportSnapshotInputSplit has files {} of collective size {}, with location hints: {}", - files, length, locations); + LOG.trace("This ExportSnapshotInputSplit has files {} of collective size {}, " + + "with location hints: {}", files, length, locations); } private List> getSplitKeys() { From 2206318bc7c028851c297249e5704c043866316f Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Mon, 30 Jun 2025 13:42:02 -0400 Subject: [PATCH 3/4] Unit testing --- .../hadoop/hbase/snapshot/ExportSnapshot.java | 52 +++--- .../snapshot/TestExportSnapshotHelpers.java | 175 +++++++++++++++++- 2 files changed, 203 insertions(+), 24 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index f15e9b00ae1c..163af11e854b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -38,7 +38,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.BiConsumer; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -166,7 +165,8 @@ static final class Options { static final Option CHMOD = new Option(null, "chmod", true, "Change the permission of the files to the specified one."); static final Option MAPPERS = new Option(null, "mappers", true, - "Number of mappers to use during the copy (mapreduce.job.maps)."); + "Number of mappers to use during the copy (mapreduce.job.maps). " + + "If you provide a --custom-file-grouper, then --mappers is interpreted as the number of mappers per group."); static final Option BANDWIDTH = new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); static final Option RESET_TTL = @@ -242,7 +242,7 @@ public interface FileLocationResolver { Set getLocationsForInputFiles(final Collection> files); } - private static class NoopFileLocationResolver implements FileLocationResolver { + static class NoopFileLocationResolver implements FileLocationResolver { @Override public Set getLocationsForInputFiles(Collection> files) { return ImmutableSet.of(); @@ -834,7 +834,7 @@ public int compare(Pair a, Pair return fileGroups; } - private static class ExportSnapshotInputFormat extends InputFormat { + static class ExportSnapshotInputFormat extends InputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext tac) throws IOException, InterruptedException { @@ -849,8 +849,28 @@ public List getSplits(JobContext context) throws IOException, Interr List> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); + Collection>> balancedGroups = + groupFilesForSplits(conf, snapshotFiles); + + Class fileLocationResolverClass = + conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class, + FileLocationResolver.class); + FileLocationResolver fileLocationResolver = + ReflectionUtils.newInstance(fileLocationResolverClass, conf); + LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit", + fileLocationResolverClass); + + List splits = new ArrayList<>(balancedGroups.size()); + for (Collection> files : balancedGroups) { + splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver)); + } + return splits; + } + + Collection>> groupFilesForSplits(Configuration conf, + List> snapshotFiles) { int mappers = conf.getInt(CONF_NUM_SPLITS, 0); - if (mappers == 0 && snapshotFiles.size() > 0) { + if (mappers == 0 && !snapshotFiles.isEmpty()) { mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); mappers = Math.min(mappers, snapshotFiles.size()); conf.setInt(CONF_NUM_SPLITS, mappers); @@ -872,26 +892,12 @@ public List getSplits(JobContext context) throws IOException, Interr + "to achieve closest possible amount of mappers to target of {}", mappersPerGroup, mappers); - Collection>> balancedGroups = - groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup)).flatMap(Collection::stream) - .collect(Collectors.toList()); - - Class fileLocationResolverClass = - conf.getClass(CONF_INPUT_FILE_LOCATION_RESOLVER_CLASS, NoopFileLocationResolver.class, - FileLocationResolver.class); - FileLocationResolver fileLocationResolver = - ReflectionUtils.newInstance(fileLocationResolverClass, conf); - LOG.info("FileLocationResolver {} will provide location metadata for each InputSplit", - fileLocationResolverClass); - - List splits = new ArrayList<>(balancedGroups.size()); - for (Collection> files : balancedGroups) { - splits.add(new ExportSnapshotInputSplit(files, fileLocationResolver)); - } - return splits; + // Within each group, create splits of equal size. Groups are not mixed together. + return groups.stream().map(g -> getBalancedSplits(g, mappersPerGroup)) + .flatMap(Collection::stream).toList(); } - private static class ExportSnapshotInputSplit extends InputSplit implements Writable { + static class ExportSnapshotInputSplit extends InputSplit implements Writable { private List> files; private String[] locations; diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java index 71402d0989de..72ca0c3f7c29 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java @@ -18,9 +18,14 @@ package org.apache.hadoop.hbase.snapshot; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -48,7 +53,7 @@ public class TestExportSnapshotHelpers { * assign to each group a file, going back and forth through the groups. */ @Test - public void testBalanceSplit() throws Exception { + public void testBalanceSplit() { // Create a list of files List> files = new ArrayList<>(21); for (long i = 0; i <= 20; i++) { @@ -89,4 +94,172 @@ private void verifyBalanceSplit(final List> split, } assertEquals(expectedSize, totalSize); } + + @Test + public void testGroupFilesForSplitsWithoutCustomFileGrouper() { + List> files = new ArrayList<>(); + for (long i = 0; i < 10; i++) { + SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) + .setHfile("file-" + i).build(); + files.add(new Pair<>(fileInfo, i * 10)); + } + + Configuration conf = new Configuration(); + conf.setInt("snapshot.export.format.splits", 3); + + ExportSnapshot.ExportSnapshotInputFormat inputFormat = + new ExportSnapshot.ExportSnapshotInputFormat(); + Collection>> groups = + inputFormat.groupFilesForSplits(conf, files); + + assertEquals("Should create 3 groups", 3, groups.size()); + + long totalSize = 0; + int totalFiles = 0; + for (List> group : groups) { + for (Pair file : group) { + totalSize += file.getSecond(); + totalFiles++; + } + } + + assertEquals("All files should be included", 10, totalFiles); + assertEquals("Total size should be preserved", 450, totalSize); + } + + @Test + public void testGroupFilesForSplitsWithCustomFileGrouper() { + List> files = new ArrayList<>(); + for (long i = 0; i < 8; i++) { + SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) + .setHfile("file-" + i).build(); + files.add(new Pair<>(fileInfo, i * 5)); + } + + Configuration conf = new Configuration(); + conf.setInt("snapshot.export.format.splits", 4); + conf.setClass("snapshot.export.input.file.grouper.class", TestCustomFileGrouper.class, + ExportSnapshot.CustomFileGrouper.class); + + ExportSnapshot.ExportSnapshotInputFormat inputFormat = + new ExportSnapshot.ExportSnapshotInputFormat(); + Collection>> groups = + inputFormat.groupFilesForSplits(conf, files); + + assertEquals("Should create splits based on custom grouper output", 4, groups.size()); + + long totalSize = 0; + int totalFiles = 0; + for (List> group : groups) { + for (Pair file : group) { + totalSize += file.getSecond(); + totalFiles++; + } + } + + assertEquals("All files should be included", 8, totalFiles); + assertEquals("Total size should be preserved", 140, totalSize); + } + + @Test + public void testFileLocationResolverWithNoopResolver() { + List> files = new ArrayList<>(); + for (long i = 0; i < 3; i++) { + SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) + .setHfile("file-" + i).build(); + files.add(new Pair<>(fileInfo, i * 10)); + } + + ExportSnapshot.NoopFileLocationResolver resolver = + new ExportSnapshot.NoopFileLocationResolver(); + Set locations = resolver.getLocationsForInputFiles(files); + + assertTrue("NoopFileLocationResolver should return empty locations", locations.isEmpty()); + } + + @Test + public void testFileLocationResolverWithCustomResolver() { + List> files = new ArrayList<>(); + for (long i = 0; i < 3; i++) { + SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) + .setHfile("file-" + i).build(); + files.add(new Pair<>(fileInfo, i * 10)); + } + + TestFileLocationResolver resolver = new TestFileLocationResolver(); + Set locations = resolver.getLocationsForInputFiles(files); + + assertEquals("Should return expected locations", 2, locations.size()); + assertTrue("Should contain rack1", locations.contains("rack1")); + assertTrue("Should contain rack2", locations.contains("rack2")); + } + + @Test + public void testInputSplitWithFileLocationResolver() { + List> files = new ArrayList<>(); + for (long i = 0; i < 3; i++) { + SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder().setType(SnapshotFileInfo.Type.HFILE) + .setHfile("file-" + i).build(); + files.add(new Pair<>(fileInfo, i * 10)); + } + + TestFileLocationResolver resolver = new TestFileLocationResolver(); + ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit split = + new ExportSnapshot.ExportSnapshotInputFormat.ExportSnapshotInputSplit(files, resolver); + + try { + String[] locations = split.getLocations(); + assertEquals("Should return 2 locations", 2, locations.length); + + boolean hasRack1 = false; + boolean hasRack2 = false; + for (String location : locations) { + if ("rack1".equals(location)) { + hasRack1 = true; + } + if ("rack2".equals(location)) { + hasRack2 = true; + } + } + + assertTrue("Should contain rack1", hasRack1); + assertTrue("Should contain rack2", hasRack2); + } catch (Exception e) { + throw new RuntimeException("Failed to get locations", e); + } + } + + public static class TestCustomFileGrouper implements ExportSnapshot.CustomFileGrouper { + @Override + public Collection>> + getGroupedInputFiles(Collection> snapshotFiles) { + List>> groups = new ArrayList<>(); + List> group1 = new ArrayList<>(); + List> group2 = new ArrayList<>(); + + int count = 0; + for (Pair file : snapshotFiles) { + if (count % 2 == 0) { + group1.add(file); + } else { + group2.add(file); + } + count++; + } + + groups.add(group1); + groups.add(group2); + return groups; + } + } + + public static class TestFileLocationResolver implements ExportSnapshot.FileLocationResolver { + @Override + public Set getLocationsForInputFiles(Collection> files) { + Set locations = new HashSet<>(); + locations.add("rack1"); + locations.add("rack2"); + return locations; + } + } } From ee6e56b65d9bd27108fa7689aa5dcfcdb7dc41ec Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Mon, 30 Jun 2025 18:53:01 -0400 Subject: [PATCH 4/4] checkstyle --- .../java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java index 163af11e854b..0b6c796c3950 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java @@ -166,7 +166,8 @@ static final class Options { new Option(null, "chmod", true, "Change the permission of the files to the specified one."); static final Option MAPPERS = new Option(null, "mappers", true, "Number of mappers to use during the copy (mapreduce.job.maps). " - + "If you provide a --custom-file-grouper, then --mappers is interpreted as the number of mappers per group."); + + "If you provide a --custom-file-grouper, " + + "then --mappers is interpreted as the number of mappers per group."); static final Option BANDWIDTH = new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second."); static final Option RESET_TTL =