Skip to content

Commit

Permalink
Add support for backing up a given namespace (#3687)
Browse files Browse the repository at this point in the history
This PR adds 'namespace' argument to the Corfu's Backup API,
which allows to back up all tables belonging to the given
namespace.
  • Loading branch information
Lujie1996 committed Jun 29, 2023
1 parent 9b29e16 commit 5845f0c
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 203 deletions.
138 changes: 47 additions & 91 deletions runtime/src/main/java/org/corfudb/runtime/Backup.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@
import org.apache.commons.io.FileUtils;
import org.corfudb.protocols.logprotocol.OpaqueEntry;
import org.corfudb.protocols.logprotocol.SMREntry;
import org.corfudb.runtime.collections.CorfuRecord;
import org.corfudb.runtime.exceptions.BackupRestoreException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.view.StreamOptions;
import org.corfudb.runtime.view.TableRegistry;
import org.corfudb.runtime.view.stream.OpaqueStream;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -46,49 +51,42 @@ public class Backup {
private String backupTempDirPath;

// The stream IDs of tables which are backed up
private final List<UUID> streamIDs;
private final List<UUID> streamsToBackUp = new ArrayList<>();

// The snapshot address to back up
private long timestamp;

// The Corfu Runtime which is performing the back up
// The Corfu Runtime which is performing the backup
private final CorfuRuntime runtime;

// All tables in Corfu Db
private List<UUID> allTablesInDb;
// The tables belonging to this namespace will be backed up
private final String namespace;

// Whether to back up only tagged tables or all tables
private final boolean taggedTables;

/**
* Backup files of tables are temporarily stored under BACKUP_TEMP_DIR. They are deleted after backup finishes.
*/
private static final String BACKUP_TEMP_DIR_PREFIX = "corfu_backup_";

/**
* @param filePath - the filePath where the generated backup tar file will be placed
* @param streamIDs - the stream IDs of tables which are backed up
* @param runtime - the runtime which is performing the back up
*/
public Backup(String filePath, List<UUID> streamIDs, CorfuRuntime runtime) {
this.filePath = filePath;
this.streamIDs = streamIDs;
this.runtime = runtime;
}

/**
* Discover and back up all tables, or tables with requires_backup_support tag
* Discover and back up all tables
*
* @param filePath - the filePath where the generated backup tar file will be placed
* @param runtime - the runtime which is performing the back up
* @param taggedTablesOnly - if true, back up tables which has requires_backup_support tag set;
* if false, back up all UFO tables
* @param runtime - the runtime which is performing the backup
* @param taggedTables - if true, back up only tagged tables; if false, back up all tables
* @param namespace - tables belonging to this namespace will be backed up; if null, back up all namespaces
*/
public Backup(String filePath, CorfuRuntime runtime, boolean taggedTablesOnly) {
public Backup(@Nonnull String filePath,
@Nonnull CorfuRuntime runtime,
boolean taggedTables,
@Nullable String namespace) {

this.filePath = filePath;
this.runtime = runtime;
if (taggedTablesOnly) {
this.streamIDs = getTaggedTables();
} else {
this.streamIDs = getAllTables();
}
this.taggedTables = taggedTables;
this.namespace = namespace;
}

/**
Expand All @@ -98,11 +96,20 @@ public Backup(String filePath, CorfuRuntime runtime, boolean taggedTablesOnly) {
*/
public void start() throws IOException {
log.info("started corfu backup");
if (streamIDs == null) {
log.warn("streamIDs is a null variable! back up aborted!");
return;

streamsToBackUp.addAll(runtime.getTableRegistry()
.getRegistryTable()
.entryStream()
.filter(this::filterTable)
.map(table -> CorfuRuntime.getStreamID(TableRegistry.getFullyQualifiedTableName(table.getKey())))
.collect(Collectors.toList()));

if (streamsToBackUp.isEmpty()) {
log.warn("back up is called with empty streamIDs!");
}

log.info("Preparing to back up {} tables: {}", streamsToBackUp.size(), streamsToBackUp);

this.timestamp = runtime.getAddressSpaceView().getLogTail();

try {
Expand All @@ -112,27 +119,23 @@ public void start() throws IOException {
backup();
generateTarFile();
} catch (Exception e) {
throw new BackupRestoreException("failed to backup tables " + streamIDs, e);
throw new BackupRestoreException("failed to backup tables " + streamsToBackUp, e);
} finally {
cleanup();
}
log.info("backup completed");
}

/**
* Check if table exists in Corfu Db
* A predicate checking if the given table should be backed up based on the given configuration
*
* @param streamId the stream id of the table which is being checked
* @return true if table exists
* @param table a given table entry from RegistryTable
* @return if the table should be backed up
*/
private boolean tableExists(UUID streamId) {
if (allTablesInDb == null) {
allTablesInDb = runtime.getTableRegistry().listTables()
.stream()
.map(tableName -> CorfuRuntime.getStreamID(TableRegistry.getFullyQualifiedTableName(tableName)))
.collect(Collectors.toList());
}
return allTablesInDb.contains(streamId);
private boolean filterTable(Map.Entry<CorfuStoreMetadata.TableName,
CorfuRecord<CorfuStoreMetadata.TableDescriptors, CorfuStoreMetadata.TableMetadata>> table) {
return (namespace == null || table.getKey().getNamespace().equals(namespace)) &&
(!taggedTables || table.getValue().getMetadata().getTableOptions().getRequiresBackupSupport());
}

/**
Expand All @@ -141,22 +144,11 @@ private boolean tableExists(UUID streamId) {
* @throws IOException
*/
private void backup() throws IOException {
if (streamIDs.isEmpty()) {
log.warn("back up is called with empty streamIDs!");
return;
}

long startTime = System.currentTimeMillis();

this.backupTempDirPath = Files.createTempDirectory(BACKUP_TEMP_DIR_PREFIX).toString();
Map<UUID, String> streamIdToTableNameMap = getStreamIdToTableNameMap();
for (UUID streamId : streamIDs) {
if (!tableExists(streamId)) {
log.warn("cannot back up a non-existent table stream id {} table name {}",
streamId, streamIdToTableNameMap.get(streamId));
continue;
}

for (UUID streamId : streamsToBackUp) {
// temporary backup file's name format: uuid.namespace$tableName
Path filePath = Paths.get(backupTempDirPath)
.resolve(streamId + "." + streamIdToTableNameMap.get(streamId));
Expand All @@ -165,7 +157,7 @@ private void backup() throws IOException {
long elapsedTime = System.currentTimeMillis() - startTime;

log.info("successfully backed up {} tables to {} directory, elapsed time {}ms",
streamIDs.size(), backupTempDirPath, elapsedTime);
streamsToBackUp.size(), backupTempDirPath, elapsedTime);
}

/**
Expand Down Expand Up @@ -200,7 +192,7 @@ private void backupTable(Path filePath, UUID uuid) throws IOException {

long elapsedTime = System.currentTimeMillis() - startTime;

log.info("{} entries (size: {} bytes, elapsed time: {} ms) saved to temp file {}",
log.info("{} SMREntry (size: {} bytes, elapsed time: {} ms) saved to temp file {}",
backupTableStats.getNumOfEntries(), backupTableStats.getTableSize(), elapsedTime, filePath);
}

Expand All @@ -209,10 +201,10 @@ private BackupTableStats writeTableToFile(FileOutputStream fileOutput, Stream<Op
int numOfEntries = 0;
int tableSize = 0;
while (iterator.hasNext()) {
numOfEntries++;
OpaqueEntry lastEntry = iterator.next();
List<SMREntry> smrEntries = lastEntry.getEntries().get(uuid);
if (smrEntries != null) {
numOfEntries += smrEntries.size();
Map<UUID, List<SMREntry>> map = new HashMap<>();
map.put(uuid, smrEntries);
OpaqueEntry newOpaqueEntry = new OpaqueEntry(lastEntry.getVersion(), map);
Expand Down Expand Up @@ -276,42 +268,6 @@ private void addToTarFile(File tableFile,TarArchiveOutputStream tarOutput) throw
}
}

/**
* Get UUIDs of tables which have the requires_backup_support tag set
*
* @return List<UUID> - a list of UUIDs for all the tables which require backup support
*/
private List<UUID> getTaggedTables() {
TableRegistry tableRegistry = runtime.getTableRegistry();
List<UUID> tables = tableRegistry
.listTables()
.stream()
.filter(tableName -> tableRegistry
.getRegistryTable()
.get(tableName)
.getMetadata()
.getTableOptions()
.getRequiresBackupSupport())
.map(tableName -> CorfuRuntime
.getStreamID(TableRegistry.getFullyQualifiedTableName(tableName)))
.collect(Collectors.toList());
log.info("{} tables need to be backed up.", tables.size());

return tables;
}

/**
* Get UUIDs of all registered UFO tables
*
* @return List<UUID> - a list of UUIDs for all registered UFO tables
*/
private List<UUID> getAllTables() {
return runtime.getTableRegistry().listTables().stream()
.map(table -> CorfuRuntime.getStreamID(
TableRegistry.getFullyQualifiedTableName(table.getNamespace(), table.getTableName())))
.collect(Collectors.toList());
}

/**
* Get a map which maps stream ID to TableName
*
Expand Down
7 changes: 4 additions & 3 deletions runtime/src/main/java/org/corfudb/runtime/Restore.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private void restore() throws IOException {
}

long startTime = System.currentTimeMillis();
int restoredTableCount = 0;
for (String tableBackup : tableBackups) {
log.info("start restoring table {}", tableBackup);
if (restoreMode == RestoreMode.PARTIAL_TAGGED && !isTableTagged(tableBackup)) {
Expand All @@ -154,10 +155,10 @@ private void restore() throws IOException {
log.error("failed to restore table {} from temp file {}", streamId, tableBackup);
throw e;
}
restoredTableCount++;
}
long elapsedTime = System.currentTimeMillis() - startTime;
log.info("successfully restored {} tables to, elapsed time {}ms",
tableBackups.size(), elapsedTime);
log.info("successfully restored {} tables, elapsed time {}ms", restoredTableCount, elapsedTime);
}

/**
Expand Down Expand Up @@ -193,7 +194,7 @@ private void restoreTable(Path filePath, UUID streamId) throws IOException {

long elapsedTime = System.currentTimeMillis() - startTime;

log.info("completed restore of table {} with {} numEntries, total size {} byte(s), elapsed time {}ms",
log.info("completed restore of table {} with {} SMREntry, total size {} byte(s), elapsed time {}ms",
streamId, sbw.getTotalNumSMREntries(), sbw.getTotalWriteSize(), elapsedTime);
} catch (FileNotFoundException e) {
log.error("restoreTable can not find file {}", filePath);
Expand Down

0 comments on commit 5845f0c

Please sign in to comment.