Skip to content

Commit

Permalink
Corrected spelling errors
Browse files Browse the repository at this point in the history
  • Loading branch information
cheefoo committed Jan 30, 2017
1 parent 12c89cd commit 34134ab
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 118 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -157,7 +157,7 @@ mvn compile
nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EncryptedConsumerWithStreams > ~/kinesisencryption/logs/EncryptedConsumerWithStreams.log) &> ~/kinesisencryption/logs/EncryptedConsumerWithStreams.log" &

5.Startup the Streams producer
nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EnryptedProducerWithStreams > ~/kinesisencryption/logs/EnryptedProducerWithStreams.log) &> ~/kinesisencryption/logs/EnryptedProducerWithStreams.log" &
nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.streams.EncryptedProducerWithStreams > ~/kinesisencryption/logs/EnryptedProducerWithStreams.log) &> ~/kinesisencryption/logs/EnryptedProducerWithStreams.log" &

6.Startup the KCL consumer
nohup bash -c "(mvn exec:java -Dexec.mainClass=kinesisencryption.kcl.EncryptedConsumerWithKCL > ~/kinesisencryption/logs/EncryptedConsumerWithKCL.log) &> ~/kinesisencryption/logs/EncryptedConsumerWithKCL.log" &
Expand Down
2 changes: 1 addition & 1 deletion default_config.properties
Expand Up @@ -266,7 +266,7 @@ RecordTtl = 30000
# The region is also used to sign requests.
#
# Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$
Region = us-east-1
Region = us-west-2

# The maximum total time (milliseconds) elapsed between when we begin a HTTP
# request and receiving all of the response. If it goes over, the request
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/kinesisencryption/dao/BootCarObject.java
Expand Up @@ -19,12 +19,12 @@ public BootCarObject(String name, String year, String odometer)
this.year = year;
this.odometer = odometer;
}
/*@Override
@Override
public String toString()
{
return name + "," + year + "," + odometer;
}
*/




Expand Down
47 changes: 9 additions & 38 deletions src/main/java/kinesisencryption/dao/TickerSalesObject.java
Expand Up @@ -18,43 +18,14 @@ public TickerSalesObject(String tickerSymbol, String salesPrice, String orderId,
this.timeStamp = timeStamp;
}

public String getTickerSymbol()
{
return tickerSymbol;
}

public void setTickerSymbol(String tickerSymbol)
{
this.tickerSymbol = tickerSymbol;
}

public String getSalesPrice()
{
return salesPrice;
}

public void setSalesPrice(String salesPrice)
{
this.salesPrice = salesPrice;
}

public String getOrderId()
{
return orderId;
}

public void setOrderId(String orderId)
{
this.orderId = orderId;
}

public String getTimeStamp()
{
return timeStamp;
}

