ExportJob.java
/*
* Copyright 2017 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.beam.sequencefiles;

import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import com.google.cloud.bigtable.beam.TemplateUtils;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;

/**
* Beam job that exports a Bigtable table to a set of SequenceFiles. Afterwards, the files can be
* imported into either another Bigtable table or an HBase table. You can limit the rows and
* columns exported using the options in {@link ExportOptions}. Note that the rows in the
* SequenceFiles are not sorted.
*
* <p>Furthermore, you can export a subset of the data using a combination of --bigtableStartRow,
* --bigtableStopRow and --bigtableFilter.
*
* <p>Execute the following command to run the job directly:
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.beam.sequencefiles.ExportJob \
* -Dexec.args="--runner=dataflow \
* --project=[PROJECT_ID] \
* --tempLocation=gs://[BUCKET]/[TEMP_PATH] \
* --bigtableInstanceId=[INSTANCE] \
* --bigtableTableId=[TABLE] \
* --destination=gs://[BUCKET]/[EXPORT_PATH] \
* --maxNumWorkers=[nodes * 10]"
* }
* </pre>
*
* <p>Execute the following command to create the Dataflow template:
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.beam.sequencefiles.ExportJob \
* -Dexec.args="--runner=DataflowRunner \
* --project=[PROJECT_ID] \
* --stagingLocation=gs://[STAGING_PATH] \
* --templateLocation=gs://[TEMPLATE_PATH] \
* --wait=false"
* </pre>
*
* <p>There are a few ways to run the pipeline using the template. See the Dataflow documentation
* for details: https://cloud.google.com/dataflow/docs/templates/executing-templates. Optionally,
* you can upload a metadata file that describes the runtime parameters, which can be used for
* parameter validation and more. A sample metadata file can be found at
* "src/main/resources/ExportJob_metadata".
*
* <p>An example using the gcloud command line:
*
* <pre>
* gcloud beta dataflow jobs run [JOB_NAME] \
* --gcs-location gs://[TEMPLATE_PATH] \
* --parameters bigtableProject=[PROJECT_ID],bigtableInstanceId=[INSTANCE],bigtableTableId=[TABLE],destinationPath=gs://[DESTINATION_PATH],filenamePrefix=[FILENAME_PREFIX]
* </pre>
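*
* <p>As a minimal sketch (not part of the shipped tooling), the pipeline can also be built and
* run programmatically from code in this package, assuming the required options are supplied the
* same way the command line would supply them:
*
* <pre>{@code
* ExportOptions opts =
*     PipelineOptionsFactory.fromArgs(
*             "--project=[PROJECT_ID]",
*             "--bigtableInstanceId=[INSTANCE]",
*             "--bigtableTableId=[TABLE]",
*             "--destinationPath=gs://[BUCKET]/[EXPORT_PATH]")
*         .withValidation()
*         .as(ExportOptions.class);
* Pipeline pipeline = ExportJob.buildPipeline(opts);
* pipeline.run().waitUntilFinish();
* }</pre>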
*/
@InternalExtensionOnly
public class ExportJob {
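
  /**
   * Options for the export pipeline. The {@link ValueProvider} options can be supplied at runtime
   * when the job is launched from a Dataflow template.
   */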
  public interface ExportOptions extends GcpOptions {
    @Description("The Bigtable app profile id to use for the export.")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);

    @Description("The project that contains the table to export. Defaults to --project.")
    @Default.InstanceFactory(Utils.DefaultBigtableProjectFactory.class)
    ValueProvider<String> getBigtableProject();

    @SuppressWarnings("unused")
    void setBigtableProject(ValueProvider<String> projectId);

    @Description("The Bigtable instance id that contains the table to export.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @Description("The Bigtable table id to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @Description("The row to start the export from. Defaults to the first row.")
    @Default.String("")
    ValueProvider<String> getBigtableStartRow();

    @SuppressWarnings("unused")
    void setBigtableStartRow(ValueProvider<String> startRow);

    @Description("The row to stop the export at. Defaults to the last row.")
    @Default.String("")
    ValueProvider<String> getBigtableStopRow();

    @SuppressWarnings("unused")
    void setBigtableStopRow(ValueProvider<String> stopRow);

    @Description("The maximum number of cell versions to export.")
    @Default.Integer(Integer.MAX_VALUE)
    ValueProvider<Integer> getBigtableMaxVersions();

    @SuppressWarnings("unused")
    void setBigtableMaxVersions(ValueProvider<Integer> maxVersions);

    @Description("Filter string. See: http://hbase.apache.org/book.html#thrift.")
    @Default.String("")
    ValueProvider<String> getBigtableFilter();

    @SuppressWarnings("unused")
    void setBigtableFilter(ValueProvider<String> filter);

    @Description("The destination directory for the exported files.")
    ValueProvider<String> getDestinationPath();

    @SuppressWarnings("unused")
    void setDestinationPath(ValueProvider<String> destinationPath);

    @Description("The filename prefix for each shard written to destinationPath.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @Description("Whether to wait for the pipeline to finish.")
    @Default.Boolean(true)
    boolean getWait();

    @SuppressWarnings("unused")
    void setWait(boolean wait);

    @Description("Whether to retry reads on an idle timeout.")
    @Default.Boolean(true)
    boolean getRetryIdleTimeout();

    @SuppressWarnings("unused")
    void setRetryIdleTimeout(boolean retryIdleTimeout);
  }

public static void main(String[] args) {
PipelineOptionsFactory.register(ExportOptions.class);
ExportOptions opts =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ExportOptions.class);
Pipeline pipeline = buildPipeline(opts);
PipelineResult result = pipeline.run();
if (opts.getWait()) {
Utils.waitForPipelineToFinish(result);
}
  }

  static Pipeline buildPipeline(ExportOptions opts) {
// Use the base target directory to stage bundles
ValueProvider<ResourceId> destinationPath =
NestedValueProvider.of(opts.getDestinationPath(), new StringToDirResourceId());
// Concat the destination path & prefix for the final path
FilePathPrefix filePathPrefix = new FilePathPrefix(destinationPath, opts.getFilenamePrefix());
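
    // The sink writes SequenceFiles of row key (ImmutableBytesWritable) to row (Result) pairs,
    // using the same Hadoop serializations as HBase's Export tool, so the output can be imported
    // into Bigtable or HBase.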
SequenceFileSink<ImmutableBytesWritable, Result> sink =
new SequenceFileSink<>(
destinationPath,
DefaultFilenamePolicy.fromStandardParameters(filePathPrefix, null, "", false),
ImmutableBytesWritable.class,
WritableSerialization.class,
Result.class,
ResultSerialization.class);
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
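
    // Translate the job options (table, row range, filter, max versions) into a Bigtable scan
    // configuration for the source.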
CloudBigtableScanConfiguration config = TemplateUtils.buildExportConfig(opts);
pipeline
.apply("Read table", Read.from(CloudBigtableIO.read(config)))
.apply("Format results", MapElements.via(new ResultToKV()))
.apply("Write", WriteFiles.to(sink));
return pipeline;
}
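
  /**
   * Converts an HBase {@link Result} into a {@link KV} keyed by its row key, the record shape
   * expected by the SequenceFile sink.
   */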
static class ResultToKV extends SimpleFunction<Result, KV<ImmutableBytesWritable, Result>> {
@Override
public KV<ImmutableBytesWritable, Result> apply(Result input) {
return KV.of(new ImmutableBytesWritable(input.getRow()), input);
}
}
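
  /**
   * Converts a path string into a directory {@link ResourceId}; the {@code true} argument marks
   * the new resource as a directory.
   */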
static class StringToDirResourceId
implements SerializableFunction<String, ResourceId>, Serializable {
@Override
public ResourceId apply(String input) {
return FileSystems.matchNewResource(input, true);
}
}
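
  /**
   * A {@link ValueProvider} that resolves the filename prefix against the destination directory.
   * Deferring the resolution to {@link #get()} keeps the job compatible with Dataflow templates,
   * where both values may only be available at runtime.
   */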
  static class FilePathPrefix implements ValueProvider<ResourceId>, Serializable {
    private final ValueProvider<ResourceId> destinationPath;
    private final ValueProvider<String> filenamePrefix;

    FilePathPrefix(
        ValueProvider<ResourceId> destinationPath, ValueProvider<String> filenamePrefix) {
      this.destinationPath = destinationPath;
      this.filenamePrefix = filenamePrefix;
    }

    @Override
    public ResourceId get() {
      return destinationPath
          .get()
          .resolve(filenamePrefix.get(), StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public boolean isAccessible() {
      return destinationPath.isAccessible() && filenamePrefix.isAccessible();
    }
  }
}