Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Commit

Permalink
Updated EDSL
Browse files Browse the repository at this point in the history
- stream fields not arrays anymore
- added tests using EDSL
  - wordcount regression test
  - enabled CounterApp example
  • Loading branch information
matthieumorel committed Jun 29, 2012
1 parent b712f9c commit db94ea8
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 168 deletions.
Expand Up @@ -616,10 +616,10 @@ protected Object clone() {
* Helper method to be used by PE implementation classes. Sends an event to all the target streams.
*
*/
protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {
protected <T extends Event> void emit(T event, Stream<T>... streams) {

for (int i = 0; i < streamArray.length; i++) {
streamArray[i].put(event);
for (int i = 0; i < streams.length; i++) {
streams[i].put(event);
}
}

Expand Down
Expand Up @@ -126,6 +126,7 @@ public Stream<T> setKey(String keyName) {
* @return the stream object
*/
public Stream<T> setPE(ProcessingElement pe) {
this.targetPEs = new ProcessingElement[] { pe };
app.addStream(this);
return this;
}
Expand Down

This file was deleted.

Expand Up @@ -53,14 +53,17 @@ public void prepare() throws IOException, InterruptedException, KeeperException
*/
@Test
public void testSimple() throws Exception {
testWordCountApp(WordCountApp.class);
}

protected void testWordCountApp(Class<?> appClass) throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
String clusterName = "clusterA";
taskSetup.clean("s4");
taskSetup.setup(clusterName, 1, 10000);

Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + WordCountApp.class.getName(),
"-extraModulesClasses=" + WordCountModule.class.getName() });
Main.main(new String[] { "-cluster=" + clusterName, "-appClass=" + appClass.getName() });

CountDownLatch signalTextProcessed = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
Expand All @@ -76,7 +79,6 @@ public void testSimple() throws Exception {
File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
String s = CommTestUtils.readFile(results);
Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);

}

@After
Expand Down
2 changes: 1 addition & 1 deletion subprojects/s4-edsl/s4-edsl.gradle
Expand Up @@ -7,7 +7,7 @@ def diezelSrcDir = "${projectDir}/src/main/diezel";

dependencies {
compile project(":s4-core")
//testCompile project(path: ':s4-core', configuration: 'tests')
testCompile project(':s4-core').sourceSets.test.output
}


Expand Down
@@ -1,6 +1,5 @@
package org.apache.s4.edsl;

import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -16,6 +15,7 @@
import org.slf4j.LoggerFactory;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

Expand Down Expand Up @@ -221,7 +221,7 @@ private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? ext
for (Field field : fields) {
logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());

if (field.getType() == Stream[].class) {
if (field.getType() == Stream.class) {
logger.debug("Found stream field: {}", field.getGenericType());

/* Track what fields have streams with the same event type. */
Expand All @@ -231,12 +231,12 @@ private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? ext
}

/* Assign streams to stream fields. */
Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
Map<Field, Stream<? extends Event>> assignment = Maps.newHashMap();
for (StreamBuilder<? extends Event> sm : streams) {

Stream<? extends Event> stream = sm.stream;
Class<? extends Event> eventType = sm.type;
String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">";
if (typeMap.containsKey(key)) {
String fieldName;
Field field;
Expand Down Expand Up @@ -297,22 +297,17 @@ private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? ext
}
/* Now we construct the array and do the final assignment. */

Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
// Map<Field, Stream<? extends Event>> assignmentMap = assignment.asMap();
for (Map.Entry<Field, Stream<? extends Event>> entry : assignment.entrySet()) {
Field f = entry.getKey();

int arraySize = entry.getValue().size();
@SuppressWarnings("unchecked")
Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
arraySize);
int i = 0;
for (Stream<? extends Event> s : entry.getValue()) {
streamArray[i++] = s;

f.setAccessible(true);
f.set(pe, streamArray);
logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
}
// int arraySize = entry.getValue().size();
// Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
// arraySize);
f.setAccessible(true);
f.set(pe, entry.getValue());

logger.debug("Assigned stream [{}] to field [{}].", entry.getValue().getName(), f.getName());
}
}

Expand Down
@@ -0,0 +1,36 @@
package org.apache.s4.edsl.wordcount;

import java.io.IOException;

import org.apache.s4.core.Stream;
import org.apache.s4.edsl.BuilderS4DSL;
import org.apache.s4.fixtures.SocketAdapter;
import org.apache.s4.wordcount.SentenceKeyFinder;
import org.apache.s4.wordcount.StringEvent;
import org.apache.s4.wordcount.WordClassifierPE;
import org.apache.s4.wordcount.WordCountEvent;
import org.apache.s4.wordcount.WordCountKeyFinder;
import org.apache.s4.wordcount.WordCounterPE;
import org.apache.s4.wordcount.WordSeenEvent;
import org.apache.s4.wordcount.WordSeenKeyFinder;
import org.apache.s4.wordcount.WordSplitterPE;

public class WordCountApp extends BuilderS4DSL {

SocketAdapter<StringEvent> socketAdapter;

protected void onInit() {
pe("Classifier").type(WordClassifierPE.class).pe("Counter").type(WordCounterPE.class)
.emit(WordCountEvent.class).withKeyFinder(WordCountKeyFinder.class).to("Classifier").pe("Splitter")
.type(WordSplitterPE.class).emit(WordSeenEvent.class).withKeyFinder(WordSeenKeyFinder.class)
.to("Counter").build();
Stream<StringEvent> sentenceStream = createStream("sentences stream", new SentenceKeyFinder(),
getPE("Splitter"));
try {
socketAdapter = new SocketAdapter<StringEvent>(sentenceStream, new SocketAdapter.SentenceEventFactory());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
@@ -0,0 +1,13 @@
package org.apache.s4.edsl.wordcount;

import org.junit.Test;

public class WordCountTest extends org.apache.s4.wordcount.WordCountTest {

@Test
public void testSimple() throws Exception {
testWordCountApp(WordCountApp.class);

}

}
4 changes: 3 additions & 1 deletion subprojects/s4-example/s4-example.gradle
Expand Up @@ -15,12 +15,14 @@
*/

description = 'Examples of S4 applications.'

dependencies {
compile project( ":s4-base" )
compile project( ":s4-core" )
compile project( ":s4-comm" )
compile project( ":s4-edsl" )
compile libraries.ejml
compile libraries.junit
// TODO remove this dependency only used for initializing a zookeeper server
compile project(':s4-core').sourceSets.test.output
}

0 comments on commit db94ea8

Please sign in to comment.