public void setTimeStamp(String timeStamp)
{
this.timeStamp = timeStamp;
@Override
public String toString()
{
return "TickerSalesObject{" +
"tickerSymbol='" + tickerSymbol + '\'' +
", salesPrice='" + salesPrice + '\'' +
", orderId='" + orderId + '\'' +
", timeStamp='" + timeStamp + '\'' +
'}';
}
}
Expand Up @@ -30,9 +30,11 @@ private static void initialize()
{
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
credentialsProvider = new DefaultAWSCredentialsProviderChain();

try
{
credentialsProvider.getCredentials();

}
catch(Exception e)
{
Expand Down
62 changes: 37 additions & 25 deletions src/main/java/kinesisencryption/kpl/EncryptedProducerWithKPL.java
@@ -1,19 +1,14 @@
package kinesisencryption.kpl;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider;
import com.amazonaws.services.kms.AWSKMSClient;
import kinesisencryption.dao.BootCarObject;
import kinesisencryption.dao.TickerSalesObject;
import kinesisencryption.utils.KinesisEncryptionUtils;
import org.slf4j.Logger;
Expand All @@ -36,9 +31,19 @@ public class EncryptedProducerWithKPL
{
private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithKPL.class);
private static final String DELIM = ",";
private List<TickerSalesObject> tickerSymbolList;


public static KinesisProducer getKinesisProducer()
public List<TickerSalesObject> getTickerSymbolList()
{
return tickerSymbolList;
}

public void setTickerSymbolList(List<TickerSalesObject> tickerSymbolList)
{
this.tickerSymbolList = tickerSymbolList;
}

public static KinesisProducer getKinesisProducer()
{
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
Expand Down Expand Up @@ -79,6 +84,9 @@ public static void main (String [] args) throws Exception


List<TickerSalesObject> tickerObjectList = KinesisEncryptionUtils.getDataObjects(filePath);
EncryptedProducerWithKPL encryptedProducerWithKPLProducer = new EncryptedProducerWithKPL();
encryptedProducerWithKPLProducer.setTickerSymbolList(tickerObjectList);

final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
Expand All @@ -103,24 +111,28 @@ public void onSuccess(UserRecordResult result) {
try
{
String streamName = KinesisEncryptionUtils.getProperties().getProperty("stream_name");
while(true)
{
for(TickerSalesObject ticker: tickerObjectList)
{

log.info("Before encryption record is : "+ ticker + "and size is : "
+ KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString()));
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);

log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
log.warn("Record added is greater than 1MB and may be throttled");
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);

Futures.addCallback(f, callback);
log.info("Encrypted record " + data.toString() + " " + "added successfully" );

}
tickerObjectList = encryptedProducerWithKPLProducer.getTickerSymbolList();
}

for(TickerSalesObject ticker: tickerObjectList)
{

log.info("Before encryption record is : "+ ticker + "and size is : "
+ KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString()));
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov,context);

log.info("Size of encrypted object is : "+ KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
if(KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) >1024000)
log.warn("Record added is greater than 1MB and may be throttled");
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName, randomPartitionKey(), data);

