/
EncryptedProducerWithStreams.java
120 lines (99 loc) · 5.25 KB
/
EncryptedProducerWithStreams.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package kinesisencryption.streams;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kms.AWSKMSClient;
import kinesisencryption.dao.TickerSalesObject;
import kinesisencryption.utils.KinesisEncryptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
public class EncryptedProducerWithStreams
{
private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithStreams.class);
private List<TickerSalesObject> tickerSymbolList;
public List<TickerSalesObject> getTickerSymbolList()
{
return tickerSymbolList;
}
public void setTickerSymbolList(List<TickerSalesObject> tickerSymbolList)
{
this.tickerSymbolList = tickerSymbolList;
}
public static void main(String[] args)
{
AmazonKinesisClient kinesis = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()
.getCredentials());
AWSKMSClient kms = new AWSKMSClient(new DefaultAWSCredentialsProviderChain()
.getCredentials());
String keyArn = null;
String encryptionContext = null;
final AwsCrypto crypto = new AwsCrypto();
/*
* Simulating the appearance of a steady flow of data by continuously loading data from file*/
try
{
keyArn = KinesisEncryptionUtils.getProperties().getProperty("key_arn");
log.info("Successfully retrieved keyarn property " + keyArn);
encryptionContext = KinesisEncryptionUtils.getProperties().getProperty("encryption_context");
log.info("Successfully retrieved encryption context property " + encryptionContext);
String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name");
log.info("Successfully retrieved stream name property " + streamName);
String keyId = KinesisEncryptionUtils.getProperties().getProperty("key_id");
log.info("Successfully retrieved key id property " + keyId);
String kinesisEndpoint = KinesisEncryptionUtils.getProperties().getProperty("kinesis_endpoint");
log.info("Successfully retrieved kinesis endpoint property " + kinesisEndpoint);
kinesis.setEndpoint(kinesisEndpoint);
String kmsEndpoint = KinesisEncryptionUtils.getProperties().getProperty("kms_endpoint");
log.info("Successfully retrieved kms endpoint property " + kmsEndpoint);
kms.setEndpoint(kmsEndpoint);
String fileLocation = KinesisEncryptionUtils.getProperties().getProperty("file_path");
log.info("Successfully retrieved file location property " + fileLocation);
List<TickerSalesObject> tickerSymbolsList = KinesisEncryptionUtils.getDataObjects(fileLocation);
EncryptedProducerWithStreams producer = new EncryptedProducerWithStreams();
producer.setTickerSymbolList(tickerSymbolsList);
final Map<String, String> context = Collections.singletonMap("Kinesis", encryptionContext);
final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn);
while (true)
{
for (TickerSalesObject ticker : tickerSymbolsList)
{
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
log.info("Before encryption size of String Object is "
+ KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString()));
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
log.warn("Record added is greater than 1MB and may be throttled");
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
kinesis.putRecord(putRecordRequest);
log.info("Ticker added :" + ticker.toString() + "Ticker Cipher :" + data.toString() + "and size : "
+ KinesisEncryptionUtils.calculateSizeOfObject(data.toString()));
}
tickerSymbolsList = producer.getTickerSymbolList();
Thread.sleep(100);
}
} catch (IOException ioe)
{
log.error(ioe.toString());
} catch (InterruptedException ie)
{
log.error(ie.toString());
} catch (Exception e)
{
e.printStackTrace();
}
}
public static String randomPartitionKey()
{
return new BigInteger(128, new Random()).toString(10);
}
}