Skip to content

Commit

Permalink
Merge pull request #1484 from stankiewicz:mongo_schema
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 649148698
  • Loading branch information
cloud-teleport committed Jul 3, 2024
2 parents 0fef11c + 4002cac commit 1eda8b8
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
1 change: 1 addition & 0 deletions v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **useStorageWriteApiAtLeastOnce** : When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.
* **javascriptDocumentTransformGcsPath** : The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://your-bucket/your-transforms/*.js).
* **javascriptDocumentTransformFunctionName** : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). (Example: transform).
* **bigQuerySchemaPath** : The Cloud Storage path for the BigQuery JSON schema. (Example: gs://your-bucket/your-schema.json).



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ public interface BigQueryWriteOptions extends PipelineOptions, DataflowPipelineO
String getOutputTableSpec();

void setOutputTableSpec(String outputTableSpec);

@TemplateParameter.GcsReadFile(
order = 2,
optional = true,
description = "Cloud Storage path to BigQuery JSON schema",
helpText = "The Cloud Storage path for the BigQuery JSON schema.",
example = "gs://your-bucket/your-schema.json")
String getBigQuerySchemaPath();

void setBigQuerySchemaPath(String path);
}

/** UDF options. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package com.google.cloud.teleport.v2.mongodb.templates;

import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString;
import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt;

import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.Template;
Expand All @@ -33,6 +35,7 @@
import java.io.IOException;
import javax.script.ScriptException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.FindQuery;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
Expand Down Expand Up @@ -109,7 +112,13 @@ public static boolean run(Options options)
// Get MongoDbUri plain text or base64 encrypted with a specific KMS encryption key
String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get();

if (options.getJavascriptDocumentTransformFunctionName() != null
if (options.getBigQuerySchemaPath() != null) {
// initialize FileSystem to read from GCS
FileSystems.setDefaultPipelineOptions(options);
String jsonSchema = getGcsFileAsString(options.getBigQuerySchemaPath());
GsonFactory gf = new GsonFactory();
bigquerySchema = gf.fromString(jsonSchema, TableSchema.class);
} else if (options.getJavascriptDocumentTransformFunctionName() != null
&& options.getJavascriptDocumentTransformGcsPath() != null) {
bigquerySchema =
MongoDbUtils.getTableFieldSchemaForUDF(
Expand Down

0 comments on commit 1eda8b8

Please sign in to comment.