Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: create BigQuery subscription #1168

Merged
merged 29 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2b50adb
feat: Add flow control support to publisher
kamalaboulhosn Mar 24, 2020
6bb47bc
make suggested fixes
kamalaboulhosn Mar 25, 2020
0ffda7c
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn May 2, 2020
9c113c3
chore: Remove note that ordering keys requires enablements.
kamalaboulhosn May 2, 2020
2a295c1
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jun 21, 2020
6947740
feat: Add support for server-side flow control
kamalaboulhosn Jun 21, 2020
fb08aa3
Revert "chore: Remove note that ordering keys requires enablements."
kamalaboulhosn Jun 21, 2020
304059f
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jun 24, 2020
c9c2028
fix: Fix import order
kamalaboulhosn Jun 24, 2020
7b50aa1
Merge remote-tracking branch 'upstream/master' into master
kamalaboulhosn Jul 23, 2020
78ab82d
fix: Make error message more clear about where ordering must be enabl…
kamalaboulhosn Jul 23, 2020
dc62fb7
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Sep 27, 2020
8b73467
fix: Ensure that messages that are in pending batches for an ordering…
kamalaboulhosn Sep 27, 2020
75dc73e
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Sep 29, 2020
8be37d7
fix: Only check keyHasError if ordering keys is non-empty
kamalaboulhosn Sep 29, 2020
f2cded5
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Jul 19, 2021
e1f347b
Merge remote-tracking branch 'upstream/master'
kamalaboulhosn Aug 31, 2021
83342f5
fix: Set publish timeouts to be consistent with desired values
kamalaboulhosn Aug 31, 2021
27d4e75
Merge remote-tracking branch 'upstream/main'
kamalaboulhosn Apr 19, 2022
de32d95
Merge remote-tracking branch 'upstream/main'
kamalaboulhosn Jun 17, 2022
b40094d
samples: create BigQuery subscription
kamalaboulhosn Jun 17, 2022
3ee105d
samples: create BigQuery subscription
kamalaboulhosn Jun 17, 2022
37e842f
samples: create BigQuery subscription
kamalaboulhosn Jun 17, 2022
0d513a1
Make table ID more clear
kamalaboulhosn Jun 17, 2022
df8a01d
Fix string
kamalaboulhosn Jun 17, 2022
5b72e58
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 21, 2022
ff338ff
Fix pom and test creation/teardown
kamalaboulhosn Jun 21, 2022
f1aeb5a
Merge branch 'master' of https://github.com/kamalaboulhosn/java-pubsub
kamalaboulhosn Jun 21, 2022
0e509ab
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ If you are using Maven with [BOM][libraries-bom], add this to your pom.xml file
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.3.0</version>
<version>25.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -243,6 +243,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m
| Native Image Pub Sub Sample | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) |
| Publish Operations | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/utilities/PublishOperations.java) |
| Create Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) |
| Create Big Query Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateBigQuerySubscriptionExample.java) |
| Create Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) |
| Create Pull Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) |
| Create Push Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePushSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePushSubscriptionExample.java) |
Expand Down
5 changes: 5 additions & 0 deletions samples/install-without-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
<version>2.7.1</version>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>

<!-- compile and run all snippet tests -->
Expand Down
5 changes: 5 additions & 0 deletions samples/snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
<version>2.7.1</version>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>

<!-- compile and run all snippet tests -->
Expand Down
8 changes: 6 additions & 2 deletions samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.3.0</version>
<version>25.4.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -57,8 +57,12 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>

<!-- [START_EXCLUDE] -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_bigquery_subscription]
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.BigQueryConfig;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateBigQuerySubscriptionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
String subscriptionId = "your-subscription-id";
String bigqueryTableId = "your-project.your-dataset.your-table";

createBigQuerySubscription(projectId, topicId, subscriptionId, bigqueryTableId);
}

public static void createBigQuerySubscription(
String projectId, String topicId, String subscriptionId, String bigqueryTableId)
throws IOException {
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);

BigQueryConfig bigqueryConfig =
BigQueryConfig.newBuilder().setTable(bigqueryTableId).setWriteMetadata(true).build();

Subscription subscription =
subscriptionAdminClient.createSubscription(
Subscription.newBuilder()
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setBigqueryConfig(bigqueryConfig)
.build());

System.out.println("Created a BigQuery subscription: " + subscription.getAllFields());
}
}
}
// [END pubsub_create_bigquery_subscription]
57 changes: 57 additions & 0 deletions samples/snippets/src/test/java/pubsub/AdminIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@
import static junit.framework.TestCase.assertNotNull;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.SubscriptionName;
Expand Down Expand Up @@ -48,6 +59,10 @@ public class AdminIT {
private static final String exactlyOnceSubscriptionId =
"iam-exactly-once-subscription-" + _suffix;
private static final String pushEndpoint = "https://my-test-project.appspot.com/push";
private static final String bigqueryDatasetId =
"java_samples_data_set" + _suffix.replace("-", "_");
private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix;
private static final String bigqueryTableId = "java_samples_table_" + _suffix;

private static final TopicName topicName = TopicName.of(projectId, topicId);
private static final SubscriptionName pullSubscriptionName =
Expand Down Expand Up @@ -79,6 +94,9 @@ public void setUp() throws Exception {
bout = new ByteArrayOutputStream();
out = new PrintStream(bout);
System.setOut(out);

// Create table for BigQuery subscription.
createBigQueryTable();
}

@After
Expand All @@ -102,9 +120,39 @@ public void tearDown() throws Exception {
} catch (NotFoundException ignored) {
// ignore this as resources may not have been created
}

// Delete BigQuery table.
deleteBigQueryTable();

System.setOut(null);
}

private void createBigQueryTable() throws Exception {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(projectId, bigqueryDatasetId).build();
bigquery.create(datasetInfo);

Schema schema =
Schema.of(
Field.of("data", StandardSQLTypeName.STRING),
Field.of("message_id", StandardSQLTypeName.STRING),
Field.of("attributes", StandardSQLTypeName.STRING),
Field.of("subscription_name", StandardSQLTypeName.STRING),
Field.of("publish_time", StandardSQLTypeName.TIMESTAMP));

TableId tableId = TableId.of(projectId, bigqueryDatasetId, bigqueryTableId);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

bigquery.create(tableInfo);
}

private void deleteBigQueryTable() throws Exception {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
DatasetId datasetId = DatasetId.of(projectId, bigqueryDatasetId);
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
}

@Test
public void testAdmin() throws Exception {
// Test create topic.
Expand Down Expand Up @@ -208,12 +256,21 @@ public void testAdmin() throws Exception {
.contains("Created a subscription with exactly once delivery enabled:");
assertThat(bout.toString()).contains("enable_exactly_once_delivery=true");

bout.reset();
// Test create a BigQuery subscription
String bigqueryTablePath = String.join(".", projectId, bigqueryDatasetId, bigqueryTableId);
CreateBigQuerySubscriptionExample.createBigQuerySubscription(
projectId, topicId, bigquerySubscriptionId, bigqueryTablePath);
assertThat(bout.toString()).contains("Created a BigQuery subscription:");
assertThat(bout.toString()).contains(bigqueryTablePath);

bout.reset();
// Test delete subscription.
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pullSubscriptionId);
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pushSubscriptionId);
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, orderedSubscriptionId);
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, exactlyOnceSubscriptionId);
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, bigquerySubscriptionId);
assertThat(bout.toString()).contains("Deleted subscription.");

bout.reset();
Expand Down