diff --git a/dev-support/design-docs/parallel-backup/existing_design.uml b/dev-support/design-docs/parallel-backup/existing_design.uml new file mode 100644 index 000000000000..f18b52b0e2a4 --- /dev/null +++ b/dev-support/design-docs/parallel-backup/existing_design.uml @@ -0,0 +1,74 @@ +@startuml +/' +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'/ +start +: (1.1) Create Backup; +if (hasActiveSession or inInconsistentState?) then (yes) + : Fail; + stop +else (no) + : (1.2.1) Create Backup Request and execute; + : (1.2.2) Create dirs in destination; + : (1.2.3) Create Backup Client and execute; + : (1.2.4) Create exclusive backup session; + : (1.2.5) Take Backup Table Snapshot; + : (1.2.6) Set State to RUNNING and phase to REQUEST; + if (full backup?) then (yes) + : (1.2.7.1.1) Read last backup start time or 0L; + : (1.2.7.1.2) Perform LogRoll Procedure; + : (1.2.7.1.3) Record WAL older than LogRoll to system table; + : (1.2.7.1.4) Set Phase to SNAPSHOT; + : (1.2.7.1.5) Take Snapshot of every table; + : (1.2.7.1.6) Export Snapshot to dest dir; + : (1.2.7.1.7) Write start time for next backup to system table; + : (1.2.7.1.8) Add Manifest; + : (1.2.7.1.9) Delete Snapshots; + : (1.2.7.1.10) Cleanup Export snapshot log; + else (no) + : (1.2.7.2.1) Set phase to PREPARE_INCREMENTAL; + if (fail to get log file map?) then (yes) + : Fail; + stop + else (no) + : (1.2.7.2.2) Copy table and region info; + : (1.2.7.2.3) MR to convert WAL into HFiles; + : (1.2.7.2.4) Copy HFiles into dest with DistCP; + : (1.2.7.2.5) Record WAL older than what is copied; + : (1.2.7.2.6) Write start time for next backup to system table; + : (1.2.7.2.7) Add Manifest; + : (1.2.7.2.8) Cleanup DistCp log; + endif + endif + : (1.2.8) Delete System Table Snapshot; + : (1.2.9) Update BackupInfo with Status Complete; + : (1.2.10) Clear exclusive backup session; +endif +stop + +start +: Fail; +: (2.1) Set State to FAILED; +if (full backup?) then (yes) + : (2.2.1.1) Delete all snapshots; + : (2.2.1.2) Cleanup export snapshot log; +endif +: (2.3) Restore backup system table from snapshot; +: (2.4) Delete backup system table snapshot; +: (2.5) Cleanup Target Dir; +stop +@enduml diff --git a/dev-support/design-docs/parallel-backup/proposed_design.uml b/dev-support/design-docs/parallel-backup/proposed_design.uml new file mode 100644 index 000000000000..23af56d63e6c --- /dev/null +++ b/dev-support/design-docs/parallel-backup/proposed_design.uml @@ -0,0 +1,76 @@ +@startuml +/' +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +'/ + +start +: (1.1) Create Backup; +: (1.2.1) Create Backup Request and execute; +: (1.2.2) Create dirs in destination; +: (1.2.3) Create Backup Client and execute; +#Orange: (1.2.4) Create exclusive backup session; +note right: Table exclusive lock +#Red: (1.2.5) Take Backup Table Snapshot; +note right: To be removed +: (1.2.6) Set State to RUNNING and phase to REQUEST; +if (full backup?) then (yes) + : (1.2.7.1.1) Read last backup start time or 0L; + : (1.2.7.1.2) Perform LogRoll Procedure; + : (1.2.7.1.3) Record WAL older than LogRoll to system table; + : (1.2.7.1.4) Set Phase to SNAPSHOT; + : (1.2.7.1.5) Take Snapshot of every table; + : (1.2.7.1.6) Export Snapshot to dest dir; + : (1.2.7.1.7) Write start time for next backup to system table; + : (1.2.7.1.8) Add Manifest; + : (1.2.7.1.9) Delete Snapshots; + : (1.2.7.1.10) Cleanup Export snapshot log; +else (no) + : (1.2.7.2.1) Set phase to PREPARE_INCREMENTAL; + if (fail to get log file map?) then (yes) + : Fail; + stop + else (no) + : (1.2.7.2.2) Copy table and region info; + : (1.2.7.2.3) MR to convert WAL into HFiles; + : (1.2.7.2.4) Copy HFiles into dest with DistCP; + : (1.2.7.2.5) Record WAL older than what is copied; + : (1.2.7.2.6) Write start time for next backup to system table; + : (1.2.7.2.7) Add Manifest; + : (1.2.7.2.8) Cleanup DistCp log; + endif +endif +#RED: (1.2.8) Delete System Table Snapshot; +note right: To be removed +: (1.2.9) Update BackupInfo with Status Complete; +#Orange: (1.2.10) Clear exclusive backup session; +note right: Clear table exclusive lock +stop + +start +: Fail; +: (2.1) Set State to FAILED; +if (full backup?)
then (yes) + : (2.2.1.1) Delete all snapshots; + : (2.2.1.2) Cleanup export snapshot log; +endif +#RED: (2.3) Restore backup system table from snapshot; +note right: To be removed +#RED: (2.4) Delete backup system table snapshot; +note right: To be removed +: (2.5) Cleanup Target Dir; +stop +@enduml diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java index 56c454519d81..c2f7e446ba3c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java @@ -114,7 +114,13 @@ public interface BackupRestoreConstants { String CONF_STAGING_ROOT = "snapshot.export.staging.root"; - String BACKUPID_PREFIX = "backup_"; + String BACKUPID_PREFIX = "backup"; + + String UNDERSCORE = "_"; + + static String getBackupPrefix() { + return BackupRestoreConstants.BACKUPID_PREFIX + BackupRestoreConstants.UNDERSCORE; + } enum BackupCommand { CREATE, diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java index f580fb0c47bb..b7f24ec9211a 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -94,15 +95,18 @@ public int deleteBackups(String[] backupIds) throws IOException { int totalDeleted = 0; Map> allTablesMap = new HashMap<>(); + List backupInfos = new ArrayList<>(); + for (String backupId : backupIds) { + backupInfos.add(getBackupInfo(backupId)); + } boolean deleteSessionStarted; - boolean snapshotDone; try (final BackupSystemTable sysTable = new BackupSystemTable(conn)) { // Step 1: Make sure there is no active session // is running by using startBackupSession API // If there is an active session in progress, exception will be thrown try { - sysTable.startBackupExclusiveOperation(); + sysTable.startBackupExclusiveOperation(backupInfos); deleteSessionStarted = true; } catch (IOException e) { LOG.warn("You can not run delete command while active backup session is in progress. 
\n" @@ -121,13 +125,6 @@ public int deleteBackups(String[] backupIds) throws IOException { // Step 3: Record delete session sysTable.startDeleteOperation(backupIds); - // Step 4: Snapshot backup system table - if (!BackupSystemTable.snapshotExists(conn)) { - BackupSystemTable.snapshot(conn); - } else { - LOG.warn("Backup system table snapshot exists"); - } - snapshotDone = true; try { for (int i = 0; i < backupIds.length; i++) { BackupInfo info = sysTable.readBackupInfo(backupIds[i]); @@ -145,28 +142,11 @@ public int deleteBackups(String[] backupIds) throws IOException { finalizeDelete(allTablesMap, sysTable); // Finish sysTable.finishDeleteOperation(); - // delete snapshot - BackupSystemTable.deleteSnapshot(conn); } catch (IOException e) { - // Fail delete operation - // Step 1 - if (snapshotDone) { - if (BackupSystemTable.snapshotExists(conn)) { - BackupSystemTable.restoreFromSnapshot(conn); - // delete snapshot - BackupSystemTable.deleteSnapshot(conn); - // We still have record with unfinished delete operation - LOG.error("Delete operation failed, please run backup repair utility to restore " - + "backup system integrity", e); - throw e; - } else { - LOG.warn("Delete operation succeeded, there were some errors: ", e); - } - } - + LOG.warn("Delete operation succeeded, there were some errors: ", e); } finally { if (deleteSessionStarted) { - sysTable.finishBackupExclusiveOperation(); + sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds)); } } } @@ -524,7 +504,8 @@ public String backupTables(BackupRequest request) throws IOException { String targetRootDir = request.getTargetRootDir(); List tableList = request.getTableList(); - String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime(); + String backupId = + BackupRestoreConstants.getBackupPrefix() + EnvironmentEdgeManager.currentTime(); if (type == BackupType.INCREMENTAL) { Set incrTableSet; try (BackupSystemTable table = new BackupSystemTable(conn)) { diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java index ce9c5bbe8fae..99bcdf370cb1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.net.URI; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -258,7 +259,7 @@ public static class CreateCommand extends Command { @Override protected boolean requiresNoActiveSession() { - return true; + return false; } @Override @@ -335,12 +336,12 @@ public void execute() throws IOException { System.setProperty("mapreduce.job.queuename", queueName); } + List tablesList = Lists.newArrayList(BackupUtils.parseTableNames(tables)); + try (BackupAdminImpl admin = new BackupAdminImpl(conn)) { BackupRequest.Builder builder = new BackupRequest.Builder(); BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase())) - .withTableList( - tables != null ? 
Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null) - .withTargetRootDir(targetBackupDir).withTotalTasks(workers) + .withTableList(tablesList).withTargetRootDir(targetBackupDir).withTotalTasks(workers) .withBandwidthPerTasks(bandwidth).withBackupSetName(setName).build(); String backupId = admin.backupTables(request); System.out.println("Backup session " + backupId + " finished. Status: SUCCESS"); @@ -672,9 +673,8 @@ public void execute() throws IOException { try (final Connection conn = ConnectionFactory.createConnection(conf); final BackupSystemTable sysTable = new BackupSystemTable(conn)) { // Failed backup - BackupInfo backupInfo; - List list = sysTable.getBackupInfos(BackupState.RUNNING); - if (list.size() == 0) { + List backupInfos = sysTable.getBackupInfos(BackupState.RUNNING); + if (backupInfos.size() == 0) { // No failed sessions found System.out.println("REPAIR status: no failed sessions found." + " Checking failed delete backup operation ..."); @@ -682,25 +682,26 @@ public void execute() throws IOException { repairFailedBackupMergeIfAny(conn, sysTable); return; } - backupInfo = list.get(0); - // If this is a cancel exception, then we've already cleaned. - // set the failure timestamp of the overall backup - backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); - // set failure message - backupInfo.setFailedMsg("REPAIR status: repaired after failure:\n" + backupInfo); - // set overall backup status: failed - backupInfo.setState(BackupState.FAILED); - // compose the backup failed data - String backupFailedData = "BackupId=" + backupInfo.getBackupId() + ",startts=" - + backupInfo.getStartTs() + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" - + backupInfo.getPhase() + ",failedmessage=" + backupInfo.getFailedMsg(); - System.out.println(backupFailedData); - TableBackupClient.cleanupAndRestoreBackupSystem(conn, backupInfo, conf); - // If backup session is updated to FAILED state - means we - // processed recovery already. - sysTable.updateBackupInfo(backupInfo); - sysTable.finishBackupExclusiveOperation(); - System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo); + for (BackupInfo backupInfo : backupInfos) { + // If this is a cancel exception, then we've already cleaned. + // set the failure timestamp of the overall backup + backupInfo.setCompleteTs(EnvironmentEdgeManager.currentTime()); + // set failure message + backupInfo.setFailedMsg("REPAIR status: repaired after failure:\n" + backupInfo); + // set overall backup status: failed + backupInfo.setState(BackupState.FAILED); + // compose the backup failed data + String backupFailedData = "BackupId=" + backupInfo.getBackupId() + ",startts=" + + backupInfo.getStartTs() + ",failedts=" + backupInfo.getCompleteTs() + ",failedphase=" + + backupInfo.getPhase() + ",failedmessage=" + backupInfo.getFailedMsg(); + System.out.println(backupFailedData); + TableBackupClient.cleanupAndRestoreBackupSystem(conn, backupInfo, conf); + // If backup session is updated to FAILED state - means we + // processed recovery already. 
+ sysTable.updateBackupInfo(backupInfo); + sysTable.finishBackupExclusiveOperation(Arrays.asList(backupInfo.getBackupId())); + System.out.println("REPAIR status: finished repair failed session:\n " + backupInfo); + } } } @@ -709,16 +710,12 @@ private void repairFailedBackupDeletionIfAny(Connection conn, BackupSystemTable String[] backupIds = sysTable.getListOfBackupIdsFromDeleteOperation(); if (backupIds == null || backupIds.length == 0) { System.out.println("No failed backup DELETE operation found"); - // Delete backup table snapshot if exists - BackupSystemTable.deleteSnapshot(conn); return; } System.out.println("Found failed DELETE operation for: " + StringUtils.join(backupIds)); System.out.println("Running DELETE again ..."); - // Restore table from snapshot - BackupSystemTable.restoreFromSnapshot(conn); // Finish previous failed session - sysTable.finishBackupExclusiveOperation(); + sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds)); try (BackupAdmin admin = new BackupAdminImpl(conn)) { admin.deleteBackups(backupIds); } @@ -731,8 +728,6 @@ public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTab String[] backupIds = sysTable.getListOfBackupIdsFromMergeOperation(); if (backupIds == null || backupIds.length == 0) { System.out.println("No failed backup MERGE operation found"); - // Delete backup table snapshot if exists - BackupSystemTable.deleteSnapshot(conn); return; } System.out.println("Found failed MERGE operation for: " + StringUtils.join(backupIds)); @@ -758,10 +753,8 @@ public static void repairFailedBackupMergeIfAny(Connection conn, BackupSystemTab } else { checkRemoveBackupImages(fs, backupRoot, backupIds); } - // Restore table from snapshot - BackupSystemTable.restoreFromSnapshot(conn); // Unlock backup system - sysTable.finishBackupExclusiveOperation(); + sysTable.finishBackupExclusiveOperation(Arrays.asList(backupIds)); // Finish previous failed session sysTable.finishMergeOperation(); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index ed1755ad5021..abff217b00b4 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -381,14 +382,14 @@ public void updateBackupInfo(BackupInfo context) throws IOException { * Starts new backup session * @throws IOException if active session already exists */ - public void startBackupSession() throws IOException { + public void startBackupSession(List backupInfos) throws IOException { long startTime = EnvironmentEdgeManager.currentTime(); long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L; long lastWarningOutputTime = 0; while (EnvironmentEdgeManager.currentTime() - startTime < timeout) { try { - systemTable.startBackupExclusiveOperation(); + systemTable.startBackupExclusiveOperation(backupInfos); return; } catch (IOException e) { if (e instanceof ExclusiveOperationException) { @@ -420,8 +421,8 @@ public void startBackupSession() throws IOException { * Finishes active backup session * @throws IOException if no active session */ - public void finishBackupSession() throws 
IOException { - systemTable.finishBackupExclusiveOperation(); + public void finishBackupSession(String backupId) throws IOException { + systemTable.finishBackupExclusiveOperation(Arrays.asList(backupId)); } /** diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 04f43b5b0ea1..09a5f80889f1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Objects; import java.util.Set; import java.util.TreeMap; @@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.backup.BackupType; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.CheckAndMutate; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; @@ -67,6 +70,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.Time; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,12 +147,13 @@ public String toString() { /** * Stores backup sessions (contexts) */ - final static byte[] SESSIONS_FAMILY = Bytes.toBytes("session"); + private final static String SESSIONS_FAMILY_STR = "session"; + private final static byte[] SESSIONS_FAMILY = Bytes.toBytes(SESSIONS_FAMILY_STR); /** * Stores other meta */ - final static byte[] META_FAMILY = Bytes.toBytes("meta"); - final static byte[] BULK_LOAD_FAMILY = Bytes.toBytes("bulk"); + private final static String META_FAMILY_STR = "meta"; + private final static byte[] META_FAMILY = Bytes.toBytes(META_FAMILY_STR); /** * Connection to HBase cluster, shared among all instances */ @@ -156,11 +161,10 @@ public String toString() { private final static String BACKUP_INFO_PREFIX = "session:"; private final static String START_CODE_ROW = "startcode:"; - private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes("activesession:"); - private final static byte[] ACTIVE_SESSION_COL = Bytes.toBytes("c"); - - private final static byte[] ACTIVE_SESSION_YES = Bytes.toBytes("yes"); - private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no"); + private final static String ACTIVE_SESSION_ROW_STR = "activesession:"; + private final static byte[] ACTIVE_SESSION_ROW = Bytes.toBytes(ACTIVE_SESSION_ROW_STR); + private final static String ACTIVE_SESSION_VERSION_STR = "version"; + private final static byte[] ACTIVE_SESSION_VERSION = Bytes.toBytes(ACTIVE_SESSION_VERSION_STR); private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; @@ -252,6 +256,35 @@ private void waitForSystemTable(Admin admin, TableName tableName) throws IOExcep LOG.debug("Backup table {} exists and available", tableName); } + private Map<String, Map<String, Entry<Long, String>>>
getExistingBackupSessionsForTables() + throws IOException { + Map<String, Map<String, Entry<Long, String>>> response = new HashMap<>(); + try (Table table = connection.getTable(tableName)) { + Get get = new Get(ACTIVE_SESSION_ROW); + Result result = table.get(get); + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfMap = + result.getMap(); + if (cfMap != null) { + for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cfEntry : cfMap + .entrySet()) { + String cf = Bytes.toString(cfEntry.getKey()); + response.putIfAbsent(cf, new HashMap<>()); + NavigableMap<byte[], NavigableMap<Long, byte[]>> columns = cfEntry.getValue(); + if (columns != null) { + for (Map.Entry<byte[], NavigableMap<Long, byte[]>> colEntry : columns.entrySet()) { + String tableName = Bytes.toString(colEntry.getKey()); + for (Map.Entry<Long, byte[]> valueEntry : colEntry.getValue().entrySet()) { + response.get(cf).put(tableName, new AbstractMap.SimpleImmutableEntry<>( + valueEntry.getKey(), new String(valueEntry.getValue(), StandardCharsets.UTF_8))); + } + } + } + } + } + } + return response; + } + @Override public void close() { // do nothing } @@ -606,53 +639,108 @@ public void writeBackupStartCode(Long startCode, String backupRoot) throws IOExc * @throws IOException if a table operation fails or an active backup exclusive operation is * already underway */ - public void startBackupExclusiveOperation() throws IOException { + public void startBackupExclusiveOperation(List<BackupInfo> backupInfos) throws IOException { LOG.debug("Start new backup exclusive operation"); - try (Table table = connection.getTable(tableName)) { - Put put = createPutForStartBackupSession(); - // First try to put if row does not exist + Map<String, Map<String, Entry<Long, String>>> backupSessionsForTables = + getExistingBackupSessionsForTables(); + Map<String, String> lockedTables = new HashMap<>(); + for (BackupInfo backupInfo : backupInfos) { + for (TableName tableName : backupInfo.getTables()) { + if ( + backupSessionsForTables.containsKey(META_FAMILY_STR) + && backupSessionsForTables.get(META_FAMILY_STR).containsKey(tableName.toString()) + ) { + String lkBkpId = + backupSessionsForTables.get(META_FAMILY_STR).get(tableName.toString()).getValue(); + lockedTables.put(tableName.toString(), lkBkpId); + } + } + } + if (lockedTables.size() > 0) { + throw new ExclusiveOperationException( + "Some tables have backups running.
Table locks: " + lockedTables); + } + + try (Table sysTable = connection.getTable(tableName)) { + Put put = new Put(ACTIVE_SESSION_ROW); + for (BackupInfo backupInfo : backupInfos) { + for (TableName table : backupInfo.getTables()) { + put.addColumn(META_FAMILY, table.getNameAsString().getBytes(StandardCharsets.UTF_8), + backupInfo.getBackupId().getBytes(StandardCharsets.UTF_8)); + } + } + put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_VERSION, + Bytes.toBytes(String.valueOf(Time.now()))); + if ( - !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) - .ifNotExists().thenPut(put) + !sysTable.checkAndMutate(CheckAndMutate.newBuilder(ACTIVE_SESSION_ROW) + .ifNotExists(SESSIONS_FAMILY, ACTIVE_SESSION_VERSION).build(put)).isSuccess() ) { - // Row exists, try to put if value == ACTIVE_SESSION_NO + String version = "0"; + if ( + backupSessionsForTables.containsKey(SESSIONS_FAMILY_STR) && backupSessionsForTables + .get(SESSIONS_FAMILY_STR).containsKey(ACTIVE_SESSION_VERSION_STR) + ) { + version = backupSessionsForTables.get(SESSIONS_FAMILY_STR).get(ACTIVE_SESSION_VERSION_STR) + .getValue(); + } if ( - !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) - .ifEquals(ACTIVE_SESSION_NO).thenPut(put) + !sysTable.checkAndMutate(CheckAndMutate.newBuilder(ACTIVE_SESSION_ROW) + .ifEquals(SESSIONS_FAMILY, ACTIVE_SESSION_VERSION, + version.getBytes(StandardCharsets.UTF_8)) + .build(put)).isSuccess() ) { - throw new ExclusiveOperationException(); + throw new ExclusiveOperationException(String.format( + "Failed to acquire table lock. Read Version: %s, " + "for Row: %s, CF: %s, QF: %s", + version, ACTIVE_SESSION_ROW_STR, SESSIONS_FAMILY_STR, ACTIVE_SESSION_VERSION_STR)); + } + } + } } - private Put createPutForStartBackupSession() { - Put put = new Put(ACTIVE_SESSION_ROW); - put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_YES); - return put; - } - - public void finishBackupExclusiveOperation() throws IOException { + public void finishBackupExclusiveOperation(List<String> backupIds) throws IOException { LOG.debug("Finish backup exclusive operation"); + Map<String, Map<String, Entry<Long, String>>> backupSessionsForTables = + getExistingBackupSessionsForTables(); + + boolean hasDeletes = false; + try (Table sysTable = connection.getTable(tableName)) { + Delete delete = new Delete(ACTIVE_SESSION_ROW); - try (Table table = connection.getTable(tableName)) { - Put put = createPutForStopBackupSession(); if ( - !table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) - .ifEquals(ACTIVE_SESSION_YES).thenPut(put) + backupSessionsForTables.containsKey(META_FAMILY_STR) + && backupSessionsForTables.get(META_FAMILY_STR).size() > 0 ) { + Map<String, Entry<Long, String>> tables = backupSessionsForTables.get(META_FAMILY_STR); + for (Entry<String, Entry<Long, String>> entry : tables.entrySet()) { + String bkp = tables.get(entry.getKey()).getValue(); + if (backupIds.contains(bkp)) { + delete.addColumn(META_FAMILY, entry.getKey().getBytes(StandardCharsets.UTF_8)); + hasDeletes = true; + } + } + } + if (hasDeletes) { + String version = backupSessionsForTables.get(SESSIONS_FAMILY_STR) + .get(ACTIVE_SESSION_VERSION_STR).getValue(); + if ( + !sysTable.checkAndMutate(CheckAndMutate.newBuilder(ACTIVE_SESSION_ROW) + .ifEquals(SESSIONS_FAMILY, ACTIVE_SESSION_VERSION, + version.getBytes(StandardCharsets.UTF_8)) + .build(delete)).isSuccess() + ) { + throw new ExclusiveOperationException(String.format( + "Failed to acquire table lock while deleting entries."
+ + "Read Version: %s, for Row: %s, CF: %s, QF: %s", + version, ACTIVE_SESSION_ROW_STR, SESSIONS_FAMILY_STR, ACTIVE_SESSION_VERSION_STR)); + } + } else if (backupIds.size() > 0) { throw new IOException("There is no active backup exclusive operation"); } } } - private Put createPutForStopBackupSession() { - Put put = new Put(ACTIVE_SESSION_ROW); - put.addColumn(SESSIONS_FAMILY, ACTIVE_SESSION_COL, ACTIVE_SESSION_NO); - return put; - } - /** * Get the Region Servers log information after the last log roll from backup system table. * @param backupRoot root directory path to backup diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java index d5c4ab31c655..19183331df9d 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java @@ -24,6 +24,10 @@ @SuppressWarnings("serial") public class ExclusiveOperationException extends IOException { + public ExclusiveOperationException(String msg) { + super(msg); + } + public ExclusiveOperationException() { super(); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java index 60dbc6470a77..fd7b2deb008c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -93,7 +94,7 @@ public void init(final Connection conn, final String backupId, BackupRequest req this.tableList = new ArrayList<>(backupInfo.getTables()); } // Start new session - backupManager.startBackupSession(); + backupManager.startBackupSession(Arrays.asList(backupInfo)); } /** @@ -104,7 +105,6 @@ public void init(final Connection conn, final String backupId, BackupRequest req protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo) throws IOException { - BackupSystemTable.snapshot(conn); backupManager.setBackupInfo(backupInfo); // set the start timestamp of the overall backup long startTs = EnvironmentEdgeManager.currentTime(); @@ -242,7 +242,7 @@ protected void failBackup(Connection conn, BackupInfo backupInfo, BackupManager // If backup session is updated to FAILED state - means we // processed recovery already. backupManager.updateBackupInfo(backupInfo); - backupManager.finishBackupSession(); + backupManager.finishBackupSession(backupId); LOG.error("Backup " + backupInfo.getBackupId() + " failed."); } catch (IOException ee) { LOG.error("Please run backup repair tool manually to restore backup system integrity"); @@ -259,8 +259,6 @@ public static void cleanupAndRestoreBackupSystem(Connection conn, BackupInfo bac deleteSnapshots(conn, backupInfo, conf); cleanupExportSnapshotLog(conf); } - BackupSystemTable.restoreFromSnapshot(conn); - BackupSystemTable.deleteSnapshot(conn); // clean up the uncompleted data at target directory if the ongoing backup has already entered // the copy phase // For incremental backup, DistCp logs will be cleaned with the targetDir. 
@@ -367,11 +365,10 @@ protected void completeBackup(final Connection conn, BackupInfo backupInfo, deleteSnapshots(conn, backupInfo, conf); cleanupExportSnapshotLog(conf); } - BackupSystemTable.deleteSnapshot(conn); backupManager.updateBackupInfo(backupInfo); // Finish active session - backupManager.finishBackupSession(); + backupManager.finishBackupSession(backupId); LOG.info("Backup " + backupInfo.getBackupId() + " completed."); } diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java index 3b4cf0246d73..0bb3f6de1f7c 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupMergeJob.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Deque; import java.util.HashSet; import java.util.List; @@ -99,9 +100,16 @@ public void run(String[] backupIds) throws IOException { FileSystem fs = FileSystem.get(getConf()); try { + List backupInfos = new ArrayList<>(); + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + if (bInfo != null) { + backupInfos.add(bInfo); + } + } // Get exclusive lock on backup system - table.startBackupExclusiveOperation(); + table.startBackupExclusiveOperation(backupInfos); // Start merge operation table.startMergeOperation(backupIds); @@ -177,7 +185,7 @@ public void run(String[] backupIds) throws IOException { // Finish merge session table.finishMergeOperation(); // Release lock - table.finishBackupExclusiveOperation(); + table.finishBackupExclusiveOperation(Arrays.asList(backupIds)); } catch (RuntimeException e) { throw e; @@ -188,7 +196,7 @@ public void run(String[] backupIds) throws IOException { // merge MUST be repeated (no need for repair) cleanupBulkLoadDirs(fs, toPathList(processedTableList)); table.finishMergeOperation(); - table.finishBackupExclusiveOperation(); + table.finishBackupExclusiveOperation(Arrays.asList(backupIds)); throw new IOException("Backup merge operation failed, you should try it again", e); } else { // backup repair must be run diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index d4e849f610ae..c0355d6df17f 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -753,13 +753,15 @@ public static BulkLoadHFiles createLoader(Configuration config) { public static String findMostRecentBackupId(String[] backupIds) { long recentTimestamp = Long.MIN_VALUE; + String retId = null; for (String backupId : backupIds) { long ts = Long.parseLong(Iterators.get(Splitter.on('_').split(backupId).iterator(), 1)); if (ts > recentTimestamp) { recentTimestamp = ts; + retId = backupId; } } - return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; + return (retId == null) ? 
BackupRestoreConstants.getBackupPrefix() + recentTimestamp : retId; } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java index 12c8d5c4065c..47aac493506e 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithFailures.java @@ -17,22 +17,13 @@ */ package org.apache.hadoop.hbase.backup; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -41,15 +32,9 @@ import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.util.ToolRunner; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** * This class is only a base for other integration-level backup tests. Do not add tests here. @@ -63,8 +48,6 @@ public class TestBackupDeleteWithFailures extends TestBackupBase { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBackupDeleteWithFailures.class); - private static final Logger LOG = LoggerFactory.getLogger(TestBackupDeleteWithFailures.class); - public enum Failure { NO_FAILURES, PRE_SNAPSHOT_FAILURE, @@ -125,75 +108,4 @@ public static void setUp() throws Exception { conf1.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); setUpHelper(); } - - private MasterSnapshotObserver getMasterSnapshotObserver() { - return TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost() - .findCoprocessor(MasterSnapshotObserver.class); - } - - @Test - public void testBackupDeleteWithFailures() throws Exception { - testBackupDeleteWithFailuresAfter(1, Failure.PRE_DELETE_SNAPSHOT_FAILURE); - testBackupDeleteWithFailuresAfter(0, Failure.POST_DELETE_SNAPSHOT_FAILURE); - testBackupDeleteWithFailuresAfter(1, Failure.PRE_SNAPSHOT_FAILURE); - } - - private void testBackupDeleteWithFailuresAfter(int expected, Failure... 
failures) - throws Exception { - LOG.info("test repair backup delete on a single table with data and failures " + failures[0]); - List tableList = Lists.newArrayList(table1); - String backupId = fullTableBackup(tableList); - assertTrue(checkSucceeded(backupId)); - LOG.info("backup complete"); - String[] backupIds = new String[] { backupId }; - BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection()); - BackupInfo info = table.readBackupInfo(backupId); - Path path = new Path(info.getBackupRootDir(), backupId); - FileSystem fs = FileSystem.get(path.toUri(), conf1); - assertTrue(fs.exists(path)); - - Connection conn = TEST_UTIL.getConnection(); - Admin admin = conn.getAdmin(); - MasterSnapshotObserver observer = getMasterSnapshotObserver(); - - observer.setFailures(failures); - try { - getBackupAdmin().deleteBackups(backupIds); - } catch (IOException e) { - if (expected != 1) { - assertTrue(false); - } - } - - // Verify that history length == expected after delete failure - assertTrue(table.getBackupHistory().size() == expected); - - String[] ids = table.getListOfBackupIdsFromDeleteOperation(); - - // Verify that we still have delete record in backup system table - if (expected == 1) { - assertTrue(ids.length == 1); - assertTrue(ids[0].equals(backupId)); - } else { - assertNull(ids); - } - - // Now run repair command to repair "failed" delete operation - String[] args = new String[] { "repair" }; - - observer.setFailures(Failure.NO_FAILURES); - - // Run repair - int ret = ToolRunner.run(conf1, new BackupDriver(), args); - assertTrue(ret == 0); - // Verify that history length == 0 - assertTrue(table.getBackupHistory().size() == 0); - ids = table.getListOfBackupIdsFromDeleteOperation(); - - // Verify that we do not have delete record in backup system table - assertNull(ids); - - table.close(); - admin.close(); - } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java index e044b42a0bd9..c0d6778c1f68 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java @@ -21,6 +21,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLongArray; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -94,7 +95,7 @@ public void testStartBackupExclusiveOperation() { @Override public void run() { try { - backupManager.startBackupSession(); + backupManager.startBackupSession(new ArrayList<>()); boolean result = startTimes.compareAndSet(0, 0, EnvironmentEdgeManager.currentTime()); if (!result) { result = startTimes.compareAndSet(1, 0, EnvironmentEdgeManager.currentTime()); @@ -110,7 +111,7 @@ public void run() { throw new IOException("PANIC! 
Unreachable code"); } } - backupManager.finishBackupSession(); + backupManager.finishBackupSession(null); } catch (IOException | InterruptedException e) { fail("Unexpected exception: " + e.getMessage()); } @@ -128,8 +129,8 @@ public void run() { } LOG.info("Diff start time=" + (startTimes.get(1) - startTimes.get(0)) + "ms"); LOG.info("Diff finish time=" + (stopTimes.get(1) - stopTimes.get(0)) + "ms"); - assertTrue(startTimes.get(1) - startTimes.get(0) >= sleepTime); - assertTrue(stopTimes.get(1) - stopTimes.get(0) >= sleepTime); + assertTrue(startTimes.get(1) - startTimes.get(0) < sleepTime); + assertTrue(stopTimes.get(1) - stopTimes.get(0) < sleepTime); } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java index 1536fd1841fb..07d3ab97e19f 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestFullBackupWithFailures.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.backup; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -75,7 +76,7 @@ public void runBackupAndFailAtStage(int stage) throws Exception { assertFalse(checkSucceeded(backupId)); } Set tables = table.getIncrementalBackupTableSet(BACKUP_ROOT_DIR); - assertTrue(tables.size() == 0); + assertEquals((Stage.stage_4.ordinal() == stage) ? 2 : 0, tables.size()); } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java index 1ece1770489b..8c2a58286042 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -109,8 +110,16 @@ public void run(String[] backupIds) throws IOException { FileSystem fs = FileSystem.get(getConf()); try { + List backupInfos = new ArrayList<>(); + for (String backupId : backupIds) { + BackupInfo bInfo = table.readBackupInfo(backupId); + if (bInfo != null) { + backupInfos.add(bInfo); + } + } + // Start backup exclusive operation - table.startBackupExclusiveOperation(); + table.startBackupExclusiveOperation(backupInfos); // Start merge operation table.startMergeOperation(backupIds); @@ -192,7 +201,7 @@ public void run(String[] backupIds) throws IOException { // Finish merge session table.finishMergeOperation(); // Release lock - table.finishBackupExclusiveOperation(); + table.finishBackupExclusiveOperation(Arrays.asList(backupIds)); } catch (RuntimeException e) { throw e; } catch (Exception e) { @@ -202,7 +211,7 @@ public void run(String[] backupIds) throws IOException { // merge MUST be repeated (no need for repair) cleanupBulkLoadDirs(fs, toPathList(processedTableList)); table.finishMergeOperation(); - table.finishBackupExclusiveOperation(); + table.finishBackupExclusiveOperation(Arrays.asList(backupIds)); throw new IOException("Backup merge operation failed, you should try it again", e); } else { // backup repair must be run @@ 
-282,8 +291,8 @@ public void TestIncBackupMergeRestore() throws Exception { conf.set(FAILURE_PHASE_KEY, phase.toString()); + String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; try (BackupAdmin bAdmin = new BackupAdminImpl(conn)) { - String[] backups = new String[] { backupIdIncMultiple, backupIdIncMultiple2 }; bAdmin.mergeBackups(backups); Assert.fail("Expected IOException"); } catch (IOException e) { @@ -293,7 +302,7 @@ public void TestIncBackupMergeRestore() throws Exception { // Both Merge and backup exclusive operations are finished assertFalse(table.isMergeInProgress()); try { - table.finishBackupExclusiveOperation(); + table.finishBackupExclusiveOperation(Arrays.asList(backups)); Assert.fail("IOException is expected"); } catch (IOException ee) { // Expected @@ -302,7 +311,11 @@ public void TestIncBackupMergeRestore() throws Exception { // Repair is required assertTrue(table.isMergeInProgress()); try { - table.startBackupExclusiveOperation(); + List backupInfos = new ArrayList<>(); + for (String backupId : backups) { + backupInfos.add(table.readBackupInfo(backupId)); + } + table.startBackupExclusiveOperation(backupInfos); Assert.fail("IOException is expected"); } catch (IOException ee) { // Expected - clean up before proceeding diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestParallelBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestParallelBackup.java new file mode 100644 index 000000000000..985f1b98782a --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestParallelBackup.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Random; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.ToolRunner; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(LargeTests.class) + +public class TestParallelBackup extends TestBackupBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestParallelBackup.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestParallelBackup.class); + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL = new HBaseTestingUtil(); + conf1 = TEST_UTIL.getConfiguration(); + conf1.setInt(BackupManager.BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, 3); + setUpHelper(); + } + + @Test + public void testParallelFullBackupOnDifferentTable() throws Exception { + int before; + String tableName1 = table1.getNameAsString(); + String tableName2 = table2.getNameAsString(); + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + before = table.getBackupHistory().size(); + } + + Thread backupThread1 = new Thread(new RunFullBackup(tableName1)); + Thread backupThread2 = new Thread(new RunFullBackup(tableName2)); + + backupThread1.start(); + backupThread2.start(); + + backupThread1.join(); + backupThread2.join(); + + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + List backups = table.getBackupHistory(); + int after = table.getBackupHistory().size(); + assertEquals(after, before + 2); + for (BackupInfo data : backups) { + String backupId = data.getBackupId(); + assertTrue(checkSucceeded(backupId)); + } + LOG.info("backup complete"); + } + } + + @Test + public void testParallelFullBackupOnSameTable() throws Exception { + int before; + String tableNames = table1.getNameAsString() + "," + table2.getNameAsString(); + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + before = table.getBackupHistory().size(); + } + + Thread backupThread1 = new Thread(new RunFullBackup(tableNames)); + Thread backupThread2 = new Thread(new RunFullBackup(tableNames)); + + backupThread1.start(); + backupThread2.start(); + + backupThread1.join(); + backupThread2.join(); + + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + List backups = table.getBackupHistory(); + int after = table.getBackupHistory().size(); + assertEquals(after, before + 1); + for (BackupInfo data : backups) { + String backupId = data.getBackupId(); + assertTrue(checkSucceeded(backupId)); + } + LOG.info("backup complete"); + } + } + + @Test + public void testParallelFullBackupOnMultipleTable() throws Exception { + int before; + String tableNames1 = table1.getNameAsString() + "," + table2.getNameAsString(); + String tableNames2 = table1.getNameAsString(); + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + before = table.getBackupHistory().size(); + } + + Thread backupThread1 = 
new Thread(new RunFullBackup(tableNames1)); + Thread backupThread2 = new Thread(new RunFullBackup(tableNames2)); + + backupThread1.start(); + backupThread2.start(); + + backupThread1.join(); + backupThread2.join(); + + try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) { + List backups = table.getBackupHistory(); + int after = table.getBackupHistory().size(); + assertEquals(after, before + 1); + for (BackupInfo data : backups) { + String backupId = data.getBackupId(); + assertTrue(checkSucceeded(backupId)); + } + LOG.info("backup complete"); + } + } + + static class RunFullBackup implements Runnable { + String tableNames; + + RunFullBackup(String tableNames) { + this.tableNames = tableNames; + } + + @Override + public void run() { + try { + String[] args = new String[] { "create", "full", BACKUP_ROOT_DIR, "-t", tableNames }; + Threads.sleep(new Random().nextInt(500)); + int ret = ToolRunner.run(conf1, new BackupDriver(), args); + assertTrue(ret == 0); + } catch (Exception e) { + LOG.error("Failure with exception: " + e.getMessage(), e); + } + } + } +} diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java index 93345fd17059..900fbb3f3aad 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRepairAfterFailedDelete.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -78,7 +79,7 @@ public void testRepairBackupDelete() throws Exception { admin.restoreSnapshot(snapshotName); admin.enableTable(BackupSystemTable.getTableName(conf1)); // Start backup session - table.startBackupExclusiveOperation(); + table.startBackupExclusiveOperation(Arrays.asList(info)); // Start delete operation table.startDeleteOperation(backupIds);