Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fix classloading issue

- classes with dependency on the application classloader must be bound through the AppModule,
and relevant classes now include Listener implementations for instance
  • Loading branch information...
commit cc2c4e8886df7b69e861f9e7d32fdad7094fe25f 1 parent f9689ea
@matthieumorel matthieumorel authored
Showing with 177 additions and 112 deletions.
  1. +2 −1  subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
  2. +4 −6 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
  3. +4 −4 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
  4. +3 −2 subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
  5. +0 −30 subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
  6. +26 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
  7. +3 −2 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
  8. +3 −2 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
  9. +18 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
  10. +18 −0 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
  11. +5 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
  12. +12 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
  13. +0 −8 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
  14. +0 −3  subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
  15. +7 −5 subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
  16. +0 −5 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
  17. +2 −1  subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
  18. +9 −7 subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
  19. +6 −3 subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
  20. +18 −19 subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
  21. +12 −4 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
  22. +3 −9 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
  23. +1 −1  subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
  24. +21 −0 test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
View
3  subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/simpleApp1/SimpleApp.java
@@ -13,6 +13,7 @@
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
public class SimpleApp extends App {
@@ -34,7 +35,7 @@ protected void onInit() {
throw new RuntimeException("Cannot create log dir " + logDirectory.getAbsolutePath());
}
}
- // CsvReporter.enable(logDirectory, 5, TimeUnit.SECONDS);
+ CsvReporter.enable(logDirectory, 10, TimeUnit.SECONDS);
ConsoleReporter.enable(10, TimeUnit.SECONDS);
SimplePE1 simplePE1 = createPE(SimplePE1.class, "simplePE1");
View
10 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -27,7 +27,6 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
@@ -100,6 +99,10 @@ protected void configure() {
bind(Clusters.class).to(ClustersFromZK.class);
+ bind(RemoteEmitters.class);
+
+ bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
+
try {
Class<? extends Emitter> emitterClass = (Class<? extends Emitter>) Class.forName(config
.getString("s4.comm.emitter.class"));
@@ -112,14 +115,9 @@ protected void configure() {
RemoteEmitterFactory.class));
bind(RemoteEmitters.class);
- bind(Listener.class).to(
- (Class<? extends Listener>) Class.forName(config.getString("s4.comm.listener.class")));
-
- bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
} catch (ClassNotFoundException e) {
logger.error("Cannot find class implementation ", e);
}
-
}
@SuppressWarnings("serial")
View
8 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -32,8 +32,7 @@
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterChangeListener;
import org.apache.s4.comm.topology.ClusterNode;
-import org.apache.s4.comm.util.CommMetrics;
-import org.apache.s4.comm.util.CommMetrics.EmitterMetrics;
+import org.apache.s4.comm.util.EmitterMetrics;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -94,7 +93,7 @@
SerializerDeserializerFactory serDeserFactory;
SerializerDeserializer serDeser;
- CommMetrics.EmitterMetrics metrics;
+ EmitterMetrics metrics;
@Inject
public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
@@ -133,8 +132,8 @@ private void init() {
refreshCluster();
this.topology.addListener(this);
serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
-
metrics = new EmitterMetrics(topology);
+
}
private boolean connectTo(Integer partitionId) {
@@ -205,6 +204,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}
+ @Override
public void close() {
try {
channels.close().await();
View
5 subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -18,6 +18,7 @@
package org.apache.s4.comm.udp;
+import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.comm.topology.Cluster;
import com.google.inject.Inject;
@@ -25,9 +26,9 @@
/**
* UDP-based emitter for sending events to remote clusters.
- *
+ *
*/
-public class UDPRemoteEmitter extends UDPEmitter {
+public class UDPRemoteEmitter extends UDPEmitter implements RemoteEmitter {
/**
* Sends to remote subclusters. This is dynamically created, through an injected factory, when new subclusters are
View
30 subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/CommMetrics.java
@@ -1,30 +0,0 @@
-package org.apache.s4.comm.util;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.comm.topology.Cluster;
-
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Meter;
-
-public class CommMetrics {
-
- public static class EmitterMetrics {
- private Meter[] emittersMeters;
-
- public EmitterMetrics(Cluster cluster) {
- emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
- for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
- emittersMeters[i] = Metrics
- .newMeter(TCPEmitter.class, "event-emitted@" + cluster.getPhysicalCluster().getName()
- + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
- }
- }
-
- public void sentMessage(int partitionId) {
- emittersMeters[partitionId].mark();
- }
- }
-
-}
View
26 subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java
@@ -0,0 +1,26 @@
+package org.apache.s4.comm.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+
+public class EmitterMetrics {
+ private final Meter[] emittersMeters;
+
+ public EmitterMetrics(Cluster cluster) {
+ emittersMeters = new Meter[cluster.getPhysicalCluster().getPartitionCount()];
+ for (int i = 0; i < cluster.getPhysicalCluster().getPartitionCount(); i++) {
+ emittersMeters[i] = Metrics.newMeter(TCPEmitter.class, "event-emitted@"
+ + cluster.getPhysicalCluster().getName() + "@partition-" + i, "event-emitted", TimeUnit.SECONDS);
+ }
+ }
+
+ public void sentMessage(int partitionId) {
+ emittersMeters[partitionId].mark();
+ }
+}
View
5 subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java
@@ -11,6 +11,7 @@
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.MockReceiverModule;
import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.TCPTransportModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.junit.Test;
@@ -26,12 +27,12 @@ public void testSingleMessage() {
try {
Injector injector1 = Guice
.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), "cluster1"), new NoOpReceiverModule());
+ .openStream(), "cluster1"), new TCPTransportModule(), new NoOpReceiverModule());
Emitter emitter = injector1.getInstance(Emitter.class);
Injector injector2 = Guice
.createInjector(new DefaultCommModule(Resources.getResource("default.s4.comm.properties")
- .openStream(), "cluster1"), new MockReceiverModule());
+ .openStream(), "cluster1"), new TCPTransportModule(), new MockReceiverModule());
// creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
// listener, here a mock which simply intercepts the message and notifies through a countdown latch)
injector2.getInstance(Listener.class);
View
5 subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java
@@ -11,6 +11,7 @@
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.MockReceiverModule;
import org.apache.s4.fixtures.NoOpReceiverModule;
+import org.apache.s4.fixtures.UDPTransportModule;
import org.apache.s4.fixtures.ZkBasedTest;
import org.junit.Test;
@@ -26,12 +27,12 @@ public void testSingleMessage() {
try {
Injector injector1 = Guice.createInjector(
new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
- new NoOpReceiverModule());
+ new UDPTransportModule(), new NoOpReceiverModule());
Emitter emitter = injector1.getInstance(Emitter.class);
Injector injector2 = Guice.createInjector(
new DefaultCommModule(Resources.getResource("udp.s4.comm.properties").openStream(), "cluster1"),
- new MockReceiverModule());
+ new UDPTransportModule(), new MockReceiverModule());
// creating the listener will inject assignment (i.e. assign a partition) and receiver (delegatee for
// listener)
injector2.getInstance(Listener.class);
View
18 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.comm.tcp.TCPListener;
+
+import com.google.inject.AbstractModule;
+
+public class TCPTransportModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(Listener.class).to(TCPListener.class);
+ bind(Emitter.class).to(TCPEmitter.class);
+ }
+
+}
View
18 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
@@ -0,0 +1,18 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Listener;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+
+public class UDPTransportModule extends AbstractModule {
+
+ @Override
+ protected void configure() {
+ bind(Listener.class).to(UDPListener.class);
+ bind(Emitter.class).to(UDPEmitter.class);
+ }
+
+}
View
5 subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -32,6 +32,7 @@
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.staging.StreamExecutorServiceFactory;
+import org.apache.s4.core.util.S4Metrics;
import org.apache.s4.core.window.AbstractSlidingWindowPE;
import org.apache.s4.core.window.SlotFactory;
import org.slf4j.Logger;
@@ -86,6 +87,9 @@
@Inject
CheckpointingFramework checkpointingFramework;
+ @Inject
+ S4Metrics metrics;
+
// serialization uses the application class loader
@Inject
private SerializerDeserializerFactory serDeserFactory;
@@ -222,6 +226,7 @@ void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> strea
// logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
pePrototypes.add(pePrototype);
+ metrics.createCacheGauges(pePrototype, pePrototype.peInstances);
}
View
12 subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java
@@ -1,7 +1,12 @@
package org.apache.s4.core;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.Receiver;
+import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.TCPListener;
+import org.apache.s4.core.util.S4Metrics;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -21,6 +26,13 @@ public SerializerDeserializer provideSerializerDeserializer(SerializerDeserializ
@Override
protected void configure() {
+ bind(S4Metrics.class);
+
+ bind(Receiver.class).to(ReceiverImpl.class);
+ bind(Sender.class).to(SenderImpl.class);
+
+ // TODO allow pluggable transport implementation
+ bind(Listener.class).to(TCPListener.class);
}
View
8 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -25,8 +25,6 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Receiver;
-import org.apache.s4.base.Sender;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.core.ft.CheckpointingFramework;
@@ -37,7 +35,6 @@
import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
import org.apache.s4.core.staging.SenderExecutorServiceFactory;
import org.apache.s4.core.staging.StreamExecutorServiceFactory;
-import org.apache.s4.core.util.S4Metrics;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
import org.slf4j.Logger;
@@ -80,9 +77,6 @@ protected void configure() {
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
- bind(Receiver.class).to(ReceiverImpl.class);
- bind(Sender.class).to(SenderImpl.class);
-
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
bind(S4RLoaderFactory.class);
@@ -91,8 +85,6 @@ protected void configure() {
// org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
- bind(S4Metrics.class);
-
bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
bind(RemoteSendersExecutorServiceFactory.class).to(DefaultRemoteSendersExecutorServiceFactory.class);
View
3  subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -40,7 +40,6 @@
import org.apache.s4.core.ft.CheckpointingTask;
import org.apache.s4.core.gen.OverloadDispatcher;
import org.apache.s4.core.gen.OverloadDispatcherGenerator;
-import org.apache.s4.core.util.S4Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,8 +171,6 @@ public ProcessingElement load(String key) throws Exception {
*/
this.pePrototype = this;
- S4Metrics.createCacheGauges(this, peInstances);
-
processingTimer = Metrics.newTimer(getClass(), getClass().getName() + "-pe-processing-time");
}
View
12 subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -25,7 +25,6 @@
import org.apache.s4.base.Listener;
import org.apache.s4.base.Receiver;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.core.util.S4Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,12 +54,15 @@
final private Listener listener;
final private SerializerDeserializer serDeser;
- private Map<Integer, Map<String, Stream<? extends Event>>> streams;
+ private final Map<Integer, Map<String, Stream<? extends Event>>> streams;
@Inject
- public ReceiverImpl(Listener listener, SerializerDeserializerFactory serDeserFactory) {
+ S4Metrics metrics;
+
+ @Inject
+ public ReceiverImpl(Listener listener, SerializerDeserializer serDeser) {
this.listener = listener;
- this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ this.serDeser = serDeser;
streams = new MapMaker().makeMap();
}
@@ -94,7 +96,7 @@ void removeStream(Stream<? extends Event> stream) {
@Override
public void receive(ByteBuffer message) {
- S4Metrics.receivedEventFromCommLayer(message.array().length);
+ metrics.receivedEventFromCommLayer(message.array().length);
Event event = (Event) serDeser.deserialize(message);
String streamId = event.getStreamName();
View
5 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -23,7 +23,6 @@
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
-import org.apache.s4.core.util.S4Metrics;
/**
* Sends events to a remote cluster.
@@ -42,8 +41,6 @@ public RemoteSender(Emitter emitter, Hasher hasher, String clusterName) {
this.hasher = hasher;
this.remoteClusterName = clusterName;
- S4Metrics.createRemoteStreamMeters(clusterName, emitter.getPartitionCount());
-
}
public void send(String hashKey, ByteBuffer message) {
@@ -55,7 +52,5 @@ public void send(String hashKey, ByteBuffer message) {
partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
}
emitter.send(partition, message);
- S4Metrics.sentEventToRemoteCluster(remoteClusterName, partition);
-
}
}
View
3  subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -58,7 +58,7 @@
ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
- private ExecutorService executorService;
+ private final ExecutorService executorService;
@Inject
public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
@@ -111,6 +111,7 @@ public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender)
@Override
public void run() {
sender.send(hashKey, serDeser.serialize(event));
+
}
}
View
16 subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java
@@ -26,7 +26,6 @@
import org.apache.s4.base.Hasher;
import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.staging.SenderExecutorServiceFactory;
@@ -55,7 +54,10 @@
Assignment assignment;
private int localPartitionId = -1;
- private ExecutorService tpe;
+ private final ExecutorService tpe;
+
+ @Inject
+ S4Metrics metrics;
/**
*
@@ -67,10 +69,10 @@
* a hashing function to map keys to partition IDs.
*/
@Inject
- public SenderImpl(Emitter emitter, SerializerDeserializerFactory serDeserFactory, Hasher hasher,
- Assignment assignment, SenderExecutorServiceFactory senderExecutorServiceFactory) {
+ public SenderImpl(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+ SenderExecutorServiceFactory senderExecutorServiceFactory) {
this.emitter = emitter;
- this.serDeser = serDeserFactory.createSerializerDeserializer(getClass().getClassLoader());
+ this.serDeser = serDeser;
this.hasher = hasher;
this.assignment = assignment;
this.tpe = senderExecutorServiceFactory.create();
@@ -98,7 +100,7 @@ public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
}
// TODO asynch
send(partition, serDeser.serialize(event));
- S4Metrics.sentEvent(partition);
+ metrics.sentEvent(partition);
return true;
}
@@ -154,7 +156,7 @@ public void run() {
/* Don't use the comm layer when we send to the same partition. */
if (localPartitionId != i) {
emitter.send(i, serializedEvent);
- S4Metrics.sentEvent(i);
+ metrics.sentEvent(i);
}
}
View
9 subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -29,7 +29,6 @@
import org.apache.s4.base.Sender;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.core.util.S4Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,8 +78,9 @@ public Stream(App app) {
this.receiver = app.getReceiver();
}
+ @Override
public void start() {
- S4Metrics.createStreamMeters(name);
+ app.metrics.createStreamMeters(getName());
if (logger.isTraceEnabled()) {
if (targetPEs != null) {
for (ProcessingElement pe : targetPEs) {
@@ -184,6 +184,7 @@ public void start() {
*
* @param event
*/
+ @Override
@SuppressWarnings("unchecked")
public void put(Event event) {
event.setStreamId(getName());
@@ -240,6 +241,7 @@ public void receiveEvent(Event event) {
/**
* @return the name
*/
+ @Override
public String getName() {
return name;
}
@@ -268,6 +270,7 @@ public App getApp() {
/**
* Stop and close this stream.
*/
+ @Override
public void close() {
}
@@ -329,7 +332,7 @@ public StreamEventProcessingTask(T event) {
@Override
public void run() {
- S4Metrics.dequeuedEvent(name);
+ app.metrics.dequeuedEvent(name);
/* Send event to each target PE. */
for (int i = 0; i < targetPEs.length; i++) {
View
37 subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -36,18 +36,18 @@
static List<Meter> partitionSenderMeters = Lists.newArrayList();
- private static Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
+ private final Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
TimeUnit.SECONDS);
- private static Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
+ private final Meter bytesMeter = Metrics.newMeter(ReceiverImpl.class, "received-bytes", "bytes-count",
TimeUnit.SECONDS);
- private static Meter[] senderMeters;
+ private Meter[] senderMeters;
- private static Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
- private static Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
- private static Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
+ private final Map<String, Meter> dequeuingStreamMeters = Maps.newHashMap();
+ private final Map<String, Meter> droppedStreamMeters = Maps.newHashMap();
+ private final Map<String, Meter> streamQueueFullMeters = Maps.newHashMap();
- private static Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
+ private final Map<String, Meter[]> remoteSenderMeters = Maps.newHashMap();
@Inject
private void init() {
@@ -58,24 +58,23 @@ private void init() {
}
}
- public static void createCacheGauges(ProcessingElement prototype,
- final LoadingCache<String, ProcessingElement> cache) {
+ public void createCacheGauges(ProcessingElement prototype, final LoadingCache<String, ProcessingElement> cache) {
- Metrics.newGauge(prototype.getClass(), "PE-cache-entries", new Gauge<Long>() {
+ Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-entries", new Gauge<Long>() {
@Override
public Long value() {
return cache.size();
}
});
- Metrics.newGauge(prototype.getClass(), "PE-cache-evictions", new Gauge<Long>() {
+ Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-evictions", new Gauge<Long>() {
@Override
public Long value() {
return cache.stats().evictionCount();
}
});
- Metrics.newGauge(prototype.getClass(), "PE-cache-misses", new Gauge<Long>() {
+ Metrics.newGauge(prototype.getClass(), prototype.getClass().getName() + "-cache-misses", new Gauge<Long>() {
@Override
public Long value() {
@@ -84,17 +83,17 @@ public Long value() {
});
}
- public static void receivedEventFromCommLayer(int bytes) {
+ public void receivedEventFromCommLayer(int bytes) {
eventMeter.mark();
bytesMeter.mark(bytes);
}
- public static void queueIsFull(String name) {
+ public void queueIsFull(String name) {
streamQueueFullMeters.get(name).mark();
}
- public static void sentEvent(int partition) {
+ public void sentEvent(int partition) {
try {
senderMeters[partition].mark();
} catch (NullPointerException e) {
@@ -104,7 +103,7 @@ public static void sentEvent(int partition) {
}
}
- public static void createStreamMeters(String name) {
+ public void createStreamMeters(String name) {
// TODO avoid maps to avoid map lookups?
dequeuingStreamMeters.put(name,
Metrics.newMeter(Stream.class, "dequeued@" + name, "dequeued", TimeUnit.SECONDS));
@@ -113,11 +112,11 @@ public static void createStreamMeters(String name) {
Metrics.newMeter(Stream.class, "stream-full@" + name, "stream-full", TimeUnit.SECONDS));
}
- public static void dequeuedEvent(String name) {
+ public void dequeuedEvent(String name) {
dequeuingStreamMeters.get(name).mark();
}
- public static void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
+ public void createRemoteStreamMeters(String remoteClusterName, int partitionCount) {
Meter[] meters = new Meter[partitionCount];
for (int i = 0; i < partitionCount; i++) {
meters[i] = Metrics.newMeter(RemoteSender.class, "remote-sender@" + remoteClusterName + "@partition-" + i,
@@ -129,7 +128,7 @@ public static void createRemoteStreamMeters(String remoteClusterName, int partit
}
- public static void sentEventToRemoteCluster(String remoteClusterName, int partition) {
+ public void sentEventToRemoteCluster(String remoteClusterName, int partition) {
remoteSenderMeters.get(remoteClusterName)[partition].mark();
}
View
16 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -18,7 +18,9 @@
package org.apache.s4.fixtures;
+import org.apache.s4.base.Emitter;
import org.apache.s4.base.Hasher;
+import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.RemoteEmitterFactory;
@@ -26,8 +28,10 @@
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.PhysicalCluster;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.RemoteSenders;
import org.mockito.Mockito;
@@ -57,13 +61,17 @@ protected void configure() {
bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
- bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
-
+ Cluster mockedCluster = Mockito.mock(Cluster.class);
+ Mockito.when(mockedCluster.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
+ bind(Cluster.class).toInstance(mockedCluster);
Assignment mockedAssignment = Mockito.mock(Assignment.class);
Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
bind(Assignment.class).toInstance(mockedAssignment);
- Names.bindProperties(binder(), ImmutableMap.of("s4.cluster.name", "testCluster"));
- }
+ Names.bindProperties(binder(), ImmutableMap.of("s4.cluster.name", "testCluster", "s4.comm.timeout", "10000"));
+ bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
+ install(new FactoryModuleBuilder().implement(RemoteEmitter.class, Mockito.mock(RemoteEmitter.class).getClass())
+ .build(RemoteEmitterFactory.class));
+ }
}
View
12 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -18,17 +18,14 @@
package org.apache.s4.fixtures;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.Sender;
-import org.apache.s4.core.ReceiverImpl;
+import org.apache.s4.comm.DefaultDeserializerExecutorFactory;
+import org.apache.s4.comm.DeserializerExecutorFactory;
import org.apache.s4.core.staging.DefaultSenderExecutorServiceFactory;
import org.apache.s4.core.staging.DefaultStreamProcessingExecutorServiceFactory;
import org.apache.s4.core.staging.SenderExecutorServiceFactory;
import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
-import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,16 +47,13 @@ public MockCoreModule() {
@Override
protected void configure() {
bind(DeploymentManager.class).to(NoOpDeploymentManager.class);
- bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
- bind(Listener.class).toInstance(Mockito.mock(Listener.class));
- bind(ReceiverImpl.class).toInstance(Mockito.mock(ReceiverImpl.class));
- bind(Sender.class).toInstance(Mockito.mock(Sender.class));
// Although we want to mock as much as possible, most tests still require the machinery for routing events
// within a stream/node, therefore sender and stream executors are not mocked
bind(StreamExecutorServiceFactory.class).to(DefaultStreamProcessingExecutorServiceFactory.class);
bind(SenderExecutorServiceFactory.class).to(DefaultSenderExecutorServiceFactory.class);
+ bind(DeserializerExecutorFactory.class).to(DefaultDeserializerExecutorFactory.class);
bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);
View
2  subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java
@@ -131,6 +131,6 @@ public static void main(String[] args) {
e.printStackTrace();
}
myApp.close();
- receiver.close();
+ // receiver.close();
}
}
View
21 test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -18,16 +18,21 @@
package org.apache.s4.example.twitter;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.s4.base.KeyFinder;
import org.apache.s4.core.App;
import org.apache.s4.core.Stream;
import org.apache.s4.core.ft.CheckpointingConfig;
import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
+import com.yammer.metrics.reporting.CsvReporter;
public class TwitterCounterApp extends App {
@@ -39,6 +44,9 @@ protected void onClose() {
protected void onInit() {
try {
+ // uncomment the following in order to get metrics outputs in .csv files
+ // prepareMetricsOutputs();
+
TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
// we checkpoint this PE every 20s
@@ -77,6 +85,19 @@ protected void onInit() {
}
}
+ private void prepareMetricsOutputs() throws IOException {
+ File metricsDirForPartition = new File("metrics/" + getReceiver().getPartitionId());
+ if (metricsDirForPartition.exists()) {
+ FileUtils.deleteDirectory(metricsDirForPartition);
+ }
+ // activate metrics csv dump
+ if (!metricsDirForPartition.mkdirs()) {
+ LoggerFactory.getLogger(getClass()).error("Cannot create directory {}",
+ new File("metrics").getAbsolutePath());
+ }
+ CsvReporter.enable(metricsDirForPartition, 10, TimeUnit.SECONDS);
+ }
+
@Override
protected void onStart() {
Please sign in to comment.
Something went wrong with that request. Please try again.