-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
AmazonKinesisRecordProducerSample.java
175 lines (154 loc) · 8.11 KB
/
AmazonKinesisRecordProducerSample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
* Copyright 2012-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamDescription;
public class AmazonKinesisRecordProducerSample {
/*
* Before running the code:
* Fill in your AWS access credentials in the provided credentials
* file template, and be sure to move the file to the default location
* (~/.aws/credentials) where the sample code will load the
* credentials from.
* https://console.aws.amazon.com/iam/home?#security_credential
*
* WARNING:
* To avoid accidental leakage of your credentials, DO NOT keep
* the credentials file in your source directory.
*/
private static AmazonKinesis kinesis;
private static void init() throws Exception {
/*
* The ProfileCredentialsProvider will return your [default]
* credential profile by reading from the credentials file located at
* (~/.aws/credentials).
*/
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2")
.build();
}
public static void main(String[] args) throws Exception {
init();
final String myStreamName = AmazonKinesisApplicationSample.SAMPLE_APPLICATION_STREAM_NAME;
final Integer myStreamSize = 1;
// Describe the stream and check if it exists.
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(myStreamName);
try {
StreamDescription streamDescription = kinesis.describeStream(describeStreamRequest).getStreamDescription();
System.out.printf("Stream %s has a status of %s.\n", myStreamName, streamDescription.getStreamStatus());
if ("DELETING".equals(streamDescription.getStreamStatus())) {
System.out.println("Stream is being deleted. This sample will now exit.");
System.exit(0);
}
// Wait for the stream to become active if it is not yet ACTIVE.
if (!"ACTIVE".equals(streamDescription.getStreamStatus())) {
waitForStreamToBecomeAvailable(myStreamName);
}
} catch (ResourceNotFoundException ex) {
System.out.printf("Stream %s does not exist. Creating it now.\n", myStreamName);
// Create a stream. The number of shards determines the provisioned throughput.
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(myStreamName);
createStreamRequest.setShardCount(myStreamSize);
kinesis.createStream(createStreamRequest);
// The stream is now being created. Wait for it to become active.
waitForStreamToBecomeAvailable(myStreamName);
}
// List all of my streams.
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
listStreamsRequest.setLimit(10);
ListStreamsResult listStreamsResult = kinesis.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
while (listStreamsResult.isHasMoreStreams()) {
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
}
listStreamsResult = kinesis.listStreams(listStreamsRequest);
streamNames.addAll(listStreamsResult.getStreamNames());
}
// Print all of my streams.
System.out.println("List of my streams: ");
for (int i = 0; i < streamNames.size(); i++) {
System.out.println("\t- " + streamNames.get(i));
}
System.out.printf("Putting records in stream : %s until this application is stopped...\n", myStreamName);
System.out.println("Press CTRL-C to stop.");
// Write records to the stream until this program is aborted.
while (true) {
long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));
PutRecordResult putRecordResult = kinesis.putRecord(putRecordRequest);
System.out.printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.\n",
putRecordRequest.getPartitionKey(),
putRecordResult.getShardId(),
putRecordResult.getSequenceNumber());
}
}
private static void waitForStreamToBecomeAvailable(String myStreamName) throws InterruptedException {
System.out.printf("Waiting for %s to become ACTIVE...\n", myStreamName);
long startTime = System.currentTimeMillis();
long endTime = startTime + TimeUnit.MINUTES.toMillis(10);
while (System.currentTimeMillis() < endTime) {
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
try {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(myStreamName);
// ask for no more than 10 shards at a time -- this is an optional parameter
describeStreamRequest.setLimit(10);
DescribeStreamResult describeStreamResponse = kinesis.describeStream(describeStreamRequest);
String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
System.out.printf("\t- current state: %s\n", streamStatus);
if ("ACTIVE".equals(streamStatus)) {
return;
}
} catch (ResourceNotFoundException ex) {
// ResourceNotFound means the stream doesn't exist yet,
// so ignore this error and just keep polling.
} catch (AmazonServiceException ase) {
throw ase;
}
}
throw new RuntimeException(String.format("Stream %s never became active", myStreamName));
}
}