Permalink
Browse files

S4-91 Minor cleanups

  • Loading branch information...
Daniel Gómez Ferro
Daniel Gómez Ferro committed Mar 11, 2013
1 parent 6aaa553 commit be95df13ba4ce8459663cd4c6b76f5fa0df8abac
@@ -18,7 +18,6 @@
* @return true if the event was sent because the destination is <b>not</b> local.
*
*/
- // boolean checkAndSendIfNotLocal(String hashKey, Event event);
boolean checkAndSendIfNotLocal(int partition, Event event);
/**
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.s4.comm.topology;
import java.util.ArrayList;
@@ -60,7 +78,7 @@ public void setExclusive(boolean isExclusive) {
this.isExclusive = isExclusive;
}
- public int getGlobalePartitionId(int partitionId) {
+ public int getGlobalPartitionId(int partitionId) {
return globalPartitionMap.get(partitionId);
}
@@ -79,15 +79,12 @@
@Inject
private Hasher hasher;
+ @Inject
private ZkClient zkClient;
@Inject
private RemoteStreams remoteStreams;
- public void setZkClient(ZkClient zkClient) {
- this.zkClient = zkClient;
- }
-
@Inject
private Cluster topology;
@@ -241,7 +238,7 @@ void validatePartition(Cluster topology) {
public void schedule() {
schedule(topology);
- writeToZK();
+ writePartitionDataToZK();
}
/**
@@ -250,7 +247,7 @@ public void schedule() {
*
* @param pes
*/
- private void writeToZK() {
+ private void writePartitionDataToZK() {
List<String> streamsOfNEPE = new ArrayList<String>();
ProcessingElement NEPeInstance = null;
for (int i = 0; i < getPePrototypes().size(); i++) {
@@ -180,19 +180,19 @@ public ProcessingElement load(String key) throws Exception {
}
- public void addGlobalPartitionId(int partitionId, int nodeId) {
+ void addGlobalPartitionId(int partitionId, int nodeId) {
partitionData.addPartitionMappingInfo(partitionId, nodeId);
}
- public int getGlobalPartitionId(int partitionId) {
- return partitionData.getGlobalePartitionId(partitionId);
+ int getGlobalPartitionId(int partitionId) {
+ return partitionData.getGlobalPartitionId(partitionId);
}
- public void addInputStream(String stream) {
+ void addInputStream(String stream) {
partitionData.addStream(stream);
}
- public List<String> getInputStreams() {
+ List<String> getInputStreams() {
return partitionData.getStreams();
}
@@ -510,29 +510,38 @@ public void checkpoint() {
clearDirty();
}
+ /**
+ * @return whether this PE runs exclusively on some partitions
+ */
public boolean isExclusive() {
return partitionData.isExclusive();
}
/**
- * If set a PE to be exclusive, user need give the partition count of this PE
+ * Forces this PE to run on the given number of partitions exclusively, i.e., no other PE will share
+ * the same partition.
*
- * @param isExclusive
- * @param partitionCount
+ * @param partitionCount Number of partitions to run exclusively on
*/
public void setExclusive(int partitionCount) {
this.partitionData.setExclusive(true);
this.partitionData.setPartitionCount(partitionCount);
}
- public void setPartitionCount(int partitionCount) {
+ void setPartitionCount(int partitionCount) {
this.partitionData.setPartitionCount(partitionCount);
for (int i = 0; i < partitionCount; i++) {
this.partitionData.addPartitionMappingInfo(i, i);
}
}
+ /**
+ * Returns the number of partitions this PE runs on. It can be different than the total number of
+ * partitions for the application if some PE (this one or others) run exclusively on some partitions.
+ *
+ * @return Number of partitions this PE runs on
+ */
public int getPartitionCount() {
return partitionData.getPartitionCount();
}
@@ -60,16 +60,14 @@ public void send(String hashKey, ByteBuffer message) throws InterruptedException
Set<Integer> partitions = new HashSet<Integer>();
- logger.warn("Remote sending with hash: " + hashKey);
int hashValue = (hashKey == null) ? targetPartition.incrementAndGet() : (int) hasher.hash(hashKey);
for (String prototype : partitionDatas.keySet()) {
PartitionData data = partitionDatas.get(prototype);
- partitions.add(data.getGlobalePartitionId(hashValue % data.getPartitionCount()));
+ partitions.add(data.getGlobalPartitionId(hashValue % data.getPartitionCount()));
}
for (Integer partition : partitions) {
- logger.warn("Remote sending to partition: " + partition);
emitter.send(partition, message);
}
}
@@ -134,7 +134,6 @@ private void startS4App(AppConfig appConfig, Injector parentInjector, ClassLoade
// use correct classLoader for running the app initialization
Thread.currentThread().setContextClassLoader(app.getClass().getClassLoader());
- app.setZkClient(zkClient);
app.init();
app.schedule();
app.start();
@@ -93,8 +93,6 @@ private void resolveLocalPartitionId() {
*/
@Override
public boolean checkAndSendIfNotLocal(int partition, Event event) {
- // public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
- // int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
if (partition == localPartitionId) {
metrics.sentLocal();
/* Hey we are in the same JVM, don't use the network. */
@@ -1,11 +1,31 @@
-package org.apache.s4.wordcount;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core.ri;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
+import org.apache.s4.wordcount.WordCountEvent;
+import org.apache.s4.wordcount.WordSeenEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.s4.core.ri;
import org.apache.s4.base.Event;
@@ -1,13 +1,27 @@
-package org.apache.s4.core.ri;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import static org.apache.s4.core.ri.RuntimeIsolationTest.counterNumber;
+package org.apache.s4.core.ri;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.s4.comm.tools.TaskSetup;
import org.apache.s4.comm.topology.ZkClient;
@@ -16,31 +30,19 @@
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.wordcount.WordCountModule;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
+/**
+ * Resue {@link RuntimeIsolationTest} but using an external adapter instead of injecting the events directly.
+ *
+ * Test case is {@link RuntimeIsolationTest.testSimple}
+ *
+ */
public class RemoteStreamRITest extends RuntimeIsolationTest {
- private static Logger logger = LoggerFactory.getLogger(RemoteStreamRITest.class);
-
- @Override
- public void injectData() throws InterruptedException, IOException {
- // Use remote stream
-
- }
-
@Override
public void startNodes() throws IOException, InterruptedException {
- final ZooKeeper zk = CommTestUtils.createZkClient();
-
TaskSetup taskSetup = new TaskSetup("localhost:" + CommTestUtils.ZK_PORT);
taskSetup.setup("cluster2", 1, 1500);
@@ -62,58 +64,15 @@ public void startNodes() throws IOException, InterruptedException {
"localhost:2181"), 10, "cluster2", 1)));
s4nodes = nodes.toArray(new Process[] {});
-
- CountDownLatch signalTextProcessed = new CountDownLatch(1);
- try {
- CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
- // add authorizations for processing
- for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
- zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
-
-// injectData();
-
- Assert.assertTrue(signalTextProcessed.await(30, TimeUnit.SECONDS));
- String results = new String(zk.getData("/results", false, null));
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
-
- List<String> counterInstances = zk.getChildren("/counters", false);
-
- int totalCount = 0;
- int activeInstances = 0;
- for (String instance : counterInstances) {
- int count = Integer.parseInt(new String(zk.getData("/counters/" + instance, false, null)));
- if (count != 0) {
- activeInstances++;
- }
- totalCount += count;
- }
- Assert.assertEquals(numberTasks, counterInstances.size());
- Assert.assertEquals(counterNumber, activeInstances);
-
- Assert.assertEquals(13, totalCount);
- } catch (KeeperException e) {
- e.printStackTrace();
- }
-
}
-
+
@Override
- @Test
- public void testSimple() {
- ZooKeeper zk;
- try {
- zk = CommTestUtils.createZkClient();
- zk.create("/counters", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- startNodes();
-
- } catch (IOException e) {
- logger.error("", e);
- } catch (KeeperException e) {
- logger.error("", e);
- } catch (InterruptedException e) {
- logger.error("", e);
- }
+ public void createEmitter() throws IOException {
+ // No need for an emitter, we use an adapter
+ }
+ @Override
+ public void injectData() throws InterruptedException, IOException {
+ // No nedd for data injection, we use an adapter
}
}
@@ -31,7 +31,6 @@
import org.apache.s4.deploy.DeploymentUtils;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
-import org.apache.s4.wordcount.IsolationWordCounterPE;
import org.apache.s4.wordcount.SentenceKeyFinder;
import org.apache.s4.wordcount.WordClassifierPE;
import org.apache.s4.wordcount.WordCountEvent;
@@ -32,10 +32,13 @@
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.PhysicalCluster;
+import org.apache.s4.comm.topology.ZkClient;
+import org.apache.s4.core.ZkClientProvider;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
@@ -68,6 +71,7 @@ protected void configure() {
bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
install(new FactoryModuleBuilder().implement(RemoteEmitter.class, Mockito.mock(RemoteEmitter.class).getClass())
.build(RemoteEmitterFactory.class));
+ bind(ZkClient.class).toInstance(Mockito.mock(ZkClient.class));
}
}

0 comments on commit be95df1

Please sign in to comment.