/
CreateTopicExample.java
138 lines (126 loc) · 5.35 KB
/
CreateTopicExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pubsublite;
// [START pubsublite_create_topic]
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.ReservationName;
import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
import com.google.cloud.pubsublite.proto.Topic.ReservationConfig;
import com.google.cloud.pubsublite.proto.Topic.RetentionConfig;
import com.google.protobuf.util.Durations;
import java.util.concurrent.ExecutionException;
/**
 * Sample showing how to create a Pub/Sub Lite topic, either zonal or regional, that draws its
 * throughput from an existing reservation.
 */
public class CreateTopicExample {

  /**
   * Runs the sample with placeholder values.
   *
   * @param args ignored
   * @throws Exception if {@link #createTopicExample} propagates a failure
   */
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'a';
    String topicId = "your-topic-id";
    String reservationId = "your-reservation-id";
    long projectNumber = Long.parseLong("123456789");
    int partitions = 1;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    createTopicExample(
        cloudRegion, zoneId, projectNumber, topicId, reservationId, partitions, regional);
  }

  /**
   * Builds a topic definition and asks the admin client to create it, then prints the outcome to
   * standard output.
   *
   * <p>If the create call fails because the topic already exists, a short notice is printed. Any
   * other failure reported through the {@link ExecutionException} has its stack trace printed and
   * is not rethrown.
   *
   * @param cloudRegion region used for the reservation path, the topic location and the admin
   *     client settings
   * @param zoneId zone letter within {@code cloudRegion}; only used when {@code regional} is false
   * @param projectNumber numeric project identifier used in the reservation and topic paths
   * @param topicId name of the topic to create
   * @param reservationId name of the throughput reservation the topic is attached to
   * @param partitions number of partitions configured on the topic
   * @param regional true to place the topic in the region, false to place it in the zone
   * @throws Exception if a failure other than an {@link ExecutionException} occurs while creating
   *     the client or waiting for the create call
   */
  public static void createTopicExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      String reservationId,
      int partitions,
      boolean regional)
      throws Exception {

    ReservationPath reservationPath =
        ReservationPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudRegion.of(cloudRegion))
            .setName(ReservationName.of(reservationId))
            .build();

    // A regional topic is located by region alone; a zonal topic by region plus zone letter.
    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Topic topic =
        Topic.newBuilder()
            .setPartitionConfig(
                PartitionConfig.newBuilder()
                    // Set throughput capacity per partition in MiB/s.
                    .setCapacity(
                        Capacity.newBuilder()
                            // Must be 4-16 MiB/s.
                            .setPublishMibPerSec(4)
                            // Must be 4-32 MiB/s.
                            .setSubscribeMibPerSec(8)
                            .build())
                    .setCount(partitions))
            .setRetentionConfig(
                RetentionConfig.newBuilder()
                    // How long messages are retained.
                    .setPeriod(Durations.fromDays(1))
                    // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
                    // If the number of bytes stored in any of the topic's partitions grows
                    // beyond this value, older messages will be dropped to make room for
                    // newer ones, regardless of the value of `period`.
                    // The leading long literal keeps the whole product in 64-bit arithmetic.
                    .setPerPartitionBytes(30L * 1024 * 1024 * 1024))
            .setReservationConfig(
                ReservationConfig.newBuilder()
                    .setThroughputReservation(reservationPath.toString())
                    .build())
            .setName(topicPath.toString())
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Topic response = adminClient.createTopic(topic).get();
      String topicKind = regional ? "regional" : "zonal";
      System.out.println(
          response.getAllFields() + " (" + topicKind + " topic) created successfully.");
    } catch (ExecutionException e) {
      // Inspect the wrapped failure directly instead of rethrowing it to dispatch on its type.
      Throwable cause = e.getCause();
      if (cause instanceof AlreadyExistsException) {
        System.out.println("This topic already exists.");
      } else if (cause != null) {
        cause.printStackTrace();
      } else {
        // No wrapped cause: report the ExecutionException itself so the failure is not lost.
        e.printStackTrace();
      }
    }
  }
}
// [END pubsublite_create_topic]