diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 52f824c5ddae..7d8d2bb5ef66 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -122,8 +122,7 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
    * @param tablesToBackup list of tables to be backed up
    */
   protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
-    List<String> activeFiles = new ArrayList<>();
-    List<String> archiveFiles = new ArrayList<>();
+    Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
     List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
     FileSystem tgtFs;
     try {
@@ -136,6 +135,8 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
 
     for (BulkLoad bulkLoad : bulkLoads) {
       TableName srcTable = bulkLoad.getTableName();
+      MergeSplitBulkloadInfo bulkloadInfo =
+        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
       String regionName = bulkLoad.getRegion();
       String fam = bulkLoad.getColumnFamily();
       String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
@@ -165,13 +166,18 @@ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws I
             srcTableQualifier);
           LOG.trace("copying {} to {}", p, tgt);
         }
-        activeFiles.add(p.toString());
+        bulkloadInfo.addActiveFile(p.toString());
       } else if (fs.exists(archive)) {
         LOG.debug("copying archive {} to {}", archive, tgt);
-        archiveFiles.add(archive.toString());
+        bulkloadInfo.addArchiveFiles(archive.toString());
       }
-      mergeSplitAndCopyBulkloadedHFiles(activeFiles, archiveFiles, srcTable, tgtFs);
     }
+
+    for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+      mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+        bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
+    }
+
     return bulkLoads;
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/MergeSplitBulkloadInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/MergeSplitBulkloadInfo.java
new file mode 100644
index 000000000000..95243aebc467
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/MergeSplitBulkloadInfo.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class MergeSplitBulkloadInfo {
+  private final List<String> activeFiles = new ArrayList<>();
+  private final List<String> archiveFiles = new ArrayList<>();
+
+  private final TableName srcTable;
+
+  public MergeSplitBulkloadInfo(TableName srcTable) {
+    this.srcTable = srcTable;
+  }
+
+  public TableName getSrcTable() {
+    return srcTable;
+  }
+
+  public List<String> getArchiveFiles() {
+    return archiveFiles;
+  }
+
+  public List<String> getActiveFiles() {
+    return activeFiles;
+  }
+
+  public void addActiveFile(String file) {
+    activeFiles.add(file);
+  }
+
+  public void addArchiveFiles(String file) {
+    archiveFiles.add(file);
+  }
+}