From a4c3a5ea41b7e6e5e16f9f1057ebdfa38a8611bd Mon Sep 17 00:00:00 2001 From: pangolulu Date: Thu, 7 Apr 2016 19:45:27 +0800 Subject: [PATCH] SAMOA-59, add an adapter for Apache Gearpump --- bin/samoa | 30 ++++ pom.xml | 14 ++ samoa-gearpump/README.md | 23 +++ samoa-gearpump/pom.xml | 163 ++++++++++++++++++ .../impl/gearpump/ComponentFactory.java | 58 +++++++ .../samoa/topology/impl/gearpump/DoTask.java | 42 +++++ .../gearpump/EntranceProcessingItemTask.java | 73 ++++++++ .../GearpumpEntranceProcessingItem.java | 78 +++++++++ .../impl/gearpump/GearpumpMessage.java | 66 +++++++ .../impl/gearpump/GearpumpProcessingItem.java | 90 ++++++++++ .../impl/gearpump/GearpumpStream.java | 78 +++++++++ .../impl/gearpump/GearpumpTopology.java | 71 ++++++++ .../impl/gearpump/ProcessingItemTask.java | 77 +++++++++ .../gearpump/SamoaMessagePartitioner.java | 60 +++++++ .../impl/gearpump/SerializableSerializer.java | 70 ++++++++ .../topology/impl/gearpump/TopologyNode.java | 31 ++++ .../samoa/topology/impl/gearpump/Utils.java | 102 +++++++++++ .../src/main/resources/geardefault.conf | 68 ++++++++ 18 files changed, 1194 insertions(+) create mode 100644 samoa-gearpump/README.md create mode 100644 samoa-gearpump/pom.xml create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/TopologyNode.java create mode 100644 samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Utils.java create mode 100644 samoa-gearpump/src/main/resources/geardefault.conf diff --git a/bin/samoa b/bin/samoa index 1a26caa9..7579fdcb 100755 --- a/bin/samoa +++ b/bin/samoa @@ -205,6 +205,36 @@ elif [ $PLATFORM = 'STORM' ]; then java -cp $CLASSPATH org.apache.samoa.LocalStormDoTask $COMPLETE_ARG $NUM_WORKER fi +elif [ $PLATFORM = 'GEARPUMP' ]; then + + echo "Deploying to $PLATFORM" + if [ -z $GEARPUMP_HOME ];then + echo "GEARPUMP_HOME is not set!" + echo "Please set GEARPUMP_HOME to point to your Gearpump installation" + exit -1 + fi + + if [ ! -f $2 ];then + echo "$2 does not exist!" + echo "Please use a valid jar file for Gearpump execution" + exit -1 + fi + + GEARPUMP_EXEC="sh $GEARPUMP_HOME/bin/gear" + + COMPLETE_ARG="" + COUNTER=0 + for var in "$@" + do + COUNTER=`expr $COUNTER + 1` + if [ $COUNTER -gt 2 ];then + COMPLETE_ARG="$COMPLETE_ARG $var" + fi + done + + DEPLOYABLE=$JAR_PATH + $GEARPUMP_EXEC app -jar $DEPLOYABLE org.apache.samoa.topology.impl.gearpump.DoTask $COMPLETE_ARG + elif [ $PLATFORM = 'SAMZA' ]; then echo "Deploying to SAMZA" diff --git a/pom.xml b/pom.xml index 33049375..9e039c04 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,15 @@ samoa-test + + gearpump + + samoa-instances + samoa-api + samoa-gearpump + samoa-test + + all @@ -103,6 +112,7 @@ samoa-storm samoa-flink samoa-samza + samoa-gearpump samoa-test @@ -127,6 +137,9 @@ 1.0.3 0.7.0 0.10.1 + 0.8.1 + 2.11 + 2.11.5 1.7.2 1.7.5 2.18 @@ -210,6 +223,7 @@ samoa-storm samoa-flink samoa-samza + samoa-gearpump samoa-test samoa-threads bin diff --git a/samoa-gearpump/README.md b/samoa-gearpump/README.md new file mode 100644 index 00000000..4bb8363f --- /dev/null +++ b/samoa-gearpump/README.md @@ -0,0 +1,23 @@ +# Executing Apache SAMOA with Apache Gearpump + +In this tutorial README we describe how to execute Apache SAMOA on top of [Apache Gearpump(incubating)](http://gearpump.apache.org/). + +## Build + +Simply clone the repository and install SAMOA. + +``` +git clone http://git.apache.org/incubator-samoa.git +cd incubator-samoa +mvn -Pgearpump package +``` + +The deployable jar for SAMOA will be in `target/SAMOA-gearpump-0.4.0-incubating-SNAPSHOT.jar`. + +## Executing SAMOA with Gearpump step-by-step + +1. Ensure that you already have Gearpump running. You can follow this [tutorial](http://gearpump.apache.org/releases/latest/deployment-local.html) to deploy Gearpump in local mode. +2. Set `GEARPUMP_HOME` to point to your Gearpump installation path. +3. In the SAMOA path, you can input command to execute SAMOA tasks. For example, `bin/samoa gearpump target/SAMOA-gearpump-0.4.0-incubating-SNAPSHOT.jar "PrequentialEvaluation -d /tmp/dump.csv -i 1000000 -f 100000 -l (classifiers.trees.VerticalHoeffdingTree -p 4) -s (generators.RandomTreeGenerator -c 2 -o 10 -u 10)"` + + diff --git a/samoa-gearpump/pom.xml b/samoa-gearpump/pom.xml new file mode 100644 index 00000000..0fa23ccc --- /dev/null +++ b/samoa-gearpump/pom.xml @@ -0,0 +1,163 @@ + + + + + 4.0.0 + + UTF-8 + + + samoa-gearpump + gearpump bindings for SAMOA + + samoa-gearpump + + org.apache.samoa + samoa + 0.4.0-incubating-SNAPSHOT + + + + + apache.snapshots + Apache Development Snapshot Repository + https://repository.apache.org/content/repositories/snapshots/ + + false + + + true + + + + gearpump-shaded-repo + Vincent at Bintray + http://dl.bintray.com/fvunicorn/maven + + + + + + org.apache.samoa + samoa-api + ${project.version} + + + org.apache.samoa + samoa-test + test-jar + test-jar-with-dependencies + ${project.version} + test + + + org.apache.gearpump + gearpump-streaming_${scala.binary.version} + ${gearpump.version} + + + org.apache.gearpump + gearpump-daemon_${scala.binary.version} + ${gearpump.version} + + + org.apache.gearpump + gearpump-experimental-cgroup_${scala.binary.version} + + + + + org.slf4j + slf4j-log4j12 + ${slf4j-log4j12.version} + test + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + + + + maven-assembly-plugin + ${maven-assembly-plugin.version} + + SAMOA-gearpump-${project.version} + false + false + ../target + + jar-with-dependencies + + + + ${parsedVersion.osgiVersion} + ${project.description} + ${project.version} + Yahoo Labs + SAMOA + + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + -Xmx1G + false + + + + + diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java new file mode 100644 index 00000000..14574610 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ComponentFactory.java @@ -0,0 +1,58 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; + +public class ComponentFactory implements org.apache.samoa.topology.ComponentFactory { + @Override + public ProcessingItem createPi(Processor processor) { + return createPi(processor, 1); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new GearpumpProcessingItem(processor, parallelism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor + entranceProcessor) { + return new GearpumpEntranceProcessingItem(entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + TopologyNode topologyNode = (TopologyNode) sourcePi; + return topologyNode.createStream(); + } + + @Override + public Topology createTopology(String topoName) { + return new GearpumpTopology(topoName); + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java new file mode 100644 index 00000000..90832fc1 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/DoTask.java @@ -0,0 +1,42 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.streaming.StreamApplication; +import org.apache.gearpump.util.Graph; + +public class DoTask { + + public static void main(String[] args) { + + GearpumpTopology topology = Utils.argsToTopology(args); + String topologyName = topology.getTopologyName(); + Graph graph = topology.getGraph(); + StreamApplication app = StreamApplication.apply(topologyName, graph, UserConfig.empty()); + ClientContext context = ClientContext.apply(); + context.submit(app); + context.close(); + + } + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java new file mode 100644 index 00000000..857bffa2 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java @@ -0,0 +1,73 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.Task; +import org.apache.gearpump.streaming.task.TaskContext; + +import org.apache.samoa.core.EntranceProcessor; + +public class EntranceProcessingItemTask extends Task { + EntranceProcessor entranceProcessor; + private TaskContext taskContext; + private UserConfig userConfig; + private GearpumpStream outputStream; + + public EntranceProcessingItemTask(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.taskContext = taskContext; + this.userConfig = userConf; + byte[] bytes = userConf.getBytes(Utils.entrancePiConf).get(); + GearpumpEntranceProcessingItem entranceProcessingItem = + ((GearpumpEntranceProcessingItem) Utils.bytesToObject(bytes)); + this.entranceProcessor = entranceProcessingItem.getProcessor(); + this.outputStream = entranceProcessingItem.getStream(); + } + + @Override + public void onStart(StartTime startTime) { + outputStream.setTaskContext(this.taskContext); + + entranceProcessor.onCreate(taskContext.taskId().index()); + self().tell(new Message("start", System.currentTimeMillis()), self()); + } + + @Override + public void onNext(Message msg) { + if (entranceProcessor.hasNext()) { + GearpumpMessage message = + new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId(), + outputStream.getScheme()); + taskContext.output(new Message(message, System.currentTimeMillis())); + } + self().tell(new Message("continue", System.currentTimeMillis()), self()); + } + + @Override + public void onStop() { + super.onStop(); + } + +} + diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java new file mode 100644 index 00000000..d2a490d3 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpEntranceProcessingItem.java @@ -0,0 +1,78 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.Processor; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.AbstractEntranceProcessingItem; +import org.apache.samoa.topology.Stream; + +import java.io.IOException; +import java.io.Serializable; + +public class GearpumpEntranceProcessingItem extends AbstractEntranceProcessingItem + implements TopologyNode, Serializable { + + private GearpumpStream stream; + + public GearpumpEntranceProcessingItem(EntranceProcessor entranceProcessor) { + super(entranceProcessor); + this.setName(entranceProcessor.getClass().getName()); + } + + @Override + public GearpumpStream createStream() { + GearpumpStream stream = new GearpumpStream(this); + this.stream = stream; + return stream; + } + + @Override + public Processor createGearpumpProcessor() { + byte[] bytes = Utils.objectToBytes(this); + UserConfig userConfig = UserConfig.empty().withBytes(Utils.entrancePiConf, bytes); + return new org.apache.gearpump.streaming.Processor.DefaultProcessor<>( + 1, this.getName(), userConfig, EntranceProcessingItemTask.class); + } + + public GearpumpStream getStream() { + return stream; + } + + private void writeObject(java.io.ObjectOutputStream stream) + throws IOException { + stream.writeObject(getProcessor()); + stream.writeObject(getName()); + stream.writeObject(getOutputStream()); + stream.writeObject(this.stream); + } + + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + setProcessor((org.apache.samoa.core.EntranceProcessor) stream.readObject()); + setName((String) stream.readObject()); + setOutputStream((Stream) stream.readObject()); + this.stream = (GearpumpStream) stream.readObject(); + } + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java new file mode 100644 index 00000000..d38bd6e0 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java @@ -0,0 +1,66 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.utils.PartitioningScheme; + +import java.io.Serializable; + +public class GearpumpMessage implements Serializable { + private ContentEvent event; + private PartitioningScheme scheme; + private String targetId; + + public GearpumpMessage() { + this(null, null, null); + } + + public GearpumpMessage(ContentEvent event, String targetId, PartitioningScheme scheme) { + this.event = event; + this.targetId = targetId; + this.scheme = scheme; + } + + public String getTargetId() { + return targetId; + } + + public void setTargetId(String targetId) { + this.targetId = targetId; + } + + public ContentEvent getEvent() { + return event; + } + + public void setEvent(ContentEvent event) { + this.event = event; + } + + public PartitioningScheme getScheme() { + return scheme; + } + + public void setScheme(PartitioningScheme scheme) { + this.scheme = scheme; + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java new file mode 100644 index 00000000..2453a804 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpProcessingItem.java @@ -0,0 +1,90 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.cluster.UserConfig; + +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.AbstractProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; + +public class GearpumpProcessingItem extends AbstractProcessingItem implements TopologyNode, Serializable { + private static final long serialVersionUID = -9066409791668954099L; + private Set streams; + + public GearpumpProcessingItem(Processor processor, int parallelism) { + super(processor, parallelism); + this.setName(processor.getClass().getSimpleName()); + this.streams = new HashSet<>(); + } + + @Override + protected org.apache.samoa.topology.ProcessingItem addInputStream(Stream inputStream, + PartitioningScheme scheme) { + ((GearpumpStream) inputStream).setTargetPi(this); + ((GearpumpStream) inputStream).setScheme(scheme); + ((GearpumpStream) inputStream).setTargetId(this.getName()); + + return this; + } + + @Override + public GearpumpStream createStream() { + GearpumpStream stream = new GearpumpStream(this); + streams.add(stream); + return stream; + } + + @Override + public org.apache.gearpump.streaming.Processor createGearpumpProcessor() { + byte[] bytes = Utils.objectToBytes(this); + UserConfig userConfig = UserConfig.empty().withBytes(Utils.piConf, bytes); + return new org.apache.gearpump.streaming.Processor.DefaultProcessor<>( + this.getParallelism(), this.getName(), userConfig, ProcessingItemTask.class); + } + + public Set getStreams() { + return streams; + } + + private void writeObject(java.io.ObjectOutputStream stream) + throws IOException { + stream.writeObject(getProcessor()); + stream.writeObject(getName()); + stream.writeInt(getParallelism()); + stream.writeObject(streams); + } + + private void readObject(java.io.ObjectInputStream stream) + throws IOException, ClassNotFoundException { + setProcessor((Processor) stream.readObject()); + setName((String) stream.readObject()); + setParallelism(stream.readInt()); + streams = (Set) stream.readObject(); + } + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java new file mode 100644 index 00000000..146ce174 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java @@ -0,0 +1,78 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.Message; +import org.apache.gearpump.streaming.task.TaskContext; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.AbstractStream; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.PartitioningScheme; + +import java.io.Serializable; + +public class GearpumpStream extends AbstractStream implements Serializable { + + private TaskContext taskContext; + private String targetId; + private IProcessingItem targetPi; + private PartitioningScheme scheme; + + public GearpumpStream(IProcessingItem sourcePi) { + super(sourcePi); + } + + public void setTaskContext(TaskContext taskContext) { + this.taskContext = taskContext; + } + + public String getTargetId() { + return targetId; + } + + public void setTargetId(String targetId) { + this.targetId = targetId; + } + + public IProcessingItem getTargetPi() { + return targetPi; + } + + public void setTargetPi(IProcessingItem targetPi) { + this.targetPi = targetPi; + } + + public PartitioningScheme getScheme() { + return scheme; + } + + public void setScheme(PartitioningScheme scheme) { + this.scheme = scheme; + } + + @Override + public void put(ContentEvent event) { + GearpumpMessage message = new GearpumpMessage(event, targetId, scheme); + taskContext.output(new Message(message, System.currentTimeMillis())); + } + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java new file mode 100644 index 00000000..f53ada32 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpTopology.java @@ -0,0 +1,71 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.partitioner.Partitioner; +import org.apache.gearpump.streaming.Processor; +import org.apache.gearpump.util.Graph; + +import org.apache.samoa.topology.AbstractTopology; +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.topology.Stream; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class GearpumpTopology extends AbstractTopology { + + private Graph graph; + private Map piToProcessor; + + public GearpumpTopology(String name) { + super(name); + graph = Graph.empty(); + piToProcessor = new HashMap<>(); + } + + public Graph getGraph() { + buildGraph(); + return graph; + } + + private void buildGraph() { + Set processingItems = getProcessingItems(); + for (IProcessingItem procItem : processingItems) { + TopologyNode gearpumpNode = (TopologyNode) procItem; + Processor gearpumpProcessor = gearpumpNode.createGearpumpProcessor(); + piToProcessor.put(procItem, gearpumpProcessor); + graph.addVertex(gearpumpProcessor); + } + + Set streams = getStreams(); + Partitioner partitioner = new SamoaMessagePartitioner(); + for (Stream stream : streams) { + GearpumpStream gearpumpStream = (GearpumpStream) stream; + IProcessingItem sourcePi = gearpumpStream.getSourceProcessingItem(); + IProcessingItem targetPi = gearpumpStream.getTargetPi(); + Processor sourceProcessor = piToProcessor.get(sourcePi); + Processor targetProcessor = piToProcessor.get(targetPi); + graph.addEdge(sourceProcessor, partitioner, targetProcessor); + } + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java new file mode 100644 index 00000000..8f2e0dd2 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/ProcessingItemTask.java @@ -0,0 +1,77 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.task.StartTime; +import org.apache.gearpump.streaming.task.Task; +import org.apache.gearpump.streaming.task.TaskContext; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +import java.util.Set; + +public class ProcessingItemTask extends Task { + + private TaskContext taskContext; + private UserConfig userConfig; + private Processor processor; + private Set streams; + + public ProcessingItemTask(TaskContext taskContext, UserConfig userConf) { + super(taskContext, userConf); + this.taskContext = taskContext; + this.userConfig = userConf; + byte[] bytes = userConf.getBytes(Utils.piConf).get(); + GearpumpProcessingItem pi = (GearpumpProcessingItem) Utils.bytesToObject(bytes); + this.processor = pi.getProcessor(); + this.streams = pi.getStreams(); + } + + public void setProcessor(Processor processor) { + this.processor = processor; + } + + @Override + public void onStart(StartTime startTime) { + for (GearpumpStream stream : streams) { + stream.setTaskContext(this.taskContext); + } + + processor.onCreate(taskContext.taskId().index()); + } + + @Override + public void onNext(Message msg) { + GearpumpMessage message = (GearpumpMessage) msg.msg(); + String targetId = message.getTargetId(); + if (targetId.equals(this.processor.getClass().getSimpleName())) { + ContentEvent event = message.getEvent(); + processor.process(event); + } + } + + @Override + public void onStop() { + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java new file mode 100644 index 00000000..b3ed655c --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java @@ -0,0 +1,60 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2016 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.Message; +import org.apache.gearpump.partitioner.BroadcastPartitioner; +import org.apache.gearpump.partitioner.HashPartitioner; +import org.apache.gearpump.partitioner.MulticastPartitioner; +import org.apache.gearpump.partitioner.ShufflePartitioner; + +public class SamoaMessagePartitioner implements MulticastPartitioner { + ShufflePartitioner shufflePartitioner = new ShufflePartitioner(); + BroadcastPartitioner broadcastPartitioner = new BroadcastPartitioner(); + HashPartitioner hashPartitioner = new HashPartitioner(); + + @Override + public int[] getPartitions(Message msg, int partitionNum, int currentPartitionId) { + GearpumpMessage message = (GearpumpMessage) msg.msg(); + int[] partitions = null; + switch (message.getScheme()) { + case SHUFFLE: + partitions = new int[]{ + shufflePartitioner.getPartition(msg, partitionNum, currentPartitionId) + }; + break; + case BROADCAST: + partitions = broadcastPartitioner.getPartitions(msg, partitionNum); + break; + case GROUP_BY_KEY: + partitions = new int[]{ + hashPartitioner.getPartition(msg, partitionNum, currentPartitionId) + }; + break; + } + return partitions; + } + + @Override + public int[] getPartitions(Message msg, int partitionNum) { + return this.getPartitions(msg, partitionNum, -1); + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java new file mode 100644 index 00000000..4a14a2ae --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SerializableSerializer.java @@ -0,0 +1,70 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** + * Serialize and deserialize objects with Java serialization + * + * @author Anh Thu Vu + */ +public class SerializableSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, Object object) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(object); + oos.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + byte[] ser = bos.toByteArray(); + output.writeInt(ser.length); + output.writeBytes(ser); + } + + @SuppressWarnings("rawtypes") + @Override + public Object read(Kryo kryo, Input input, Class c) { + int len = input.readInt(); + byte[] ser = new byte[len]; + input.readBytes(ser); + ByteArrayInputStream bis = new ByteArrayInputStream(ser); + try { + ObjectInputStream ois = new ObjectInputStream(bis); + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/TopologyNode.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/TopologyNode.java new file mode 100644 index 00000000..e21bc8a9 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/TopologyNode.java @@ -0,0 +1,31 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.gearpump.streaming.Processor; + +public interface TopologyNode { + + GearpumpStream createStream(); + + Processor createGearpumpProcessor(); + +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Utils.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Utils.java new file mode 100644 index 00000000..613abc81 --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Utils.java @@ -0,0 +1,102 @@ +package org.apache.samoa.topology.impl.gearpump; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.ClassOption; + +import org.apache.samoa.tasks.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +public class Utils { + + private static final Logger logger = LoggerFactory.getLogger(Utils.class); + public static String piConf = "processingItem"; + public static String entrancePiConf = "entranceProcessingItem"; + + public static byte[] objectToBytes(Object object) { + ObjectOutputStream oos; + ByteArrayOutputStream baos; + try { + baos = new ByteArrayOutputStream(); + oos = new ObjectOutputStream(baos); + oos.writeObject(object); + oos.flush(); + byte[] bytes = baos.toByteArray(); + baos.close(); + oos.close(); + return bytes; + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + public static Object bytesToObject(byte[] bytes) { + ByteArrayInputStream bais; + ObjectInputStream ois; + try { + bais = new ByteArrayInputStream(bytes); + ois = new ObjectInputStream(bais); + Object object = ois.readObject(); + bais.close(); + ois.close(); + return object; + } catch (Exception e) { + e.printStackTrace(); + } + return null; + } + + public static GearpumpTopology argsToTopology(String[] args) { + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); + } + logger.info("Command line string = {}", cliString.toString()); + + Task task = getTask(cliString.toString()); + + // TODO: remove setFactory method with DynamicBinding + task.setFactory(new ComponentFactory()); + task.init(); + + return (GearpumpTopology) task.getTopology(); + } + + public static Task getTask(String cliString) { + Task task = null; + try { + logger.debug("Providing task [{}]", cliString); + task = ClassOption.cliStringToObject(cliString, Task.class, null); + } catch (Exception e) { + logger.warn("Fail in initializing the task!"); + e.printStackTrace(); + } + return task; + } +} diff --git a/samoa-gearpump/src/main/resources/geardefault.conf b/samoa-gearpump/src/main/resources/geardefault.conf new file mode 100644 index 00000000..51aa20e3 --- /dev/null +++ b/samoa-gearpump/src/main/resources/geardefault.conf @@ -0,0 +1,68 @@ +gearpump { + + serializers { + ## Follow this format when adding new serializer for new message types + ## "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer" + "org.apache.samoa.topology.impl.gearpump.GearpumpMessage" = "" + "org.apache.samoa.utils.PartitioningScheme" = "" + "org.apache.samoa.core.ContentEvent" = "" + "org.apache.samoa.core.DoubleVector" = "" + "org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion" = "" + "org.apache.samoa.moa.classifiers.rules.core.attributeclassobservers.FIMTDDNumericAttributeClassLimitObserver" = "" + "org.apache.samoa.moa.classifiers.rules.core.conditionaltests.NumericAttributeBinaryRulePredicate" = "" + "org.apache.samoa.moa.classifiers.rules.driftdetection.PageHinkleyFading" = "" + "org.apache.samoa.learners.classifiers.rules.common.Perceptron$PerceptronData" = "" + "org.apache.samoa.learners.classifiers.rules.common.TargetMean$TargetMeanData" = "" + "org.apache.samoa.learners.classifiers.rules.common.TargetMean" = "" + "org.apache.samoa.learners.classifiers.rules.common.Perceptron" = "" + "org.apache.samoa.learners.classifiers.trees.AttributeContentEvent" = "" + "org.apache.samoa.learners.classifiers.trees.ComputeContentEvent" = "" + "org.apache.samoa.learners.InstanceContentEvent" = "" + "org.apache.samoa.learners.InstancesContentEvent" = "" + "org.apache.samoa.learners.clusterers.ClusteringContentEvent" = "" + "org.apache.samoa.learners.classifiers.trees.DeleteContentEvent" = "" + "org.apache.samoa.learners.classifiers.trees.LocalResultContentEvent" = "" + "org.apache.samoa.learners.classifiers.trees.AttributeBatchContentEvent" = "" + "org.apache.samoa.learners.classifiers.rules.distributed.RuleContentEvent" = "" + "org.apache.samoa.learners.classifiers.rules.distributed.PredicateContentEvent" = "" + "org.apache.samoa.learners.classifiers.rules.distributed.AssignmentContentEvent" = "" + "org.apache.samoa.learners.classifiers.rules.common.ActiveRule" = "" + "org.apache.samoa.learners.classifiers.rules.common.ActiveRule$Builder" = "" + "org.apache.samoa.learners.classifiers.rules.common.RuleSplitNode" = "" + "org.apache.samoa.learners.classifiers.rules.common.RuleActiveRegressionNode" = "" + "org.apache.samoa.learners.classifiers.rules.common.RulePassiveRegressionNode" = "" + "org.apache.samoa.learners.ResultContentEvent" = "" + "org.apache.samoa.evaluation.ClusteringResultContentEvent" = "" + "org.apache.samoa.evaluation.ClusteringEvaluationContentEvent" = "" + "org.apache.samoa.core.SerializableInstance" = "" + "org.apache.samoa.instances.SingleClassInstanceData" = "" + "org.apache.samoa.instances.DenseInstanceData" = "" + "org.apache.samoa.instances.SparseInstanceData" = "" + "org.apache.samoa.instances.InstancesHeader" = "" + "org.apache.samoa.instances.Instances" = "" + "org.apache.samoa.instances.InstanceInformation" = "" + "org.apache.samoa.moa.core.DataPoint" = "" + "org.apache.samoa.moa.core.FastVector" = "" + "org.apache.samoa.moa.core.AutoExpandVector" = "" + "org.apache.samoa.moa.core.DoubleVector" = "" + "org.apache.samoa.moa.cluster.Clustering" = "" + "org.apache.samoa.moa.cluster.SphereCluster" = "" + "org.apache.samoa.moa.clusterers.clustream.ClustreamKernel" = "" + "org.apache.samoa.instances.Attribute" = "" + "org.apache.samoa.moa.classifiers.core.conditionaltests.NumericAttributeBinaryTest" = "" + "org.apache.samoa.moa.classifiers.core.conditionaltests.NominalAttributeMultiwayTest" = "" + "org.apache.samoa.moa.classifiers.core.conditionaltests.NominalAttributeBinaryTest" = "" + "org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion" = "" + "org.apache.samoa.learners.classifiers.trees.ActiveLearningNode" = "" + "[Lorg.apache.samoa.learners.classifiers.trees.AttributeBatchContentEvent;" = "" + "com.github.javacliparser.IntOption" = "" + "com.github.javacliparser.FloatOption" = "" + "org.apache.samoa.learners.InstanceContent" = "" + "java.util.ArrayList" = "" + "java.util.LinkedList" = "" + "java.util.HashMap" = "" + "java.util.Arrays" = "" + "[[D" = "" + } + +}