package com.google.cloud.hadoop.io.bigquery;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase;
import com.google.cloud.hadoop.util.ConfigurationUtil;
import com.google.cloud.hadoop.util.HadoopToStringUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.Map;
/**
 * Abstract base class for BigQuery input formats. This class is expected to take care of
 * performing BigQuery exports to temporary tables and to GCS, and of cleaning up any files or
 * tables that either of those processes creates.
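 *
 * <p>A minimal driver sketch, where {@code job} is an existing
 * {@code org.apache.hadoop.mapreduce.Job}, {@code GsonBigQueryInputFormat} is the concrete
 * subclass referenced below, and the project, dataset, table, and bucket names are hypothetical:
 * <pre>{@code
 * Configuration conf = job.getConfiguration();
 * // Project that runs (and is billed for) the export jobs.
 * conf.set(BigQueryConfiguration.PROJECT_ID_KEY, "my-project");
 * // Table to read, plus a bucket to hold the temporary export files.
 * AbstractBigQueryInputFormat.setInputTable(conf, "my-project", "my_dataset", "my_table");
 * conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, "my-bucket");
 * job.setInputFormatClass(GsonBigQueryInputFormat.class);
 * // ... run the job, then delete the exported files and any intermediate table:
 * GsonBigQueryInputFormat.cleanupJob(conf, job.getJobID());
 * }</pre>
 *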
* @param <K> Key type
* @param <V> Value type
*/
public abstract class AbstractBigQueryInputFormat<K, V>
extends InputFormat<K, V> implements DelegateRecordReaderFactory<K, V> {
protected static final Logger LOG =
LoggerFactory.getLogger(AbstractBigQueryInputFormat.class);
/**
* Configuration key for InputFormat class name.
*/
public static final String INPUT_FORMAT_CLASS_KEY = "mapreduce.inputformat.class";
/**
 * Configures the BigQuery input table for a job.
*/
public static void setInputTable(
Configuration configuration, String projectId, String datasetId, String tableId)
throws IOException {
BigQueryConfiguration.configureBigQueryInput(configuration, projectId, datasetId, tableId);
}
/**
 * Configures the BigQuery input table for a job from a {@link TableReference}.
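 *
 * <p>For example (the IDs are hypothetical):
 * <pre>{@code
 * TableReference table = new TableReference()
 *     .setProjectId("my-project")
 *     .setDatasetId("my_dataset")
 *     .setTableId("my_table");
 * AbstractBigQueryInputFormat.setInputTable(conf, table);
 * }</pre>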
*/
public static void setInputTable(Configuration configuration, TableReference tableReference)
throws IOException {
setInputTable(
configuration,
tableReference.getProjectId(),
tableReference.getDatasetId(),
tableReference.getTableId());
}
/**
 * Configures the GCS directory to which BigQuery data will be exported.
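 *
 * <p>For example (the bucket name is hypothetical):
 * <pre>{@code
 * AbstractBigQueryInputFormat.setTemporaryCloudStorageDirectory(
 *     conf, "gs://my-bucket/bigquery-exports");
 * }</pre>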
*/
public static void setTemporaryCloudStorageDirectory(Configuration configuration, String path) {
configuration.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, path);
}
/**
 * Enables or disables sharded export of BigQuery data.
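 *
 * <p>For example, to disable sharded export:
 * <pre>{@code
 * AbstractBigQueryInputFormat.setEnableShardedExport(conf, false);
 * }</pre>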
*/
public static void setEnableShardedExport(Configuration configuration, boolean enabled) {
configuration.setBoolean(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, enabled);
}
protected static boolean isShardedExportEnabled(Configuration configuration) {
return configuration.getBoolean(
BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY,
BigQueryConfiguration.ENABLE_SHARDED_EXPORT_DEFAULT);
}
/**
* Get the ExportFileFormat that this input format supports.
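 *
 * <p>A concrete subclass returns the format of the files its export produces; for instance, a
 * hypothetical JSON-based subclass (assuming the enum defines a {@code LINE_DELIMITED_JSON}
 * constant) might implement:
 * <pre>{@code
 * public ExportFileFormat getExportFileFormat() {
 *   return ExportFileFormat.LINE_DELIMITED_JSON;
 * }
 * }</pre>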
*/
public abstract ExportFileFormat getExportFileFormat();
@Override
public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
LOG.debug("getSplits({})", HadoopToStringUtil.toString(context));
Preconditions.checkNotNull(context.getJobID(), "getSplits requires a jobID");
final Configuration configuration = context.getConfiguration();
final JobID jobId = context.getJobID();
BigQueryHelper bigQueryHelper = null;
try {
bigQueryHelper = getBigQueryHelper(configuration);
} catch (GeneralSecurityException gse) {
LOG.error("Failed to create BigQuery client", gse);
throw new IOException("Failed to create BigQuery client", gse);
}
String exportPath = extractExportPathRoot(configuration, jobId);
configuration.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, exportPath);
Export export = constructExport(
configuration,
getExportFileFormat(),
exportPath,
bigQueryHelper);
export.prepare();
    // Invoke the export, then wait until its output is usable as MapReduce input (for sharded
    // exports this may return before the export itself has fully completed).
try {
export.beginExport();
export.waitForUsableMapReduceInput();
} catch (IOException | InterruptedException ie) {
LOG.error("Error while exporting", ie);
throw new IOException("Error while exporting", ie);
}
List<InputSplit> splits = export.getSplits(context);
if (LOG.isDebugEnabled()) {
try {
// Stringifying a really big list of splits can be expensive, so we guard with
// isDebugEnabled().
LOG.debug("getSplits -> {}", HadoopToStringUtil.toString(splits));
} catch (InterruptedException e) {
LOG.debug("getSplits -> {}", "*exception on toString()*");
}
}
return splits;
}
@Override
public RecordReader<K, V> createRecordReader(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
return createRecordReader(inputSplit, taskAttemptContext.getConfiguration());
}
public RecordReader<K, V> createRecordReader(
InputSplit inputSplit, Configuration configuration)
throws IOException, InterruptedException {
if (isShardedExportEnabled(configuration)) {
Preconditions.checkArgument(
inputSplit instanceof ShardedInputSplit,
"Split should be instance of ShardedInputSplit.");
LOG.debug("createRecordReader -> DynamicFileListRecordReader");
return new DynamicFileListRecordReader<>(this);
} else {
Preconditions.checkArgument(
inputSplit instanceof UnshardedInputSplit,
"Split should be instance of UnshardedInputSplit.");
LOG.debug("createRecordReader -> createDelegateRecordReader()");
return createDelegateRecordReader(inputSplit, configuration);
}
}
private static Export constructExport(
Configuration configuration, ExportFileFormat format, String exportPath,
BigQueryHelper bigQueryHelper)
throws IOException {
LOG.debug("contructExport() with export path {}", exportPath);
// Extract relevant configuration settings.
Map<String, String> mandatoryConfig = ConfigurationUtil.getMandatoryConfig(
configuration, BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
String jobProjectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID_KEY);
String inputProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID_KEY);
String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID_KEY);
String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID_KEY);
TableReference exportTableReference = new TableReference()
.setDatasetId(datasetId)
.setProjectId(inputProjectId)
.setTableId(tableName);
boolean enableShardedExport = isShardedExportEnabled(configuration);
boolean deleteTableOnExit = configuration.getBoolean(
BigQueryConfiguration.DELETE_INTERMEDIATE_TABLE_KEY,
BigQueryConfiguration.DELETE_INTERMEDIATE_TABLE_DEFAULT);
String query = configuration.get(BigQueryConfiguration.INPUT_QUERY_KEY);
    LOG.debug(
        "isShardedExportEnabled = {}, deleteTableOnExit = {}, tableReference = {}, query = {}",
        enableShardedExport,
        deleteTableOnExit,
        BigQueryStrings.toString(exportTableReference),
        query);
Export export;
if (enableShardedExport) {
export = new ShardedExportToCloudStorage(
configuration,
exportPath,
format,
bigQueryHelper,
jobProjectId,
exportTableReference);
} else {
export = new UnshardedExportToCloudStorage(
configuration,
exportPath,
format,
bigQueryHelper,
jobProjectId,
exportTableReference);
}
if (!Strings.isNullOrEmpty(query)) {
      // A query was specified. In this case we want to add prepare and cleanup steps
      // via the QueryBasedExport.
export = new QueryBasedExport(
export, query, jobProjectId, bigQueryHelper, exportTableReference, deleteTableOnExit);
}
return export;
}
/**
* Either resolves an export path based on GCS_BUCKET_KEY and JobID, or defers to a pre-provided
* BigQueryConfiguration.TEMP_GCS_PATH_KEY.
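 *
 * <p>For example, with no explicit temporary path and {@code GCS_BUCKET_KEY} set to a
 * hypothetical {@code my-bucket}, a job with ID {@code job_201706011200_0001} resolves to
 * {@code gs://my-bucket/hadoop/tmp/bigquery/job_201706011200_0001}.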
*/
protected static String extractExportPathRoot(Configuration configuration, JobID jobId)
throws IOException {
String exportPathRoot = configuration.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY);
if (Strings.isNullOrEmpty(exportPathRoot)) {
LOG.info("Fetching mandatory field '{}' since '{}' isn't set explicitly",
BigQueryConfiguration.GCS_BUCKET_KEY, BigQueryConfiguration.TEMP_GCS_PATH_KEY);
String gcsBucket = ConfigurationUtil.getMandatoryConfig(
configuration, BigQueryConfiguration.GCS_BUCKET_KEY);
exportPathRoot = String.format(
"gs://%s/hadoop/tmp/bigquery/%s", gcsBucket, jobId);
LOG.info("Resolved GCS export path: '{}'", exportPathRoot);
} else {
LOG.info("Using user-provided custom export path: '{}'", exportPathRoot);
}
Path exportPath = new Path(exportPathRoot);
FileSystem fs = exportPath.getFileSystem(configuration);
Preconditions.checkState(
fs instanceof GoogleHadoopFileSystemBase,
"Export FS must derive from GoogleHadoopFileSystemBase.");
return exportPathRoot;
}
/**
* Cleans up relevant temporary resources associated with a job which used the
* GsonBigQueryInputFormat; this should be called explicitly after the completion of the entire
 * job. Possibly cleans up the intermediate export table, if one was created because a BigQuery
 * "query" was specified for the input. Also cleans up the GCS directory where BigQuery exported
 * its files for reading.
*
* @param context The JobContext which contains the full configuration plus JobID which matches
 *     the JobContext seen in the corresponding BigQueryInputFormat.getSplits() setup.
*
* @deprecated Use {@link #cleanupJob(Configuration, JobID)}
*/
@Deprecated
public static void cleanupJob(JobContext context)
throws IOException {
// Since cleanupJob may be called from a place where the actual runtime Configuration isn't
// available, we must re-walk the same logic for generating the export path based on settings
// and JobID in the context.
cleanupJob(context.getConfiguration(), context.getJobID());
}
/**
* Cleans up relevant temporary resources associated with a job which used the
* GsonBigQueryInputFormat; this should be called explicitly after the completion of the entire
 * job. Possibly cleans up the intermediate export table, if one was created because a BigQuery
 * "query" was specified for the input. Also cleans up the GCS directory where BigQuery exported
 * its files for reading.
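 *
 * <p>For example, once {@code job.waitForCompletion(true)} has returned:
 * <pre>{@code
 * GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
 * }</pre>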
*/
public static void cleanupJob(Configuration configuration, JobID jobId) throws IOException {
String exportPathRoot = extractExportPathRoot(configuration, jobId);
configuration.set(BigQueryConfiguration.TEMP_GCS_PATH_KEY, exportPathRoot);
Bigquery bigquery = null;
try {
bigquery = new BigQueryFactory().getBigQuery(configuration);
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create Bigquery client", gse);
}
cleanupJob(new BigQueryHelper(bigquery), configuration);
}
/**
 * Similar to {@link #cleanupJob(JobContext)}, but allows specifying the BigQueryHelper instance
 * to use.
*
* @param bigQueryHelper The Bigquery API-client helper instance to use.
* @param config The job Configuration object which contains settings such as whether sharded
* export was enabled, which GCS directory the export was performed in, etc.
*/
public static void cleanupJob(BigQueryHelper bigQueryHelper, Configuration config)
throws IOException {
LOG.debug("cleanupJob(Bigquery, Configuration)");
String gcsPath = ConfigurationUtil.getMandatoryConfig(
config, BigQueryConfiguration.TEMP_GCS_PATH_KEY);
Export export = constructExport(
config, getExportFileFormat(config), gcsPath, bigQueryHelper);
try {
export.cleanupExport();
} catch (IOException ioe) {
      // The error is swallowed: the job has already completed successfully, and the only failure
      // is in deleting temporary data. This matches the FileOutputCommitter pattern.
LOG.warn(
"Could not delete intermediate data from BigQuery export", ioe);
}
}
@SuppressWarnings("unchecked")
protected static ExportFileFormat getExportFileFormat(Configuration configuration) {
Class<? extends AbstractBigQueryInputFormat<?, ?>> clazz =
(Class<? extends AbstractBigQueryInputFormat<?, ?>>) configuration.getClass(
INPUT_FORMAT_CLASS_KEY, AbstractBigQueryInputFormat.class);
Preconditions.checkState(
AbstractBigQueryInputFormat.class.isAssignableFrom(clazz),
"Expected input format to derive from AbstractBigQueryInputFormat");
return getExportFileFormat(clazz);
}
protected static ExportFileFormat getExportFileFormat(
Class<? extends AbstractBigQueryInputFormat<?, ?>> clazz) {
try {
AbstractBigQueryInputFormat<?, ?> format = clazz.newInstance();
return format.getExportFileFormat();
} catch (InstantiationException | IllegalAccessException e) {
throw Throwables.propagate(e);
}
}
/**
* Helper method to override for testing.
*
* @return Bigquery.
 * @throws IOException on IO error.
* @throws GeneralSecurityException on security exception.
*/
protected Bigquery getBigQuery(Configuration config)
throws GeneralSecurityException, IOException {
BigQueryFactory factory = new BigQueryFactory();
return factory.getBigQuery(config);
}
/**
* Helper method to override for testing.
*/
protected BigQueryHelper getBigQueryHelper(Configuration config)
throws GeneralSecurityException, IOException {
BigQueryFactory factory = new BigQueryFactory();
return factory.getBigQueryHelper(config);
}
}