/
StockTradeRecordProcessor.java
127 lines (108 loc) · 4.76 KB
/
StockTradeRecordProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.samples.stocktrades.processor;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.samples.stocktrades.model.StockTrade;
/**
* Processes records retrieved from stock trades stream.
*
*/
public class StockTradeRecordProcessor implements IRecordProcessor {
private static final Log LOG = LogFactory.getLog(StockTradeRecordProcessor.class);
private String kinesisShardId;
// Reporting interval
private static final long REPORTING_INTERVAL_MILLIS = 60000L; // 1 minute
private long nextReportingTimeInMillis;
// Checkpointing interval
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; // 1 minute
private long nextCheckpointTimeInMillis;
// Aggregates stats for stock trades
private StockStats stockStats = new StockStats();
/**
* {@inheritDoc}
*/
@Override
public void initialize(String shardId) {
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
/**
* {@inheritDoc}
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
for (Record record : records) {
// process record
processRecord(record);
}
// If it is time to report stats as per the reporting interval, report stats
if (System.currentTimeMillis() > nextReportingTimeInMillis) {
reportStats();
resetStats();
nextReportingTimeInMillis = System.currentTimeMillis() + REPORTING_INTERVAL_MILLIS;
}
// Checkpoint once every checkpoint interval
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
private void reportStats() {
// TODO: Implement method
}
private void resetStats() {
// TODO: Implement method
}
private void processRecord(Record record) {
// TODO: Implement method
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
// Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
try {
checkpointer.checkpoint();
} catch (ShutdownException se) {
// Ignore checkpoint if the processor instance has been shutdown (fail over).
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
} catch (ThrottlingException e) {
// Skip checkpoint when throttled. In practice, consider a backoff and retry policy.
LOG.error("Caught throttling exception, skipping checkpoint.", e);
} catch (InvalidStateException e) {
// This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
}
}
}