PHOENIX-6273: Add support to handle MR Snapshot restore externally #1079

Merged: 11 commits, Jan 22, 2021
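For orientation before the diffs: a minimal caller-side sketch of the workflow this PR enables, assembled from the helpers used and introduced below. The class name, snapshot name handling, and paths are illustrative assumptions, not code from the patch; the point is the ordering: the caller restores the snapshot itself, points the job configuration at that directory, marks the restore as externally managed so Phoenix skips its own restore and cleanup, and deletes the directory after the job finishes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

public class ExternalSnapshotRestoreSketch {

    // Restore the snapshot ourselves, then tell Phoenix the restore is managed externally.
    public static void prepareExternalRestore(Configuration conf, String snapshotName,
            Path restoreDir) throws Exception {
        Path rootDir = new Path(conf.get(HConstants.HBASE_DIR));
        FileSystem fs = rootDir.getFileSystem(conf);

        // Same restore call the integration test below performs.
        RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

        // Point Phoenix at the already-restored directory and flag it as externally managed,
        // so TableSnapshotResultIterator neither re-restores nor deletes it on close().
        PhoenixConfigurationUtil.setSnapshotNameKey(conf, snapshotName);
        PhoenixConfigurationUtil.setRestoreDirKey(conf, restoreDir.toString());
        PhoenixConfigurationUtil.setMRSnapshotManagedExternally(conf, true);
    }

    // Once the MR job completes, cleanup of the restore directory is the caller's responsibility.
    public static void cleanupExternalRestore(Configuration conf, Path restoreDir) throws Exception {
        restoreDir.getFileSystem(conf).delete(restoreDir, true);
    }
}

The PhoenixMapReduceUtil diff at the bottom of this page shows the same snapshot name and restore directory keys being populated on the job-level setSnapshotInput path.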
----------------------------------------------------------------------
@@ -20,6 +20,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
@@ -30,6 +31,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -38,13 +41,15 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
@@ -61,16 +66,18 @@
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {

private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
@@ -101,6 +108,16 @@ public class TableSnapshotReadsMapReduceIT extends BaseUniqueNamesOwnClusterIT {
private Path tmpDir;
private Configuration conf;
private static final Random RANDOM = new Random();
private Boolean isSnapshotRestoreDoneExternally;

public TableSnapshotReadsMapReduceIT(Boolean isSnapshotRestoreDoneExternally) {
this.isSnapshotRestoreDoneExternally = isSnapshotRestoreDoneExternally;
}

@Parameterized.Parameters
public static synchronized Collection<Boolean> snapshotRestoreDoneExternallyParams() {
return Arrays.asList(true, false);
}

@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -165,6 +182,8 @@ public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Excep
Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME,
config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));

TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();

try (Connection conn = DriverManager.getConnection(getUrl())) {
// create table
tableName = generateUniqueName();
@@ -180,7 +199,6 @@ public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Excep
config = job.getConfiguration();
Assert.assertNull("Snapshot name is not null in Configuration",
config.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY));

}

private Job createAndTestJob(Connection conn)
@@ -237,7 +255,7 @@ private void setOutput(Job job, String stockStatsTableName) {

private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
try {
upsertAndSnapshot(tableName, shouldSplit);
upsertAndSnapshot(tableName, shouldSplit, job.getConfiguration());
result = new ArrayList<>();

job.setMapperClass(TableSnapshotMapper.class);
@@ -273,6 +291,7 @@ private void configureJob(Job job, String tableName, String inputQuery, String c
}

assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
assertRestoreDirCount(conf, tmpDir.toString(), 1);
} finally {
deleteSnapshotIfExists(SNAPSHOT_NAME);
}
@@ -333,7 +352,7 @@ private void upsertData(PreparedStatement stmt, String field1, String field2, in
stmt.execute();
}

private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exception {
private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configuration configuration) throws Exception {
if (shouldSplit) {
// having very few rows in table doesn't really help much with splitting case.
// we should upsert large no of rows as a prerequisite to splitting
@@ -363,6 +382,14 @@ private void upsertAndSnapshot(String tableName, boolean shouldSplit) throws Exc
PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
upsertData(stmt, "DDDD", "SNFB", 45);
conn.commit();
if (isSnapshotRestoreDoneExternally) {
// Perform the snapshot restore that will be used during scans
Path rootDir = new Path(configuration.get(HConstants.HBASE_DIR));
FileSystem fs = rootDir.getFileSystem(configuration);
Path restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
RestoreSnapshotHelper.copySnapshotForScanner(configuration, fs, rootDir, restoreDir, SNAPSHOT_NAME);
PhoenixConfigurationUtil.setMRSnapshotManagedExternally(configuration, true);
}
}
}

@@ -428,6 +455,28 @@ private void deleteSnapshotIfExists(String snapshotName) throws Exception {
}
}

/**
* Ensures that the restore temp directory does not contain multiple sub-directories
* for the same snapshot restore.
* @param conf job configuration
* @param restoreDir restore directory to check
* @param expectedCount expected number of sub-directories when the restore is managed externally
* @throws IOException
*/
private void assertRestoreDirCount(Configuration conf, String restoreDir, int expectedCount)
throws IOException {
FileSystem fs = FileSystem.get(conf);
FileStatus[] subDirectories = fs.listStatus(new Path(restoreDir));
assertNotNull(subDirectories);
if (isSnapshotRestoreDoneExternally) {
//Snapshot Restore to be deleted externally by the caller
assertEquals(expectedCount, subDirectories.length);
} else {
//Snapshot Restore already deleted internally
assertEquals(0, subDirectories.length);
}
}

public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {

@Override
----------------------------------------------------------------------
@@ -27,15 +27,18 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -78,8 +81,12 @@ public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanM
this.scan = scan;
this.scanMetricsHolder = scanMetricsHolder;
this.scanIterator = UNINITIALIZED_SCANNER;
this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY),
UUID.randomUUID().toString());
if (PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
} else {
this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY),
UUID.randomUUID().toString());

gjacoby126 (Contributor) commented:
@sakshamgangwar why do we do the UUID in both the PhoenixMapReduceUtil and TableSnapshotResultIterator? Shouldn't it be one or the other? (If so, putting it here in TableSnapshotResultIterator seems clearer to me.)

sakshamgangwar (Contributor, Author) replied:
@gjacoby126 That existed before this change as well; it is not strictly necessary code. I believe that at the job level, PhoenixMapReduceUtil updates the restoreDir in the configuration to a new path so that, even if the caller passes the same restoreDir for multiple jobs, each job is handled internally by adding a new sub-directory (which we also do at the scan level). With externally managed restores, the caller now takes full responsibility for restoreDir creation, usage, and deletion.
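To make the distinction in that reply concrete, here is a condensed, editorial sketch of the restore-dir choice; it is not part of the patch and simply mirrors the constructor logic in this diff:

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

class RestoreDirChoiceSketch {
    // Externally managed: use the caller-supplied directory verbatim; the caller owns its lifecycle.
    // Internally managed: append a fresh UUID sub-directory per scanner, which close() deletes again.
    static Path resolveRestoreDir(Configuration conf) {
        Path base = new Path(conf.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
        return PhoenixConfigurationUtil.isMRSnapshotManagedExternally(conf)
            ? base
            : new Path(base, UUID.randomUUID().toString());
    }
}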
}
this.snapshotName = configuration.get(
PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
this.rootDir = CommonFSUtils.getRootDir(configuration);
@@ -88,19 +95,34 @@
}

private void init() throws IOException {
RestoreSnapshotHelper.RestoreMetaChanges meta =
RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs,
this.rootDir, this.restoreDir, this.snapshotName);
List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
this.htd = meta.getTableDescriptor();
this.regions = new ArrayList<RegionInfo>(restoredRegions.size());

for (RegionInfo restoredRegion : restoredRegions) {
if (isValidRegion(restoredRegion)) {
this.regions.add(restoredRegion);
if (!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
RestoreSnapshotHelper.RestoreMetaChanges meta =
RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs, this.rootDir,
this.restoreDir, this.snapshotName);
List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
this.htd = meta.getTableDescriptor();
this.regions = new ArrayList<>(restoredRegions.size());
for (RegionInfo restoredRegion : restoredRegions) {
if (isValidRegion(restoredRegion)) {
this.regions.add(restoredRegion);
}
}
} else {
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotProtos.SnapshotDescription snapshotDesc =
SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
SnapshotManifest manifest =
SnapshotManifest.open(configuration, fs, snapshotDir, snapshotDesc);
List<SnapshotProtos.SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
this.regions = new ArrayList<>(regionManifests.size());
this.htd = manifest.getTableDescriptor();
for (SnapshotProtos.SnapshotRegionManifest srm : regionManifests) {
HRegionInfo hri = HRegionInfo.convert(srm.getRegionInfo());
if (isValidRegion(hri)) {
regions.add(hri);
}
}
}

this.regions.sort(RegionInfo.COMPARATOR);
LOGGER.info("Initialization complete with " + regions.size() + " valid regions");
}
@@ -164,7 +186,9 @@ public void close() throws SQLException {
closed = true; // ok to say closed even if the below code throws an exception
try {
scanIterator.close();
fs.delete(this.restoreDir, true);
if (!PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration)) {
fs.delete(this.restoreDir, true);
}
} catch (IOException e) {
throw ServerUtil.parseServerException(e);
} finally {
----------------------------------------------------------------------
@@ -208,13 +208,19 @@ protected QueryPlan getQueryPlan(final JobContext context, final Configuration

// setting the snapshot configuration
String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
String restoreDir = configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY);
boolean isSnapshotRestoreManagedExternally = PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration);
Configuration config = queryPlan.getContext().
getConnection().getQueryServices().getConfiguration();
if (snapshotName != null) {
PhoenixConfigurationUtil.setSnapshotNameKey(config, snapshotName);
PhoenixConfigurationUtil.setRestoreDirKey(config, restoreDir);
PhoenixConfigurationUtil.setMRSnapshotManagedExternally(config, isSnapshotRestoreManagedExternally);
} else {
// making sure we unset snapshot name as new job doesn't need it
config.unset(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
config.unset(PhoenixConfigurationUtil.RESTORE_DIR_KEY);
config.unset(PhoenixConfigurationUtil.MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE);
}

return queryPlan;
----------------------------------------------------------------------
@@ -191,6 +191,13 @@ public final class PhoenixConfigurationUtil {
// provide an absolute path to inject your multi input mapper logic
public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = "phoenix.mapreduce.multi.mapper.tracker.path";

// controls whether the MapReduce snapshot restore and cleanup operations used by the
// scanners are handled internally by Phoenix or externally by the caller
public static final String MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = "phoenix.mapreduce.external.snapshot.restore";

// by default the MR snapshot restore is handled internally by Phoenix
public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false;

/**
* Determines type of Phoenix Map Reduce job.
* 1. QUERY allows running arbitrary queries without aggregates
@@ -866,4 +873,17 @@ public static void setTenantId(Configuration configuration, String tenantId){
configuration.set(MAPREDUCE_TENANT_ID, tenantId);
}

public static void setMRSnapshotManagedExternally(Configuration configuration, Boolean isSnapshotRestoreManagedExternally) {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(isSnapshotRestoreManagedExternally);
configuration.setBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, isSnapshotRestoreManagedExternally);
}

public static boolean isMRSnapshotManagedExternally(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
boolean isSnapshotRestoreManagedExternally =
configuration.getBoolean(MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE, DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE);
return isSnapshotRestoreManagedExternally;
}

}
----------------------------------------------------------------------
@@ -29,7 +29,6 @@
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;

import java.io.IOException;
import java.util.UUID;

/**
* Utility class for setting Configuration parameters for the Map Reduce job
@@ -181,8 +180,7 @@ private static Configuration setSnapshotInput(Job job, Class<? extends DBWritabl
PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
PhoenixConfigurationUtil.setSnapshotNameKey(configuration, snapshotName);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);

PhoenixConfigurationUtil.setRestoreDirKey(configuration, new Path(restoreDir, UUID.randomUUID().toString()).toString());
PhoenixConfigurationUtil.setRestoreDirKey(configuration, restoreDir.toString());
PhoenixConfigurationUtil.setSchemaType(configuration, schemaType);
return configuration;
}