This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

[PIRK-4] Add Streaming Implementation for Apache Storm #74

Closed
wants to merge 38 commits into from

Conversation

clharris
Contributor

This is an initial implementation of a streaming version of Pirk that runs on Apache Storm. I am leaving it as WIP for now so people have a chance to look it over and give feedback. Right now there is only one integration test, which runs the Storm topology four times, once with each of the significant configuration possibilities. I wanted to unit test the bolts, but that doesn't seem very straightforward given the way the Pirk processing works. I'll try to add some documentation (at minimum a diagram of the Pirk Storm topology) by early next week.

clharris and others added 29 commits July 19, 2016 10:33
fixed logging and scala version issues via pom modifications; refacto…
@@ -65,6 +66,13 @@ else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark"
ComputeResponse computeResponse = new ComputeResponse(fs);
computeResponse.performQuery();
}
else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("storm"))
Member

@smarthi Aug 19, 2016

We will need to abstract this out and implement it as a Strategy pattern - with more backends like Flink, Spark, Beam and others added in the future, this is only going to get messier as a chain of if-else statements.
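For illustration, a minimal sketch of what such a strategy abstraction could look like; the ResponderLauncher interface and the registration approach below are hypothetical and not part of this PR:

import java.util.HashMap;
import java.util.Map;

// Hypothetical strategy interface; each backend (mapreduce, spark, storm, ...)
// would provide its own implementation.
interface ResponderLauncher
{
  void run() throws Exception;
}

// Hypothetical dispatcher that replaces the growing if-else chain in the driver.
final class ResponderDispatcher
{
  private static final Map<String, ResponderLauncher> LAUNCHERS = new HashMap<>();

  static void register(String platform, ResponderLauncher launcher)
  {
    LAUNCHERS.put(platform.toLowerCase(), launcher);
  }

  static void launch(String platform) throws Exception
  {
    ResponderLauncher launcher = LAUNCHERS.get(platform.toLowerCase());
    if (launcher == null)
    {
      throw new IllegalArgumentException("Unsupported platform: " + platform);
    }
    launcher.run();
  }
}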

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
Member

make this a property ${kafka.version}
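For illustration, a sketch of that change, assuming the version is declared in the pom's <properties> section:

<properties>
  <kafka.version>0.9.0.1</kafka.version>
</properties>
...
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>${kafka.version}</version>
</dependency>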

@smarthi
Member

smarthi commented Aug 19, 2016

Travis build fails on an Integration Test, here's the stack trace

org.apache.pirk.storm.KafkaStormIntegrationTest
testKafkaStormIntegration(org.apache.pirk.storm.KafkaStormIntegrationTest)  Time elapsed: 23.867 sec  <<< ERROR!
java.io.EOFException: null
	at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2353)
	at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2822)
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
	at org.apache.pirk.serialization.JavaSerializer.read(JavaSerializer.java:60)
	at org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:108)
	at org.apache.pirk.serialization.LocalFileSystemStore.recall(LocalFileSystemStore.java:90)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.performDecryption(KafkaStormIntegrationTest.java:317)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.runTest(KafkaStormIntegrationTest.java:153)
	at org.apache.pirk.storm.KafkaStormIntegrationTest.testKafkaStormIntegration(KafkaStormIntegrationTest.java:132)

Running org.apache.pirk.wideskies.standalone.StandaloneTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.615 sec - in org.apache.pirk.wideskies.standalone.StandaloneTest
Running org.apache.pirk.schema.query.LoadQuerySchemaTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.058 sec - in org.apache.pirk.schema.query.LoadQuerySchemaTest
Running org.apache.pirk.schema.data.LoadDataSchemaTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.015 sec - in org.apache.pirk.schema.data.LoadDataSchemaTest

Results :

Tests in error:
  KafkaStormIntegrationTest.testKafkaStormIntegration:132->runTest:153->performDecryption:317 » EOF

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.*;
Member

We want to avoid import *; best to specify each import explicitly.

@clharris changed the title from "PIRK-4 [WIP] Add Streaming Implementation for Apache Storm" to "[PIRK-4] Add Streaming Implementation for Apache Storm" Aug 26, 2016
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
Member

The ASF header should not contain markup.
True for a number of places in the PR.

Contributor Author

Not sure how that happened. I'll clean it up (as well as address the other comments you mentioned).

@@ -90,7 +127,7 @@ static boolean validateResponderProperties()
}

String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("storm") && !platform.equals("standalone"))
Member

Change this to a switch statement using an enum - the if list is only going to get longer as other backends, and streaming and batch versions of each, are added.
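For illustration, a minimal sketch of the enum-based approach; the Platform enum below is hypothetical and not part of this PR:

// Hypothetical enum of supported platforms; validation becomes a switch over
// known values instead of a chain of string comparisons.
enum Platform
{
  MAPREDUCE, SPARK, STORM, STANDALONE;

  static Platform fromString(String name)
  {
    switch (name.toLowerCase())
    {
      case "mapreduce":
        return MAPREDUCE;
      case "spark":
        return SPARK;
      case "storm":
        return STORM;
      case "standalone":
        return STANDALONE;
      default:
        throw new IllegalArgumentException("Unsupported platform: " + name);
    }
  }
}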

@clharris closed this Aug 29, 2016