Skip to content
This repository was archived by the owner on Sep 27, 2023. It is now read-only.

Commit 2670cb1

Browse files
author
Praful Makani
authored
docs(samples): add run notification (#328)
* docs(samples): add run notification * docs(samples): add region tag * docs(samples): lint
1 parent 16ef1ad commit 2670cb1

File tree

5 files changed

+255
-0
lines changed

5 files changed

+255
-0
lines changed

samples/install-without-bom/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@
5151
<version>1.117.1</version>
5252
<scope>test</scope>
5353
</dependency>
54+
<dependency>
55+
<groupId>com.google.cloud</groupId>
56+
<artifactId>google-cloud-pubsub</artifactId>
57+
<version>1.108.1</version>
58+
<scope>test</scope>
59+
</dependency>
5460
</dependencies>
5561

5662
<!-- compile and run all snippet tests -->

samples/snapshot/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
<version>1.117.1</version>
5151
<scope>test</scope>
5252
</dependency>
53+
<dependency>
54+
<groupId>com.google.cloud</groupId>
55+
<artifactId>google-cloud-pubsub</artifactId>
56+
<version>1.108.1</version>
57+
<scope>test</scope>
58+
</dependency>
5359
</dependencies>
5460

5561
<!-- compile and run all snippet tests -->

samples/snippets/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,11 @@
6363
<version>1.117.1</version>
6464
<scope>test</scope>
6565
</dependency>
66+
<dependency>
67+
<groupId>com.google.cloud</groupId>
68+
<artifactId>google-cloud-pubsub</artifactId>
69+
<version>1.108.1</version>
70+
<scope>test</scope>
71+
</dependency>
6672
</dependencies>
6773
</project>
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.bigquerydatatransfer;
18+
19+
// [START bigquerydatatransfer_run_notification]
20+
import com.google.api.gax.rpc.ApiException;
21+
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
22+
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
23+
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
24+
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
25+
import com.google.protobuf.Struct;
26+
import com.google.protobuf.Value;
27+
import java.io.IOException;
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
31+
// Sample to get run notification
32+
public class RunNotification {
33+
34+
public static void main(String[] args) throws IOException {
35+
// TODO(developer): Replace these variables before running the sample.
36+
final String projectId = "MY_PROJECT_ID";
37+
final String datasetId = "MY_DATASET_ID";
38+
final String pubsubTopicName = "MY_TOPIC_NAME";
39+
final String query =
40+
"SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
41+
+ "@run_date as intended_run_date, 17 as some_integer";
42+
Map<String, Value> params = new HashMap<>();
43+
params.put("query", Value.newBuilder().setStringValue(query).build());
44+
params.put(
45+
"destination_table_name_template",
46+
Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
47+
params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
48+
params.put("partitioning_field", Value.newBuilder().build());
49+
TransferConfig transferConfig =
50+
TransferConfig.newBuilder()
51+
.setDestinationDatasetId(datasetId)
52+
.setDisplayName("Your Scheduled Query Name")
53+
.setDataSourceId("scheduled_query")
54+
.setParams(Struct.newBuilder().putAllFields(params).build())
55+
.setSchedule("every 24 hours")
56+
.setNotificationPubsubTopic(pubsubTopicName)
57+
.build();
58+
runNotification(projectId, transferConfig);
59+
}
60+
61+
public static void runNotification(String projectId, TransferConfig transferConfig)
62+
throws IOException {
63+
try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
64+
ProjectName parent = ProjectName.of(projectId);
65+
CreateTransferConfigRequest request =
66+
CreateTransferConfigRequest.newBuilder()
67+
.setParent(parent.toString())
68+
.setTransferConfig(transferConfig)
69+
.build();
70+
TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
71+
System.out.println(
72+
"\nScheduled query with run notification created successfully :" + config.getName());
73+
} catch (ApiException ex) {
74+
System.out.print("\nScheduled query with run notification was not created." + ex.toString());
75+
}
76+
}
77+
}
78+
// [END bigquerydatatransfer_run_notification]
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.bigquerydatatransfer;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static junit.framework.TestCase.assertNotNull;
21+
22+
import com.google.cloud.bigquery.BigQuery;
23+
import com.google.cloud.bigquery.BigQueryOptions;
24+
import com.google.cloud.bigquery.DatasetInfo;
25+
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
26+
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
27+
import com.google.cloud.pubsub.v1.TopicAdminClient;
28+
import com.google.protobuf.Struct;
29+
import com.google.protobuf.Value;
30+
import com.google.pubsub.v1.ProjectSubscriptionName;
31+
import com.google.pubsub.v1.ProjectTopicName;
32+
import com.google.pubsub.v1.Subscription;
33+
import java.io.ByteArrayOutputStream;
34+
import java.io.IOException;
35+
import java.io.PrintStream;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
import java.util.logging.Level;
40+
import java.util.logging.Logger;
41+
import org.junit.After;
42+
import org.junit.Before;
43+
import org.junit.BeforeClass;
44+
import org.junit.Test;
45+
46+
public class RunNotificationIT {
47+
48+
private static final Logger LOG = Logger.getLogger(RunNotificationIT.class.getName());
49+
private BigQuery bigquery;
50+
private ByteArrayOutputStream bout;
51+
private String name;
52+
private String displayName;
53+
private String datasetName;
54+
private String topicName;
55+
private String formattedTopicName;
56+
private String subscriberName;
57+
private PrintStream out;
58+
private PrintStream originalPrintStream;
59+
60+
private static final String PROJECT_ID = requireEnvVar("GOOGLE_CLOUD_PROJECT");
61+
62+
private static String requireEnvVar(String varName) {
63+
String value = System.getenv(varName);
64+
assertNotNull(
65+
"Environment variable " + varName + " is required to perform these tests.",
66+
System.getenv(varName));
67+
return value;
68+
}
69+
70+
@BeforeClass
71+
public static void checkRequirements() {
72+
requireEnvVar("GOOGLE_CLOUD_PROJECT");
73+
}
74+
75+
@Before
76+
public void setUp() throws IOException {
77+
String id = UUID.randomUUID().toString().substring(0, 8);
78+
displayName = "MY_SCHEDULE_NAME_TEST_" + id;
79+
datasetName = "MY_DATASET_NAME_TEST_" + id;
80+
topicName = "MY_TOPIC_TEST_" + id;
81+
formattedTopicName = String.format("projects/%s/topics/%s", PROJECT_ID, topicName);
82+
subscriberName = "MY_SUBSCRIBER_TEST_" + id;
83+
// create a temporary dataset
84+
bigquery = BigQueryOptions.getDefaultInstance().getService();
85+
bigquery.create(DatasetInfo.of(datasetName));
86+
// create a temporary pubsub topic
87+
try (TopicAdminClient client = TopicAdminClient.create()) {
88+
client.createTopic(formattedTopicName);
89+
}
90+
// create a temporary subscriber
91+
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
92+
ProjectTopicName projectTopicName = ProjectTopicName.of(PROJECT_ID, topicName);
93+
ProjectSubscriptionName subscriptionName =
94+
ProjectSubscriptionName.of(PROJECT_ID, subscriberName);
95+
subscriptionAdminClient.createSubscription(
96+
Subscription.newBuilder()
97+
.setName(subscriptionName.toString())
98+
.setTopic(projectTopicName.toString())
99+
.setEnableMessageOrdering(true)
100+
.build());
101+
}
102+
bout = new ByteArrayOutputStream();
103+
out = new PrintStream(bout);
104+
originalPrintStream = System.out;
105+
System.setOut(out);
106+
}
107+
108+
@After
109+
public void tearDown() throws IOException {
110+
// Clean up
111+
// delete a temporary subscriber
112+
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
113+
String formatSubscriberName =
114+
String.format("projects/%s/subscriptions/%s", PROJECT_ID, subscriberName);
115+
subscriptionAdminClient.deleteSubscription(formatSubscriberName);
116+
}
117+
// delete a temporary pubsub topic
118+
try (TopicAdminClient client = TopicAdminClient.create()) {
119+
client.deleteTopic(formattedTopicName);
120+
}
121+
DeleteScheduledQuery.deleteScheduledQuery(name);
122+
// delete a temporary dataset
123+
bigquery.delete(datasetName, BigQuery.DatasetDeleteOption.deleteContents());
124+
// restores print statements in the original method
125+
System.out.flush();
126+
System.setOut(originalPrintStream);
127+
LOG.log(Level.INFO, bout.toString());
128+
}
129+
130+
@Test
131+
public void testRunNotification() throws IOException {
132+
String query =
133+
"SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
134+
+ "@run_date as intended_run_date, 17 as some_integer";
135+
String destinationTableName =
136+
"MY_DESTINATION_TABLE_" + UUID.randomUUID().toString().substring(0, 8) + "_{run_date}";
137+
Map<String, Value> params = new HashMap<>();
138+
params.put("query", Value.newBuilder().setStringValue(query).build());
139+
params.put(
140+
"destination_table_name_template",
141+
Value.newBuilder().setStringValue(destinationTableName).build());
142+
params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
143+
params.put("partitioning_field", Value.newBuilder().setStringValue("").build());
144+
TransferConfig transferConfig =
145+
TransferConfig.newBuilder()
146+
.setDestinationDatasetId(datasetName)
147+
.setDisplayName(displayName)
148+
.setDataSourceId("scheduled_query")
149+
.setParams(Struct.newBuilder().putAllFields(params).build())
150+
.setSchedule("every 24 hours")
151+
.setNotificationPubsubTopic(formattedTopicName)
152+
.build();
153+
RunNotification.runNotification(PROJECT_ID, transferConfig);
154+
String result = bout.toString();
155+
name = result.substring(result.indexOf(":") + 1, result.length() - 1);
156+
assertThat(result).contains("Scheduled query with run notification created successfully");
157+
assertThat(bout.toString()).contains(name);
158+
}
159+
}

0 commit comments

Comments
 (0)