Skip to content

Commit fcdd130

Browse files
authored
feat(import): support multiple hbase snapshot imports, add sharding, stabilization, and resilience updates (#4600)
- Supports importing multiple snapshot copies concurrently from GCS or local filesystem.
- Adds parallel sharding utilizing Splittable DoFns and custom RegionConfig mapping.
- Improves stabilization by resolving various NullPointerExceptions and Mockito inline limitations on JDK 21+.
- Introduces comprehensive unit testing for all steps of the snapshot import pipeline.
1 parent 7c14d0e commit fcdd130

47 files changed

Lines changed: 5990 additions & 43 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bigtable-dataflow-parent/bigtable-beam-import/EnableAutoValue.txt

Whitespace-only changes.

bigtable-dataflow-parent/bigtable-beam-import/pom.xml

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,33 @@ limitations under the License.
3939
<type>pom</type>
4040
<scope>import</scope>
4141
</dependency>
42+
43+
<!-- Version alignment -->
44+
<!-- Mark all annotations as provided. They don't affect the runtime of the pipeline so
45+
there is no need to try to version align them -->
46+
<dependency>
47+
<groupId>org.checkerframework</groupId>
48+
<artifactId>checker-qual</artifactId>
49+
<version>3.31.0</version>
50+
<scope>provided</scope>
51+
</dependency>
52+
<dependency>
53+
<groupId>com.google.errorprone</groupId>
54+
<artifactId>error_prone_annotations</artifactId>
55+
<version>2.18.0</version>
56+
<scope>provided</scope>
57+
</dependency>
58+
<dependency>
59+
<groupId>org.codehaus.mojo</groupId>
60+
<artifactId>animal-sniffer-annotations</artifactId>
61+
<version>1.22</version>
62+
<scope>provided</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>jakarta.annotation</groupId>
66+
<artifactId>jakarta.annotation-api</artifactId>
67+
<scope>provided</scope>
68+
</dependency>
4269
</dependencies>
4370
</dependencyManagement>
4471

@@ -85,6 +112,7 @@ limitations under the License.
85112
<artifactId>bigtable-hbase-beam</artifactId>
86113
<version>2.18.4-SNAPSHOT</version> <!-- {x-version-update:bigtable-client-parent:current} -->
87114
<exclusions>
115+
<!-- Exclude hbase-shaded-client to prevent reintroducing the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError) -->
88116
<exclusion>
89117
<groupId>org.apache.hbase</groupId>
90118
<artifactId>hbase-shaded-client</artifactId>
@@ -104,6 +132,7 @@ limitations under the License.
104132
</dependency>
105133

106134

135+
<!-- Use hbase-shaded-mapreduce (instead of hbase-shaded-client) to defeat the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError on JDK 21+) -->
107136
<dependency>
108137
<groupId>org.apache.hbase</groupId>
109138
<artifactId>hbase-shaded-mapreduce</artifactId>
@@ -118,6 +147,10 @@ limitations under the License.
118147
<groupId>org.apache.hadoop</groupId>
119148
<artifactId>hadoop-hdfs-client</artifactId>
120149
</exclusion>
150+
<exclusion>
151+
<groupId>org.apache.hbase</groupId>
152+
<artifactId>hbase-shaded-client</artifactId>
153+
</exclusion>
121154
</exclusions>
122155
</dependency>
123156
<dependency>
@@ -134,7 +167,11 @@ limitations under the License.
134167
<artifactId>beam-runners-direct-java</artifactId>
135168
<scope>test</scope>
136169
</dependency>
137-
170+
<dependency>
171+
<groupId>com.google.cloud</groupId>
172+
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
173+
<scope>test</scope>
174+
</dependency>
138175
<dependency>
139176
<groupId>org.apache.hbase</groupId>
140177
<artifactId>hbase-shaded-testing-util</artifactId>
@@ -148,11 +185,6 @@ limitations under the License.
148185
</exclusions>
149186
</dependency>
150187

151-
<dependency>
152-
<groupId>com.google.cloud</groupId>
153-
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
154-
<scope>test</scope>
155-
</dependency>
156188
<dependency>
157189
<groupId>com.google.cloud.bigtable</groupId>
158190
<artifactId>bigtable-internal-test-helper</artifactId>
@@ -186,7 +218,7 @@ limitations under the License.
186218
</dependency>
187219
<dependency>
188220
<groupId>org.mockito</groupId>
189-
<artifactId>mockito-core</artifactId>
221+
<artifactId>mockito-inline</artifactId>
190222
<version>${mockito.version}</version>
191223
<scope>test</scope>
192224
</dependency>
@@ -221,6 +253,14 @@ limitations under the License.
221253

222254

