Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

// Delete contents under WAL directory
if (fs.exists(walDir)) {
Expand All @@ -1014,6 +1015,15 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
System.out.println("Deleted all contents under WAL directory: " + walDir);
}

// Delete contents under bulk load directory
if (fs.exists(bulkloadDir)) {
FileStatus[] bulkContents = fs.listStatus(bulkloadDir);
for (FileStatus item : bulkContents) {
fs.delete(item.getPath(), true); // recursive delete of each child
}
System.out.println("Deleted all contents under Bulk Load directory: " + bulkloadDir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason you chose to loop and delete the contents individually?
I think a single recursive fs.delete() here would reduce RPCs (like in line 1072 in deleteOldWALFiles())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason is to preserve the parent directories. In this case, I want to delete all the WAL directories and bulkload directories, but still keep the root directory intact. This way, when we restart the continuous backup to the same backup directory, the required directory structure will already be in place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the required directory isn't exist, e.g. the first using backup, what would that be ? will we create it?

the problem is that this list and per-call in s3 could be experience , that's why S3A has the feature of fs.s3a.multiobjectdelete.enable (with fs.delete path recursively), if we do this way here, and if the list of bulkloaded files are huge within the bulkload directory, you will hit s3 throttling very easily, and we will need to write/folk the same implementation here.

so, instead of having the same required directory structure in place, I suggested you handle it as an enable/reenable check for the required directory structure, and create it to avoid large mount of per-object delete call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One clarification from my side:

Our current backup directory structure looks like this:

-- wal_backup_directory/
     -- WALs/
          -- 23-08-2025/
               ... wal files
          -- 24-08-2025/
          -- 25-08-2025/
     -- bulk-load-files/
          -- 23-08-2025/
               ... bulkload files
          -- 24-08-2025/
          -- 25-08-2025/

As you can see, when we loop and delete, it’s done at the day-wise directory level, not at the individual file level. So the number of delete operations is relatively small.

Regarding re-creation: when we enable a replication peer, there isn’t any placeholder mechanism to re-create this directory structure. In the enable/disable flow, everything remains running, the only difference is that replication is paused and then resumed. Once enabled again, entries are sent to the replication endpoint directly.

It’s only during a restart that things are different. On restart, we instantiate the ContinuousBackupReplicationEndpoint again, and in its constructor we can re-create the directory structure if it’s missing. With enable/disable, this re-creation step isn’t possible.

}

} catch (IOException e) {
System.out.println("WARNING: Failed to delete contents under backup directories: "
+ backupWalDir + ". Error: " + e.getMessage());
Expand All @@ -1022,7 +1032,7 @@ private void deleteAllBackupWALFiles(Configuration conf, String backupWalDir)
}

/**
* Cleans up old WAL files based on the determined cutoff timestamp.
* Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
*/
void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
throws IOException {
Expand All @@ -1033,6 +1043,7 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
Expand All @@ -1058,6 +1069,9 @@ void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
System.out.println("Deleting outdated WAL directory: " + dirPath);
fs.delete(dirPath, true);
Path bulkloadPath = new Path(bulkloadDir, dirName);
System.out.println("Deleting corresponding bulk-load directory: " + bulkloadPath);
fs.delete(bulkloadPath, true);
}
} catch (ParseException e) {
System.out.println("WARNING: Failed to parse directory name '" + dirName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
Expand Down Expand Up @@ -164,7 +165,7 @@ public void testSingleBackupForceDelete() throws Exception {

// Step 6: Verify that the backup WAL directory is empty
assertTrue("WAL backup directory should be empty after force delete",
isWalDirsEmpty(conf1, backupWalDir.toString()));
areWalAndBulkloadDirsEmpty(conf1, backupWalDir.toString()));

// Step 7: Take new full backup with continuous backup enabled
String backupIdContinuous = fullTableBackupWithContinuous(Lists.newArrayList(table1));
Expand All @@ -189,38 +190,49 @@ private void setupBackupFolders(long currentTime) throws IOException {
public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);

fs.mkdirs(walsDir);
fs.mkdirs(bulkLoadDir);

SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

for (int i = 0; i < 5; i++) {
String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
fs.mkdirs(new Path(walsDir, dateStr));
fs.mkdirs(new Path(bulkLoadDir, dateStr));
}
}

private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);

// Expect folders older than 3 days to be deleted
for (int i = 3; i < 5; i++) {
String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
Path walPath = new Path(walsDir, oldDateStr);
Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
assertFalse("Old WAL directory (" + walPath + ") should be deleted, but it exists!",
fs.exists(walPath));
assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be deleted, but it exists!",
fs.exists(bulkLoadPath));
}

// Expect folders within the last 3 days to exist
for (int i = 0; i < 3; i++) {
String recentDateStr =
dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
Path walPath = new Path(walsDir, recentDateStr);
Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);

assertTrue("Recent WAL directory (" + walPath + ") should exist, but it is missing!",
fs.exists(walPath));
assertTrue(
"Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it is missing!",
fs.exists(bulkLoadPath));
}
}

Expand Down Expand Up @@ -264,15 +276,16 @@ private boolean continuousBackupReplicationPeerExistsAndEnabled() throws IOExcep
peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER) && peer.isEnabled());
}

private static boolean isWalDirsEmpty(Configuration conf, String backupWalDir)
private static boolean areWalAndBulkloadDirsEmpty(Configuration conf, String backupWalDir)
throws IOException {
BackupFileSystemManager manager =
new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);

FileSystem fs = manager.getBackupFs();
Path walDir = manager.getWalsDir();
Path bulkloadDir = manager.getBulkLoadFilesDir();

return isDirectoryEmpty(fs, walDir);
return isDirectoryEmpty(fs, walDir) && isDirectoryEmpty(fs, bulkloadDir);
}

private static boolean isDirectoryEmpty(FileSystem fs, Path dirPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
Expand Down Expand Up @@ -134,7 +135,7 @@ public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
fs.mkdirs(backupWalDir);

long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WALs folders
setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulkload-files folder

logDirectoryStructure(fs, backupWalDir, "Before cleanup:");

Expand All @@ -154,18 +155,22 @@ public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
long cutoffTime) throws IOException {
Path walsDir = new Path(backupWalDir, WALS_DIR);
Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

for (int i = 0; i < 5; i++) {
long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
String dayDir = dateFormat.format(new Date(dayTime));
Path walPath = new Path(walsDir, dayDir);
Path bulkPath = new Path(bulkLoadDir, dayDir);

if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
assertFalse("Old WAL dir should be deleted: " + walPath, fs.exists(walPath));
assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, fs.exists(bulkPath));
} else {
assertTrue("Recent WAL dir should exist: " + walPath, fs.exists(walPath));
assertTrue("Recent BulkLoad dir should exist: " + bulkPath, fs.exists(bulkPath));
}
}
}
Expand Down