Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,38 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {

private static String BACKUP_ROOT_DIR = "backupIT";

/*
* This class is used to run the backup and restore thread(s). Throwing an exception in this
* thread will not cause the test to fail, so the purpose of this class is to both kick off the
* backup and restore and record any exceptions that occur so they can be thrown in the main
* thread.
*/
protected class BackupAndRestoreThread implements Runnable {
private final TableName table;
private Exception exc;

public BackupAndRestoreThread(TableName table) {
this.table = table;
this.exc = null;
}

public Exception getException() {
return this.exc;
}

@Override
public void run() {
try {
runTestSingle(this.table);
} catch (Exception e) {
LOG.error(
"An exception occurred in thread {} when performing a backup and restore with table {}: ",
Thread.currentThread().getName(), this.table.getNameAsString(), e);
this.exc = e;
}
}
}

@Override
@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -175,28 +207,35 @@ public void testBackupRestore() throws Exception {
runTestMulti();
}

private void runTestMulti() throws IOException {
private void runTestMulti() throws Exception {
LOG.info("IT backup & restore started");
Thread[] workers = new Thread[numTables];
BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables];
for (int i = 0; i < numTables; i++) {
final TableName table = tableNames[i];
Runnable r = new Runnable() {
@Override
public void run() {
try {
runTestSingle(table);
} catch (IOException e) {
LOG.error("Failed", e);
Assert.fail(e.getMessage());
}
}
};
workers[i] = new Thread(r);
BackupAndRestoreThread backupAndRestoreThread = new BackupAndRestoreThread(table);
backupAndRestoreThreads[i] = backupAndRestoreThread;
workers[i] = new Thread(backupAndRestoreThread);
workers[i].start();
}
// Wait all workers to finish
for (Thread t : workers) {
Uninterruptibles.joinUninterruptibly(t);
// Wait for all workers to finish and check for errors
Exception error = null;
Exception threadExc;
for (int i = 0; i < numTables; i++) {
Uninterruptibles.joinUninterruptibly(workers[i]);
threadExc = backupAndRestoreThreads[i].getException();
if (threadExc == null) {
continue;
}
if (error == null) {
error = threadExc;
} else {
error.addSuppressed(threadExc);
}
}
// Throw any found errors after all threads have completed
if (error != null) {
throw error;
}
LOG.info("IT backup & restore finished");
}
Expand Down Expand Up @@ -229,8 +268,7 @@ private void loadData(TableName table, int numRows) throws IOException {
}

private String backup(BackupRequest request, BackupAdmin client) throws IOException {
String backupId = client.backupTables(request);
return backupId;
return client.backupTables(request);
}

private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
Expand Down Expand Up @@ -300,7 +338,6 @@ private void runTestSingle(TableName table) throws IOException {

private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
String backupId, long expectedRows) throws IOException {

TableName[] tablesRestoreIncMultiple = new TableName[] { table };
restore(
createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true),
Expand Down