HBASE-27541 Add support for defining restore hfile output path
Co-authored-by: Bryan Beaudreault <bbeaudreault@hubspot.com>

Extract tmp restore output dir
Jarryd Lee committed Jan 13, 2023
1 parent 2a7c69d commit f388a5c
Showing 10 changed files with 125 additions and 30 deletions.
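In short, this change lets a restore request say where the intermediate restore hfiles should be written, instead of always defaulting to the local cluster's staging directory. A minimal sketch of a caller using the new builder option, mirroring the test added at the bottom of this diff (the paths and table names are illustrative, not from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class RestoreWithOutputDirExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
      RestoreRequest request = new RestoreRequest.Builder()
        .withBackupRootDir("hdfs://backup-cluster/backup")       // hypothetical backup root
        .withRestoreRootDir("hdfs://hbase-cluster/tmp/restore")  // new option added in this commit
        .withBackupId(args[0])
        .withCheck(false)
        .withFromTables(new TableName[] { TableName.valueOf("t1") })
        .withToTables(new TableName[] { TableName.valueOf("t1_restore") })
        .withOvewrite(false) // (sic) existing builder method name
        .build();
      new BackupAdminImpl(conn).restore(request);
    }
  }
}

When withRestoreRootDir is omitted, the request falls back to the tmp staging directory built by BackupUtils.getTmpRestoreOutputDir (see below).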
@@ -34,10 +34,11 @@ public interface RestoreJob extends Configurable {
* Run restore operation
* @param dirPaths path array of WAL log directories
* @param fromTables from tables
* @param restoreRootDir output file system
* @param toTables to tables
* @param fullBackupRestore full backup restore
* @throws IOException if running the job fails
*/
void run(Path[] dirPaths, TableName[] fromTables, TableName[] toTables, boolean fullBackupRestore)
throws IOException;
void run(Path[] dirPaths, TableName[] fromTables, Path restoreRootDir, TableName[] toTables,
boolean fullBackupRestore) throws IOException;
}
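Implementations and callers of RestoreJob now receive the restore filesystem explicitly. A hedged sketch of a call site, mirroring how RestoreTool invokes the job later in this diff (names are illustrative):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.RestoreJob;

public class RestoreJobCallSketch {
  // Mirrors the RestoreTool hunks below; illustrative only.
  static void restore(Configuration conf, Path[] dirPaths, TableName[] fromTables,
      Path restoreRootDir, TableName[] toTables) throws IOException {
    RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
    // restoreRootDir determines which filesystem the bulk-load output lands on
    restoreService.run(dirPaths, fromTables, restoreRootDir, toTables, true);
  }
}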
@@ -37,6 +37,11 @@ public Builder withBackupRootDir(String backupRootDir) {
return this;
}

public Builder withRestoreRootDir(String restoreRootDir) {
request.setRestoreRootDir(restoreRootDir);
return this;
}

public Builder withBackupId(String backupId) {
request.setBackupId(backupId);
return this;
@@ -68,6 +73,7 @@ public RestoreRequest build() {
}

private String backupRootDir;
private String restoreRootDir;
private String backupId;
private boolean check = false;
private TableName[] fromTables;
@@ -86,6 +92,15 @@ private RestoreRequest setBackupRootDir(String backupRootDir) {
return this;
}

public String getRestoreRootDir() {
return restoreRootDir;
}

private RestoreRequest setRestoreRootDir(String restoreRootDir) {
this.restoreRootDir = restoreRootDir;
return this;
}

public String getBackupId() {
return backupId;
}
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
@@ -55,11 +56,12 @@ public class RestoreTablesClient {
private String backupId;
private TableName[] sTableArray;
private TableName[] tTableArray;
private String targetRootDir;
private String backupRootDir;
private Path restoreRootDir;
private boolean isOverwrite;

public RestoreTablesClient(Connection conn, RestoreRequest request) {
this.targetRootDir = request.getBackupRootDir();
public RestoreTablesClient(Connection conn, RestoreRequest request) throws IOException {
this.backupRootDir = request.getBackupRootDir();
this.backupId = request.getBackupId();
this.sTableArray = request.getFromTables();
this.tTableArray = request.getToTables();
@@ -69,6 +71,12 @@ public RestoreTablesClient(Connection conn, RestoreRequest request) {
this.isOverwrite = request.isOverwrite();
this.conn = conn;
this.conf = conn.getConfiguration();
if (request.getRestoreRootDir() != null) {
restoreRootDir = new Path(request.getRestoreRootDir());
} else {
FileSystem fs = FileSystem.get(conf);
this.restoreRootDir = BackupUtils.getTmpRestoreOutputDir(fs, conf);
}
}

/**
@@ -131,7 +139,7 @@ private void restoreImages(BackupImage[] images, TableName sTable, TableName tTable,
String rootDir = image.getRootDir();
String backupId = image.getBackupId();
Path backupRoot = new Path(rootDir);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, backupId);
RestoreTool restoreTool = new RestoreTool(conf, backupRoot, restoreRootDir, backupId);
Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId);
String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId();
// We need hFS only for full restore (see the code)
@@ -249,7 +257,7 @@ public void execute() throws IOException {
// case RESTORE_IMAGES:
HashMap<TableName, BackupManifest> backupManifestMap = new HashMap<>();
// check and load backup image manifest for the tables
Path rootPath = new Path(targetRootDir);
Path rootPath = new Path(backupRootDir);
HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath,
backupId);

@@ -120,7 +120,8 @@ public void run(String[] backupIds) throws IOException {
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
String dirs = StringUtils.join(dirPaths, ",");

Path bulkOutputPath = BackupUtils.getBulkOutputDir(
Path tmpPath = BackupUtils.getTmpRestoreOutputDir(fs, conf);
Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpPath,
BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
// Delete content if exists
if (fs.exists(bulkOutputPath)) {
@@ -50,8 +50,8 @@ public MapReduceRestoreJob() {
}

@Override
public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,
boolean fullBackupRestore) throws IOException {
public void run(Path[] dirPaths, TableName[] tableNames, Path restoreRootDir,
TableName[] newTableNames, boolean fullBackupRestore) throws IOException {
String bulkOutputConfKey;

player = new MapReduceHFileSplitterJob();
@@ -70,9 +70,8 @@ public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames,

for (int i = 0; i < tableNames.length; i++) {
LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]);

Path bulkOutputPath = BackupUtils
.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
Path bulkOutputPath = BackupUtils.getBulkOutputDir(restoreRootDir,
BackupUtils.getFileNameCompatibleString(newTableNames[i]), getConf());
Configuration conf = getConf();
conf.set(bulkOutputConfKey, bulkOutputPath.toString());
String[] playerArgs = { dirs,
@@ -690,21 +690,32 @@ public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap,
return isValid;
}

public static Path getBulkOutputDir(String tableName, Configuration conf, boolean deleteOnExit)
throws IOException {
FileSystem fs = FileSystem.get(conf);
String tmp =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-"
+ EnvironmentEdgeManager.currentTime());
public static Path getBulkOutputDir(Path restoreRootDir, String tableName, Configuration conf,
boolean deleteOnExit) throws IOException {
FileSystem fs = restoreRootDir.getFileSystem(conf);
Path path = new Path(restoreRootDir,
"bulk_output-" + tableName + "-" + EnvironmentEdgeManager.currentTime());
if (deleteOnExit) {
fs.deleteOnExit(path);
}
return path;
}

public static Path getBulkOutputDir(String tableName, Configuration conf) throws IOException {
return getBulkOutputDir(tableName, conf, true);
/**
* Build temporary output path
* @param fs filesystem for default output dir
* @param conf configuration
* @return output path
*/
public static Path getTmpRestoreOutputDir(FileSystem fs, Configuration conf) {
String tmp =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, fs.getHomeDirectory() + "/hbase-staging");
return new Path(tmp);
}

public static Path getBulkOutputDir(Path restoreRootDir, String tableName, Configuration conf)
throws IOException {
return getBulkOutputDir(restoreRootDir, tableName, conf, true);
}

public static String getFileNameCompatibleString(TableName table) {
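When RestoreRequest carries no restore root dir, RestoreTablesClient (above) falls back to the tmp staging path built here. A minimal sketch of the default resolution, assuming the cluster's default filesystem ("t1" is a placeholder table name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.backup.util.BackupUtils;

public class TmpRestoreDirSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // hbase.fs.tmp.dir if set, otherwise <user home>/hbase-staging
    Path restoreRoot = BackupUtils.getTmpRestoreOutputDir(fs, conf);
    // per-table dir such as .../bulk_output-t1-<timestamp>, registered for delete-on-exit
    Path bulkOut = BackupUtils.getBulkOutputDir(restoreRoot, "t1", conf);
    System.out.println(bulkOut);
  }
}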
@@ -67,18 +67,20 @@ public class RestoreTool {
private final String[] ignoreDirs = { HConstants.RECOVERED_EDITS_DIR };
protected Configuration conf;
protected Path backupRootPath;
protected Path restoreRootDir;
protected String backupId;
protected FileSystem fs;

// store table name and snapshot dir mapping
private final HashMap<TableName, Path> snapshotMap = new HashMap<>();

public RestoreTool(Configuration conf, final Path backupRootPath, final String backupId)
throws IOException {
public RestoreTool(Configuration conf, final Path backupRootPath, final Path restoreRootDir,
final String backupId) throws IOException {
this.conf = conf;
this.backupRootPath = backupRootPath;
this.backupId = backupId;
this.fs = backupRootPath.getFileSystem(conf);
this.restoreRootDir = restoreRootDir;
}

/**
@@ -200,7 +202,7 @@ public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
}
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);

restoreService.run(logDirs, tableNames, newTableNames, false);
restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false);
}
}

@@ -350,8 +352,8 @@ private void createAndRestoreTable(Connection conn, TableName tableName, TableName newTableName,
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
Path[] paths = new Path[regionPathList.size()];
regionPathList.toArray(paths);
restoreService.run(paths, new TableName[] { tableName }, new TableName[] { newTableName },
true);
restoreService.run(paths, new TableName[] { tableName }, restoreRootDir,
new TableName[] { newTableName }, true);

} catch (Exception e) {
LOG.error(e.toString(), e);
@@ -73,8 +73,9 @@ public Path run() {
Path bulkOutputDir = ugi.doAs(new PrivilegedAction<Path>() {
@Override
public Path run() {
try {
return BackupUtils.getBulkOutputDir("test", conf, false);
try (FileSystem fs = FileSystem.get(conf)) {
Path tmpPath = BackupUtils.getTmpRestoreOutputDir(fs, conf);
return BackupUtils.getBulkOutputDir(tmpPath, "test", conf, false);
} catch (IOException ioe) {
LOG.error("Failed to get bulk output dir path", ioe);
}
@@ -130,7 +130,8 @@ public void run(String[] backupIds) throws IOException {
// Find input directories for table
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
String dirs = StringUtils.join(dirPaths, ",");
Path bulkOutputPath = BackupUtils.getBulkOutputDir(
Path tmpPath = BackupUtils.getTmpRestoreOutputDir(fs, conf);
Path bulkOutputPath = BackupUtils.getBulkOutputDir(tmpPath,
BackupUtils.getFileNameCompatibleString(tableNames[i]), getConf(), false);
// Delete content if exists
if (fs.exists(bulkOutputPath)) {
@@ -17,13 +17,21 @@
*/
package org.apache.hadoop.hbase.backup;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -49,6 +57,7 @@ public class TestRemoteRestore extends TestBackupBase {
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtil();
conf1 = TEST_UTIL.getConfiguration();
conf1.set(HConstants.CLUSTER_DISTRIBUTED, "true");
useSecondCluster = true;
setUpHelper();
}
@@ -72,4 +81,51 @@ public void testFullRestoreRemote() throws Exception {
TEST_UTIL.deleteTable(table1_restore);
hba.close();
}

/**
* Verify that restore jobs can be run on a standalone mapreduce cluster. Ensures hfiles output
* via {@link MapReduceHFileSplitterJob} exist on correct filesystem.
* @throws Exception if doing the backup or an operation on the tables fails
*/
@Test
public void testFullRestoreRemoteWithAlternateRestoreOutputDir() throws Exception {
LOG.info("test remote full backup on a single table with alternate restore output dir");
String backupId =
backupTables(BackupType.FULL, toList(table1.getNameAsString()), BACKUP_REMOTE_ROOT_DIR);
LOG.info("backup complete");
TableName[] tableset = new TableName[] { table1 };
TableName[] tablemap = new TableName[] { table1_restore };

HBaseTestingUtil mrTestUtil = new HBaseTestingUtil();
mrTestUtil.setZkCluster(TEST_UTIL.getZkCluster());
mrTestUtil.startMiniDFSCluster(3);
mrTestUtil.startMiniMapReduceCluster();

Configuration testUtilConf = TEST_UTIL.getConnection().getConfiguration();
Configuration conf = new Configuration(mrTestUtil.getConfiguration());
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT,
testUtilConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
conf.set(HConstants.MASTER_ADDRS_KEY, testUtilConf.get(HConstants.MASTER_ADDRS_KEY));

new BackupAdminImpl(ConnectionFactory.createConnection(conf))
.restore(new RestoreRequest.Builder().withBackupRootDir(BACKUP_REMOTE_ROOT_DIR)
.withRestoreRootDir(BACKUP_ROOT_DIR).withBackupId(backupId).withCheck(false)
.withFromTables(tableset).withToTables(tablemap).withOvewrite(false).build());

Path hfileOutputPath = new Path(
new Path(conf.get(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY)).toUri().getPath());

// files exist on hbase cluster
FileSystem fileSystem = FileSystem.get(TEST_UTIL.getConfiguration());
assertTrue(fileSystem.exists(hfileOutputPath));

// files don't exist on MR cluster
fileSystem = FileSystem.get(conf);
assertFalse(fileSystem.exists(hfileOutputPath));

Admin hba = TEST_UTIL.getAdmin();
assertTrue(hba.tableExists(table1_restore));
TEST_UTIL.deleteTable(table1_restore);
hba.close();
}
}
