From 34134ab857d40050818f13fb6409c730792bea20 Mon Sep 17 00:00:00 2001 From: Olajide Date: Mon, 30 Jan 2017 15:17:07 -0600 Subject: [PATCH] Corrected spelling errors --- README.md | 2 +- default_config.properties | 2 +- .../kinesisencryption/dao/BootCarObject.java | 4 +- .../dao/TickerSalesObject.java | 47 +++----------- .../kcl/EncryptedConsumerWithKCL.java | 2 + .../kpl/EncryptedProducerWithKPL.java | 62 +++++++++++-------- .../streams/DecryptShardConsumerThread.java | 10 +-- ...java => EncryptedProducerWithStreams.java} | 47 ++++---------- .../KinesisEncryption/TestEncryptionSDK.java | 11 ++-- 9 files changed, 69 insertions(+), 118 deletions(-) rename src/main/java/kinesisencryption/streams/{EnryptedProducerWithStreams.java => EncryptedProducerWithStreams.java} (74%) diff --git a/README.md b/README.md index ccc64bc..1b8be61 100644 --- a/README.md +++ b/README.md @@ -157,7 +157,7 @@ mvn compile nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EncryptedConsumerWithStreams > ~/kinesisencryption/logs/EncryptedConsumerWithStreams.log) &> ~/kinesisencryption/logs/EncryptedConsumerWithStreams.log" & 5.Startup the Streams producer - nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EnryptedProducerWithStreams > ~/kinesisencryption/logs/EnryptedProducerWithStreams.log) &> ~/kinesisencryption/logs/EnryptedProducerWithStreams.log" & + nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EncryptedProducerWithStreams > ~/kinesisencryption/logs/EnryptedProducerWithStreams.log) &> ~/kinesisencryption/logs/EnryptedProducerWithStreams.log" & 6.Startup the KCL consumer nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.kcl.EncryptedConsumerWithKCL > ~/kinesisencryption/logs/EncryptedConsumerWithKCL.log) &> ~/kinesisencryption/logs/EncryptedConsumerWithKCL.log" & diff --git a/default_config.properties b/default_config.properties index 229abde..ffbc692 100644 --- a/default_config.properties +++ b/default_config.properties @@ -266,7 +266,7 @@ RecordTtl = 30000 # The region is also used to sign requests. # # Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$ -Region = us-east-1 +Region = us-west-2 # The maximum total time (milliseconds) elapsed between when we begin a HTTP # request and receiving all of the response. If it goes over, the request diff --git a/src/main/java/kinesisencryption/dao/BootCarObject.java b/src/main/java/kinesisencryption/dao/BootCarObject.java index 59d6cf8..aef6b82 100644 --- a/src/main/java/kinesisencryption/dao/BootCarObject.java +++ b/src/main/java/kinesisencryption/dao/BootCarObject.java @@ -19,12 +19,12 @@ public BootCarObject(String name, String year, String odometer) this.year = year; this.odometer = odometer; } - /*@Override + @Override public String toString() { return name + "," + year + "," + odometer; } - */ + diff --git a/src/main/java/kinesisencryption/dao/TickerSalesObject.java b/src/main/java/kinesisencryption/dao/TickerSalesObject.java index ed994c9..19adcbb 100644 --- a/src/main/java/kinesisencryption/dao/TickerSalesObject.java +++ b/src/main/java/kinesisencryption/dao/TickerSalesObject.java @@ -18,43 +18,14 @@ public TickerSalesObject(String tickerSymbol, String salesPrice, String orderId, this.timeStamp = timeStamp; } - public String getTickerSymbol() - { - return tickerSymbol; - } - - public void setTickerSymbol(String tickerSymbol) - { - this.tickerSymbol = tickerSymbol; - } - - public String getSalesPrice() - { - return salesPrice; - } - - public void setSalesPrice(String salesPrice) - { - this.salesPrice = salesPrice; - } - - public String getOrderId() - { - return orderId; - } - - public void setOrderId(String orderId) - { - this.orderId = orderId; - } - - public String getTimeStamp() - { - return timeStamp; - } - - public void setTimeStamp(String timeStamp) - { - this.timeStamp = timeStamp; + @Override + public String toString() + { + return "TickerSalesObject{" + + "tickerSymbol='" + tickerSymbol + '\'' + + ", salesPrice='" + salesPrice + '\'' + + ", orderId='" + orderId + '\'' + + ", timeStamp='" + timeStamp + '\'' + + '}'; } } diff --git a/src/main/java/kinesisencryption/kcl/EncryptedConsumerWithKCL.java b/src/main/java/kinesisencryption/kcl/EncryptedConsumerWithKCL.java index c99e4d6..7e55d05 100644 --- a/src/main/java/kinesisencryption/kcl/EncryptedConsumerWithKCL.java +++ b/src/main/java/kinesisencryption/kcl/EncryptedConsumerWithKCL.java @@ -30,9 +30,11 @@ private static void initialize() { java.security.Security.setProperty("networkaddress.cache.ttl", "60"); credentialsProvider = new DefaultAWSCredentialsProviderChain(); + try { credentialsProvider.getCredentials(); + } catch(Exception e) { diff --git a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java index 7ff43c1..a590f8e 100644 --- a/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java +++ b/src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java @@ -1,11 +1,7 @@ package kinesisencryption.kpl; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; + import java.io.IOException; -import java.io.InputStreamReader; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; @@ -13,7 +9,6 @@ import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider; import com.amazonaws.services.kms.AWSKMSClient; -import kinesisencryption.dao.BootCarObject; import kinesisencryption.dao.TickerSalesObject; import kinesisencryption.utils.KinesisEncryptionUtils; import org.slf4j.Logger; @@ -36,9 +31,19 @@ public class EncryptedProducerWithKPL { private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithKPL.class); private static final String DELIM = ","; + private List tickerSymbolList; - - public static KinesisProducer getKinesisProducer() + public List getTickerSymbolList() + { + return tickerSymbolList; + } + + public void setTickerSymbolList(List tickerSymbolList) + { + this.tickerSymbolList = tickerSymbolList; + } + + public static KinesisProducer getKinesisProducer() { KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties"); config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); @@ -79,6 +84,9 @@ public static void main (String [] args) throws Exception List tickerObjectList = KinesisEncryptionUtils.getDataObjects(filePath); + EncryptedProducerWithKPL encryptedProducerWithKPLProducer = new EncryptedProducerWithKPL(); + encryptedProducerWithKPLProducer.setTickerSymbolList(tickerObjectList); + final FutureCallback callback = new FutureCallback() { @Override public void onFailure(Throwable t) { @@ -103,24 +111,28 @@ public void onSuccess(UserRecordResult result) { try { String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name"); + while(true) + { + for(TickerSalesObject ticker: tickerObjectList) + { + + log.info("Before encryption record is : "+ ticker + "and size is : " + + KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString())); + String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context); + + log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString)); + if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000) + log.warn("Record added is greater than 1MB and may be throttled"); + ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString); + ListenableFuture f = producer.addUserRecord(streamName, randomPartitionKey(), data); + + Futures.addCallback(f, callback); + log.info("Encrypted record " + data.toString() + " " + "added successfully" ); + + } + tickerObjectList = encryptedProducerWithKPLProducer.getTickerSymbolList(); + } - for(TickerSalesObject ticker: tickerObjectList) - { - - log.info("Before encryption record is : "+ ticker + "and size is : " - + KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString())); - String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context); - - log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString)); - if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000) - log.warn("Record added is greater than 1MB and may be throttled"); - ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString); - ListenableFuture f = producer.addUserRecord(streamName, randomPartitionKey(), data); - - Futures.addCallback(f, callback); - log.info("Encrypted record " + data.toString() + " " + "added successfully" ); - - } } catch(Exception e) { diff --git a/src/main/java/kinesisencryption/streams/DecryptShardConsumerThread.java b/src/main/java/kinesisencryption/streams/DecryptShardConsumerThread.java index 7cb1ee3..a1e7d3a 100644 --- a/src/main/java/kinesisencryption/streams/DecryptShardConsumerThread.java +++ b/src/main/java/kinesisencryption/streams/DecryptShardConsumerThread.java @@ -30,14 +30,6 @@ public class DecryptShardConsumerThread implements Runnable private final static CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); private static final Logger log = LoggerFactory.getLogger(DecryptShardConsumerThread.class); - public DecryptShardConsumerThread(String shardIterator, String shardId, AmazonKinesisClient kinesis, AWSKMSClient kms ) - { - this.shardIterator = shardIterator; - this.shardId = shardId; - this.kms= kms; - this.kinesis = kinesis; - - } public DecryptShardConsumerThread(String shardIterator, String shardId, AmazonKinesisClient kinesis, Map context, String keyArn) { @@ -100,7 +92,7 @@ public void run() { ByteBuffer buffer = record.getData(); String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext()); - log.info("Decrypted Result is " + decryptedResult); + log.info("Decrypted Text Result is " + decryptedResult); } catch (CharacterCodingException e) { diff --git a/src/main/java/kinesisencryption/streams/EnryptedProducerWithStreams.java b/src/main/java/kinesisencryption/streams/EncryptedProducerWithStreams.java similarity index 74% rename from src/main/java/kinesisencryption/streams/EnryptedProducerWithStreams.java rename to src/main/java/kinesisencryption/streams/EncryptedProducerWithStreams.java index 2b011f8..04f908a 100644 --- a/src/main/java/kinesisencryption/streams/EnryptedProducerWithStreams.java +++ b/src/main/java/kinesisencryption/streams/EncryptedProducerWithStreams.java @@ -1,18 +1,14 @@ package kinesisencryption.streams; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; + import java.io.IOException; -import java.io.InputStreamReader; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; - import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider; +import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kms.AWSKMSClient; -import kinesisencryption.dao.BootCarObject; import kinesisencryption.dao.TickerSalesObject; import kinesisencryption.utils.KinesisEncryptionUtils; import org.slf4j.Logger; @@ -20,17 +16,12 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; -import com.amazonaws.services.kinesis.model.PutRecordsResult; -public class EnryptedProducerWithStreams +public class EncryptedProducerWithStreams { - private static final String DELIM = ","; - - private static final Logger log = LoggerFactory.getLogger(EnryptedProducerWithStreams.class); + private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithStreams.class); private List tickerSymbolList; public List getTickerSymbolList() @@ -77,31 +68,19 @@ public static void main(String[] args) log.info("Successfully retrieved file location property " + fileLocation); List tickerSymbolsList = KinesisEncryptionUtils.getDataObjects(fileLocation); - EnryptedProducerWithStreams producer = new EnryptedProducerWithStreams(); + EncryptedProducerWithStreams producer = new EncryptedProducerWithStreams(); producer.setTickerSymbolList(tickerSymbolsList); final Map context = Collections.singletonMap("Kinesis", encryptionContext); final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn); - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - List ptreList = new ArrayList(); - int batch = 1; while (true) { - int i = 1; + for (TickerSalesObject ticker : tickerSymbolsList) { - if (i == 500)//Put Record batch requests are 500 - { - putRecordsRequest.setRecords(ptreList); - putRecordsRequest.setStreamName(streamName); - PutRecordsResult putRecordsResult = - kinesis.putRecords(putRecordsRequest); - log.info("PutRecordsResult : " + putRecordsResult.toString() - + " has Batch Number : " + batch); - break; - } - PutRecordsRequestEntry ptre = new PutRecordsRequestEntry(); + PutRecordRequest putRecordRequest = new PutRecordRequest(); + putRecordRequest.setStreamName(streamName); log.info("Before encryption size of String Object is " + KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString())); String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context); @@ -109,16 +88,14 @@ public static void main(String[] args) if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000) log.warn("Record added is greater than 1MB and may be throttled"); ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString); - ptre.setData(data); - ptre.setPartitionKey(randomPartitionKey()); - ptreList.add(ptre); + putRecordRequest.setData(data); + putRecordRequest.setPartitionKey(randomPartitionKey()); + kinesis.putRecord(putRecordRequest); log.info("Ticker added :" + ticker.toString() + "Ticker Cipher :" + data.toString() + "and size : " + KinesisEncryptionUtils.calculateSizeOfObject(data.toString())); - i++; + } - ptreList = new ArrayList(); tickerSymbolsList = producer.getTickerSymbolList(); - batch++; Thread.sleep(100); } } catch (IOException ioe) diff --git a/src/test/java/com/tayo/KinesisEncryption/TestEncryptionSDK.java b/src/test/java/com/tayo/KinesisEncryption/TestEncryptionSDK.java index 66786d9..1929e49 100644 --- a/src/test/java/com/tayo/KinesisEncryption/TestEncryptionSDK.java +++ b/src/test/java/com/tayo/KinesisEncryption/TestEncryptionSDK.java @@ -30,11 +30,9 @@ public class TestEncryptionSDK extends TestCase { BootCarObject car; String keyId; - AmazonKinesisClient kinesis; - private static final String STREAM_NAME = "UnitTestStream"; + AWSKMSClient kms; - private final static CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder(); - final static String keyArn = "arn:aws:kms:us-east-1:xxxxxxx:key/mykey-3f1c-4a77-a51d-isinaws"; + final static String keyArn = "arn:aws:kms:us-east-1:573906581002:key/37dc90dc-3f1c-4a77-a51d-a653b173fcdb"; final AwsCrypto crypto = new AwsCrypto(); final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn); @@ -42,9 +40,8 @@ public void setUp() throws Exception { super.setUp(); car = new BootCarObject("Volvo 740 GL", "2012","134000"); - keyId="mykey-3f1c-4a77-a51d-isinaws"; - kinesis = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain() - .getCredentials()).withRegion(Regions.US_EAST_1); + keyId="37dc90dc-3f1c-4a77-a51d-a653b173fcdb"; + kms = new AWSKMSClient(new DefaultAWSCredentialsProviderChain() .getCredentials()).withRegion(Regions.US_EAST_1);