Futures.addCallback(f, callback);
log.info("Encrypted record " + data.toString() + " " + "added successfully" );

}
}
catch(Exception e)
{
Expand Down
Expand Up @@ -30,14 +30,6 @@ public class DecryptShardConsumerThread implements Runnable
private final static CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
private static final Logger log = LoggerFactory.getLogger(DecryptShardConsumerThread.class);

public DecryptShardConsumerThread(String shardIterator, String shardId, AmazonKinesisClient kinesis, AWSKMSClient kms )
{
this.shardIterator = shardIterator;
this.shardId = shardId;
this.kms= kms;
this.kinesis = kinesis;

}

public DecryptShardConsumerThread(String shardIterator, String shardId, AmazonKinesisClient kinesis, Map<String, String> context, String keyArn)
{
Expand Down Expand Up @@ -100,7 +92,7 @@ public void run()
{
ByteBuffer buffer = record.getData();
String decryptedResult = KinesisEncryptionUtils.decryptByteStream(crypto,buffer,prov,this.getKeyArn(), this.getContext());
log.info("Decrypted Result is " + decryptedResult);
log.info("Decrypted Text Result is " + decryptedResult);
}
catch (CharacterCodingException e)
{
Expand Down
@@ -1,36 +1,27 @@
package kinesisencryption.streams;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;

import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.*;

import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.kms.KmsMasterKeyProvider;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kms.AWSKMSClient;
import kinesisencryption.dao.BootCarObject;
import kinesisencryption.dao.TickerSalesObject;
import kinesisencryption.utils.KinesisEncryptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;


public class EnryptedProducerWithStreams
public class EncryptedProducerWithStreams
{

private static final String DELIM = ",";

private static final Logger log = LoggerFactory.getLogger(EnryptedProducerWithStreams.class);
private static final Logger log = LoggerFactory.getLogger(EncryptedProducerWithStreams.class);
private List<TickerSalesObject> tickerSymbolList;

public List<TickerSalesObject> getTickerSymbolList()
Expand Down Expand Up @@ -77,48 +68,34 @@ public static void main(String[] args)
log.info("Successfully retrieved file location property " + fileLocation);

List<TickerSalesObject> tickerSymbolsList = KinesisEncryptionUtils.getDataObjects(fileLocation);
EnryptedProducerWithStreams producer = new EnryptedProducerWithStreams();
EncryptedProducerWithStreams producer = new EncryptedProducerWithStreams();
producer.setTickerSymbolList(tickerSymbolsList);

final Map<String, String> context = Collections.singletonMap("Kinesis", encryptionContext);
final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn);
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();

List<PutRecordsRequestEntry> ptreList = new ArrayList<PutRecordsRequestEntry>();
int batch = 1;
while (true)
{
int i = 1;

for (TickerSalesObject ticker : tickerSymbolsList)
{
if (i == 500)//Put Record batch requests are 500
{
putRecordsRequest.setRecords(ptreList);
putRecordsRequest.setStreamName(streamName);
PutRecordsResult putRecordsResult =
kinesis.putRecords(putRecordsRequest);
log.info("PutRecordsResult : " + putRecordsResult.toString()
+ " has Batch Number : " + batch);
break;
}
PutRecordsRequestEntry ptre = new PutRecordsRequestEntry();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(streamName);
log.info("Before encryption size of String Object is "
+ KinesisEncryptionUtils.calculateSizeOfObject(ticker.toString()));
String encryptedString = KinesisEncryptionUtils.toEncryptedString(crypto, ticker, prov, context);
log.info("Size of encrypted object is : " + KinesisEncryptionUtils.calculateSizeOfObject(encryptedString));
if (KinesisEncryptionUtils.calculateSizeOfObject(encryptedString) > 1024000)
log.warn("Record added is greater than 1MB and may be throttled");
ByteBuffer data = KinesisEncryptionUtils.toEncryptedByteStream(encryptedString);
ptre.setData(data);
ptre.setPartitionKey(randomPartitionKey());
ptreList.add(ptre);
putRecordRequest.setData(data);
putRecordRequest.setPartitionKey(randomPartitionKey());
kinesis.putRecord(putRecordRequest);
log.info("Ticker added :" + ticker.toString() + "Ticker Cipher :" + data.toString() + "and size : "
+ KinesisEncryptionUtils.calculateSizeOfObject(data.toString()));
i++;

}
ptreList = new ArrayList();
tickerSymbolsList = producer.getTickerSymbolList();
batch++;
Thread.sleep(100);
}
} catch (IOException ioe)
Expand Down
11 changes: 4 additions & 7 deletions src/test/java/com/tayo/KinesisEncryption/TestEncryptionSDK.java
Expand Up @@ -30,21 +30,18 @@ public class TestEncryptionSDK extends TestCase
{
BootCarObject car;
String keyId;
AmazonKinesisClient kinesis;
private static final String STREAM_NAME = "UnitTestStream";

AWSKMSClient kms;
private final static CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
final static String keyArn = "arn:aws:kms:us-east-1:xxxxxxx:key/mykey-3f1c-4a77-a51d-isinaws";
final static String keyArn = "arn:aws:kms:us-east-1:573906581002:key/37dc90dc-3f1c-4a77-a51d-a653b173fcdb";
final AwsCrypto crypto = new AwsCrypto();
final KmsMasterKeyProvider prov = new KmsMasterKeyProvider(keyArn);

public void setUp() throws Exception
{
super.setUp();
car = new BootCarObject("Volvo 740 GL", "2012","134000");
keyId="mykey-3f1c-4a77-a51d-isinaws";
kinesis = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()
.getCredentials()).withRegion(Regions.US_EAST_1);
keyId="37dc90dc-3f1c-4a77-a51d-a653b173fcdb";

kms = new AWSKMSClient(new DefaultAWSCredentialsProviderChain()
.getCredentials()).withRegion(Regions.US_EAST_1);

Expand Down

0 comments on commit 34134ab

Please sign in to comment.