223255
<plugins>
256+
<plugin>
257+
<groupId>org.apache.maven.plugins</groupId>
258+
<artifactId>maven-compiler-plugin</artifactId>
259+
<configuration>
260+
<fork>true</fork>
261+
</configuration>
262+
</plugin>
263+
224264
<plugin>
225265
<artifactId>maven-jar-plugin</artifactId>
226266
<configuration>
@@ -287,6 +327,8 @@ limitations under the License.
287327
<filter>
288328
<artifact>*:*</artifact>
289329
<excludes>
330+
<!-- Exclude InetAddressResolverProvider to prevent the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError on JDK 21+) -->
331+
<exclude>META-INF/services/java.net.spi.InetAddressResolverProvider</exclude>
290332
<exclude>META-INF/*.SF</exclude>
291333
<exclude>META-INF/*.DSA</exclude>
292334
<exclude>META-INF/*.RSA</exclude>

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/Main.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
1919
import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
20+
import com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool;
2021
import com.google.cloud.bigtable.beam.hbasesnapshots.ImportJobFromHbaseSnapshot;
2122
import com.google.cloud.bigtable.beam.sequencefiles.CreateTableHelper;
2223
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob;
@@ -51,6 +52,9 @@ public static void main(String[] args) throws Exception {
5152
case "importsnapshot":
5253
ImportJobFromHbaseSnapshot.main(subArgs);
5354
break;
55+
case "restoresnapshot":
56+
HBaseSnapshotRestoreTool.main(subArgs);
57+
break;
5458
case "create-table":
5559
CreateTableHelper.main(subArgs);
5660
break;

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob.ImportOptions;
2121
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
2222
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
23+
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
24+
import java.util.concurrent.TimeUnit;
2325
import org.apache.beam.sdk.options.ValueProvider;
2426

2527
/**
@@ -44,7 +46,16 @@ public static CloudBigtableTableConfiguration buildImportConfig(
4446
.withProjectId(opts.getBigtableProject())
4547
.withInstanceId(opts.getBigtableInstanceId())
4648
.withTableId(opts.getBigtableTableId())
47-
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent);
49+
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent)
50+
.withConfiguration(
51+
BigtableOptionsFactory.MAX_INFLIGHT_RPCS_KEY,
52+
ValueProvider.NestedValueProvider.of(opts.getMaxInflightRpcs(), String::valueOf))
53+
.withConfiguration(
54+
BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS,
55+
ValueProvider.NestedValueProvider.of(
56+
opts.getBulkMutationCloseTimeoutMinutes(),
57+
(Integer minutes) ->
58+
String.valueOf(TimeUnit.MINUTES.toMillis(minutes == null ? 30 : minutes))));
4859
if (opts.getBigtableAppProfileId() != null) {
4960
builder.withAppProfileId(opts.getBigtableAppProfileId());
5061
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.beam.hbasesnapshots;
17+
18+
import com.google.api.core.InternalExtensionOnly;
19+
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
20+
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
21+
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.base.Preconditions;
23+
import com.google.gson.Gson;
24+
import com.google.gson.GsonBuilder;
25+
import java.io.IOException;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
29+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
30+
import org.apache.commons.logging.Log;
31+
import org.apache.commons.logging.LogFactory;
32+
import org.apache.hadoop.conf.Configuration;
33+
import org.apache.hadoop.fs.FileSystem;
34+
import org.apache.hadoop.fs.Path;
35+
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
36+
37+
/**
38+
* Tool to restore HBase snapshots in GCS for scanning. This tool runs locally (without Dataflow)
39+
* and copies snapshot files to a restore path, resolving HLinks and References so that they can be
40+
* read by a scanner.
41+
*
42+
* <p>Execute the following command to run the tool directly using system properties:
43+
*
44+
* <pre>
45+
* {@code mvn compile exec:java \
46+
* -Dexec.mainClass=com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool \
47+
* -Dproject=[PROJECT_ID] \
48+
* -DhbaseSnapshotSourceDir=gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/data \
49+
* -Dsnapshots=[SNAPSHOT_NAMES] \
50+
* -DrestorePath=gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/restore
51+
* }
52+
* </pre>
53+
*
54+
* <p>Alternatively, you can provide a path to a JSON configuration file:
55+
*
56+
* <pre>
57+
* {@code mvn compile exec:java \
58+
* -Dexec.mainClass=com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool \
59+
* -Dproject=[PROJECT_ID] \
60+
* -DimportConfigFilePath=[PATH_TO_JSON_CONFIG]
61+
* }
62+
* </pre>
63+
*
64+
* <p>The JSON configuration file should have the following format:
65+
*
66+
* <pre>
67+
* {
68+
* "sourcepath": "gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/data",
69+
* "restorepath": "gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/restore",
70+
* "snapshots": {
71+
* "snapshot1": "table1",
72+
* "snapshot2": "table2"
73+
* }
74+
* }
75+
* </pre>
76+
*/
77+
@InternalExtensionOnly
78+
public class HBaseSnapshotRestoreTool {
79+
private static final Log LOG = LogFactory.getLog(HBaseSnapshotRestoreTool.class);
80+
81+
private static final String PROJECT_PROPERTY = "project";
82+
private static final String IMPORT_CONFIG_FILE_PATH_PROPERTY = "importConfigFilePath";
83+
private static final String HBASE_SNAPSHOT_SOURCE_DIR_PROPERTY = "hbaseSnapshotSourceDir";
84+
private static final String SNAPSHOTS_PROPERTY = "snapshots";
85+
private static final String RESTORE_PATH_PROPERTY = "restorePath";
86+
87+
public static void main(String[] args) throws Exception {
88+
GcsOptions options = PipelineOptionsFactory.create().as(GcsOptions.class);
89+
String project = System.getProperty(PROJECT_PROPERTY);
90+
if (project != null) {
91+
options.setProject(project);
92+
}
93+
94+
ImportConfig importConfig =
95+
System.getProperty(IMPORT_CONFIG_FILE_PATH_PROPERTY) != null
96+
? buildImportConfigFromConfigFile(System.getProperty(IMPORT_CONFIG_FILE_PATH_PROPERTY))
97+
: buildImportConfigFromArgs(options);
98+
99+
LOG.info(
100+
String.format(
101+
"SourcePath:%s, RestorePath:%s",
102+
importConfig.getSourcepath(), importConfig.getRestorepath()));
103+
104+
Map<String, String> configurations =
105+
SnapshotUtils.getConfiguration(
106+
null, // invoke from a DirectRunner without using dataflow
107+
options.getProject(),
108+
importConfig.getSourcepath(),
109+
importConfig.getHbaseConfiguration());
110+
111+
List<SnapshotConfig> snapshotConfigs =
112+
SnapshotUtils.buildSnapshotConfigs(
113+
importConfig.getSnapshots(),
114+
configurations,
115+
options.getProject(),
116+
importConfig.getSourcepath(),
117+
importConfig.getRestorepath());
118+
119+
for (SnapshotConfig config : snapshotConfigs) {
120+
restoreSnapshot(config);
121+
}
122+
}
123+
124+
@VisibleForTesting
125+
static ImportConfig buildImportConfigFromArgs(GcsOptions gcsOptions) throws IOException {
126+
String sourceDir = System.getProperty(HBASE_SNAPSHOT_SOURCE_DIR_PROPERTY);
127+
String snapshotsProperty = System.getProperty(SNAPSHOTS_PROPERTY);
128+
Map<String, String> snapshots = null;
129+
if (snapshotsProperty != null) {
130+
snapshots =
131+
(sourceDir != null && SnapshotUtils.isRegex(snapshotsProperty))
132+
? SnapshotUtils.getSnapshotsFromSnapshotPath(
133+
sourceDir, gcsOptions.getGcsUtil(), snapshotsProperty)
134+
: SnapshotUtils.getSnapshotsFromString(snapshotsProperty);
135+
}
136+
137+
ImportConfig importConfig = new ImportConfig();
138+
importConfig.setSourcepath(sourceDir);
139+
if (snapshots != null) {
140+
importConfig.setSnapshotsFromMap(snapshots);
141+
}
142+
importConfig.validate();
143+
SnapshotUtils.setRestorePath(System.getProperty(RESTORE_PATH_PROPERTY), importConfig);
144+
145+
return importConfig;
146+
}
147+
148+
@VisibleForTesting
149+
static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
150+
Gson gson = new GsonBuilder().create();
151+
ImportConfig importConfig =
152+
gson.fromJson(SnapshotUtils.readFileContents(configFilePath), ImportConfig.class);
153+
Preconditions.checkNotNull(importConfig, "ImportConfig parsed from file cannot be null.");
154+
importConfig.validate();
155+
SnapshotUtils.setRestorePath(importConfig.getRestorepath(), importConfig);
156+
return importConfig;
157+
}
158+
159+
@VisibleForTesting
160+
/**
161+
* Creates a copy of Snasphsot from the source path into restore path.
162+
*
163+
* @param snapshotConfig - Snapshot Configuration
164+
* @throws IOException
165+
*/
166+
static void restoreSnapshot(SnapshotConfig snapshotConfig) throws IOException {
167+
Path sourcePath = snapshotConfig.getSourcePath();
168+
Path restorePath = snapshotConfig.getRestorePath();
169+
Configuration configuration = snapshotConfig.getConfiguration();
170+
LOG.info(
171+
String.format("RestoreSnapshot - sourcePath:%s restorePath: %s", sourcePath, restorePath));
172+
FileSystem fileSystem = sourcePath.getFileSystem(configuration);
173+
if (fileSystem.exists(restorePath)) {
174+
LOG.info(
175+
String.format(
176+
"Restore path %s already exists, deleting it for idempotency", restorePath));
177+
fileSystem.delete(restorePath, true);
178+
}
179+
RestoreSnapshotHelper.copySnapshotForScanner(
180+
configuration, fileSystem, sourcePath, restorePath, snapshotConfig.getSnapshotName());
181+
}
182+
}

0 commit comments

Comments
 (0)