
Commit 7ed341e

docs(samples): Add Dataflow BigQueryIO write snippets (#8414)

* BigQuery write samples
* Add license headers
* Fix lint errors

Co-authored-by: Veronica Wasson <VeronicaWasson@users.noreply.github.com>

Parent: f55912e

File tree: 8 files changed (+426 −14 lines)


dataflow/snippets/src/main/java/com/example/dataflow/BigQueryReadAvro.java

Lines changed: 3 additions & 3 deletions

@@ -55,10 +55,10 @@ public static void main(String[] args) {
     // Parse the pipeline options passed into the application. Example:
     // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
-    PipelineOptionsFactory.register(BigQueryReadOptions.class);
-    BigQueryReadOptions options = PipelineOptionsFactory.fromArgs(args)
+    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
+    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
         .withValidation()
-        .as(BigQueryReadOptions.class);
+        .as(ExamplePipelineOptions.class);
 
     // Create a pipeline and apply transforms.
     Pipeline pipeline = Pipeline.create(options);
dataflow/snippets/src/main/java/com/example/dataflow/BigQueryReadWithProjectionAndFiltering.java

Lines changed: 3 additions & 3 deletions

@@ -31,10 +31,10 @@ public static void main(String[] args) {
     // Parse the pipeline options passed into the application. Example:
     // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
-    PipelineOptionsFactory.register(BigQueryReadOptions.class);
-    BigQueryReadOptions options = PipelineOptionsFactory.fromArgs(args)
+    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
+    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
         .withValidation()
-        .as(BigQueryReadOptions.class);
+        .as(ExamplePipelineOptions.class);
 
     // Create a pipeline and apply transforms.
     Pipeline pipeline = Pipeline.create(options);
dataflow/snippets/src/main/java/com/example/dataflow/BigQueryStreamExactlyOnce.java (new file)

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
/*
 * Copyright 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

// [START dataflow_bigquery_stream_exactly_once]
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery.
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the number of Write API streams and the
            // triggering frequency.
            .withNumStorageWriteApiStreams(1)
            .withTriggeringFrequency(Duration.standardSeconds(5)));
    return pipeline.run();
  }
}
// [END dataflow_bigquery_stream_exactly_once]
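
A note on the trade-off in this snippet: STORAGE_WRITE_API with a fixed stream count and triggering frequency gives exactly-once semantics. BigQueryIO also offers an at-least-once variant that skips that tuning in exchange for possible duplicate rows. A hedged sketch of the alternative sink, reusing the imports and options type from the snippet above (not part of this commit):

  // Hypothetical variant: at-least-once delivery. STORAGE_API_AT_LEAST_ONCE
  // may write duplicate rows on retries, but requires no stream count or
  // triggering frequency.
  static BigQueryIO.Write<TableRow> atLeastOnceWrite(ExamplePipelineOptions options) {
    return BigQueryIO.writeTableRows()
        .to(String.format("%s:%s.%s",
            options.getProjectId(),
            options.getDatasetName(),
            options.getTableName()))
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE);
  }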
dataflow/snippets/src/main/java/com/example/dataflow/BigQueryWrite.java (new file)

Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@
/*
 * Copyright 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

// [START dataflow_bigquery_write]
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}
// [END dataflow_bigquery_write]
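
As an alternative to the hand-written format function above, Beam can infer the TableRow conversion when the element type carries a Beam schema. A sketch under that assumption (hypothetical class, not part of this commit; the field names are assumed to match the table's columns):

  // Hypothetical schema-aware element type; @DefaultSchema lets BigQueryIO
  // derive the row conversion via useBeamSchema() instead of
  // withFormatFunction(...).
  import org.apache.beam.sdk.schemas.JavaFieldSchema;
  import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

  @DefaultSchema(JavaFieldSchema.class)
  public class MyDataWithSchema {
    public String user_name; // mirrors the BigQuery column names
    public Long age;

    public MyDataWithSchema() {}
  }

  // The write step then needs no format function:
  //   BigQueryIO.<MyDataWithSchema>write()
  //       .to(tableSpec)
  //       .useBeamSchema()
  //       .withMethod(Write.Method.STORAGE_WRITE_API)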
dataflow/snippets/src/main/java/com/example/dataflow/BigQueryWriteWithSchema.java (new file)

Lines changed: 99 additions & 0 deletions

@@ -0,0 +1,99 @@
/*
 * Copyright 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.dataflow;

// [START dataflow_bigquery_write_with_schema]
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for create disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}
// [END dataflow_bigquery_write_with_schema]
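
Related: withSchema takes the TableSchema object built above, but BigQueryIO.Write also accepts the same schema as a JSON string via withJsonSchema, which can be handy when the schema is stored alongside the pipeline. A small sketch (assumed equivalent to the schema in the snippet; not part of this commit):

  // Hypothetical alternative: supply the schema as JSON rather than
  // building TableFieldSchema objects by hand.
  String jsonSchema = "{\"fields\": ["
      + "{\"name\": \"user_name\", \"type\": \"STRING\", \"mode\": \"REQUIRED\"},"
      + "{\"name\": \"age\", \"type\": \"INT64\"}"
      + "]}";
  // ...then replace .withSchema(schema) with .withJsonSchema(jsonSchema).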

dataflow/snippets/src/main/java/com/example/dataflow/BiqQueryReadTableRows.java

Lines changed: 3 additions & 3 deletions

@@ -30,10 +30,10 @@ public static void main(String[] args) {
     // Parse the pipeline options passed into the application. Example:
     // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
     // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
-    PipelineOptionsFactory.register(BigQueryReadOptions.class);
-    BigQueryReadOptions options = PipelineOptionsFactory.fromArgs(args)
+    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
+    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
         .withValidation()
-        .as(BigQueryReadOptions.class);
+        .as(ExamplePipelineOptions.class);
 
     // Create a pipeline and apply transforms.
     Pipeline pipeline = Pipeline.create(options);

dataflow/snippets/src/main/java/com/example/dataflow/BigQueryReadOptions.java renamed to dataflow/snippets/src/main/java/com/example/dataflow/ExamplePipelineOptions.java

Lines changed: 5 additions & 5 deletions

@@ -17,24 +17,24 @@
 package com.example.dataflow;
 
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 
 /**
  * Extends PipelineOptions and adds custom pipeline options for this sample.
  */
-public interface BigQueryReadOptions extends PipelineOptions {
+public interface ExamplePipelineOptions extends StreamingOptions {
   @Description("Project ID for the BigQuery table")
   String getProjectId();
 
-  void setProjectId(String input);
+  void setProjectId(String value);
 
   @Description("Dataset for the BigQuery table")
   String getDatasetName();
 
-  void setDatasetName(String output);
+  void setDatasetName(String value);
 
   @Description("BigQuery table name")
   String getTableName();
 
-  void setTableName(String output);
+  void setTableName(String value);
 }
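
For reference, the options interface after this rename reads in full (reconstructed from the diff above):

package com.example.dataflow;

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;

/**
 * Extends PipelineOptions and adds custom pipeline options for this sample.
 */
public interface ExamplePipelineOptions extends StreamingOptions {
  @Description("Project ID for the BigQuery table")
  String getProjectId();

  void setProjectId(String value);

  @Description("Dataset for the BigQuery table")
  String getDatasetName();

  void setDatasetName(String value);

  @Description("BigQuery table name")
  String getTableName();

  void setTableName(String value);
}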
