Permalink
Browse files

S4-103 and S4-76 Fix stream ids and remove app ids

  • Loading branch information...
1 parent 755ed6b commit cd8f28a08b519b0b9c8ecf6aa194535ca519b726 @matthieumorel matthieumorel committed Mar 1, 2013
View
33 subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -38,8 +38,7 @@
private static final Logger logger = LoggerFactory.getLogger(Event.class);
final private long time;
- private String streamName;
- private int appId;
+ private String streamId;
private Map<String, Data<?>> map;
/** Default constructor sets time using system time. */
@@ -67,8 +66,8 @@ public long getTime() {
*
* @return the target stream id
*/
- public String getStreamName() {
- return streamName;
+ public String getStreamId() {
+ return streamId;
}
/**
@@ -78,31 +77,7 @@ public String getStreamName() {
* mode.
*/
public void setStreamId(String streamName) {
- this.streamName = streamName;
- }
-
- /**
- * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
- * The assignment must be done automatically by the stream that receives the event. Each application has a unique
- * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
- *
- *
- * @return the unique application ID.
- */
- public int getAppId() {
- return appId;
- }
-
- /**
- * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
- * The assignment must be done automatically by the stream that receives the event. Each application has a unique
- * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
- *
- * @param appId
- * a unique application identifier, typically assigned by the deployment system.
- */
- public void setAppId(int appId) {
- this.appId = appId;
+ this.streamId = streamName;
}
/**
View
18 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -17,17 +17,27 @@
public interface RemoteStreams {
+ /**
+ * Lists consumers of a given stream
+ */
public abstract Set<StreamConsumer> getConsumers(String streamName);
- public abstract void addOutputStream(String appId, String clusterName, String streamName);
+ /**
+ * Publishes availability of an output stream
+ *
+ * @param clusterName
+ * originating cluster
+ * @param streamName
+ * name of stream
+ */
+ public abstract void addOutputStream(String clusterName, String streamName);
/**
- * Publishes interest in a stream from an application.
+ * Publishes interest in a stream, by a given cluster
*
- * @param appId
* @param clusterName
* @param streamName
*/
- public abstract void addInputStream(int appId, String clusterName, String streamName);
+ public abstract void addInputStream(String clusterName, String streamName);
}
View
11 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -25,19 +25,13 @@
*/
public class StreamConsumer {
- int appId;
String clusterName;
- public StreamConsumer(int appId, String clusterName) {
+ public StreamConsumer(String clusterName) {
super();
- this.appId = appId;
this.clusterName = clusterName;
}
- public int getAppId() {
- return appId;
- }
-
public String getClusterName() {
return clusterName;
}
@@ -46,7 +40,6 @@ public String getClusterName() {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + appId;
result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
return result;
}
@@ -60,8 +53,6 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
StreamConsumer other = (StreamConsumer) obj;
- if (appId != other.appId)
- return false;
if (clusterName == null) {
if (other.clusterName != null)
return false;
View
23 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -164,8 +164,7 @@ private void update(String streamName, StreamType type) {
for (String element : elements) {
ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
if (producerData != null) {
- StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
- producerData.getSimpleField("clusterName"));
+ StreamConsumer consumer = new StreamConsumer(producerData.getSimpleField("clusterName"));
consumers.add(consumer);
}
}
@@ -179,20 +178,18 @@ private void update(String streamName, StreamType type) {
* java.lang.String)
*/
@Override
- public void addOutputStream(String appId, String clusterName, String streamName) {
+ public void addOutputStream(String clusterName, String streamName) {
lock.lock();
try {
- logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
- clusterName });
+ logger.debug("Adding output stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
createStreamPaths(streamName);
- ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- producer.putSimpleField("appId", appId);
+ ZNRecord producer = new ZNRecord(streamName + "/" + clusterName);
producer.putSimpleField("clusterName", clusterName);
try {
zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
} catch (Throwable e) {
logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, appId, clusterName, e.getMessage() });
+ new String[] { streamName, clusterName, e.getMessage() });
}
refreshStreams();
} finally {
@@ -214,21 +211,19 @@ private void createStreamPaths(String streamName) {
* @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
*/
@Override
- public void addInputStream(int appId, String clusterName, String streamName) {
+ public void addInputStream(String clusterName, String streamName) {
lock.lock();
try {
- logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
- new String[] { streamName, String.valueOf(appId), clusterName });
+ logger.debug("Adding input stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
createStreamPaths(streamName);
- ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- consumer.putSimpleField("appId", String.valueOf(appId));
+ ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName);
consumer.putSimpleField("clusterName", clusterName);
try {
// NOTE: We create 1 sequential znode per consumer node instance
zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
} catch (Throwable e) {
logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+ new String[] { streamName, clusterName, e.getMessage() });
}
refreshStreams();
} finally {
View
18 subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -63,7 +63,6 @@
Map<String, ProcessingElement> peByName = Maps.newHashMap();
private ClockType clockType = ClockType.WALL_CLOCK;
- private int id = -1;
@Inject
private Sender sender;
@@ -112,21 +111,6 @@ private void initSerDeser() {
WALL_CLOCK, EVENT_CLOCK
};
- /**
- * @return the unique app id
- */
- public int getId() {
- return id;
- }
-
- /**
- * @param id
- * the unique id for this app
- */
- public void setId(int id) {
- this.id = id;
- }
-
/* Should only be used within the core package. */
void addPEPrototype(ProcessingElement pePrototype) {
pePrototypes.add(pePrototype);
@@ -379,7 +363,7 @@ public StreamExecutorServiceFactory getStreamExecutorFactory() {
*/
protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
ProcessingElement... processingElements) {
- remoteStreams.addInputStream(getId(), clusterName, streamName);
+ remoteStreams.addInputStream(clusterName, streamName);
return createStream(streamName, finder, processingElements);
}
View
3 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -76,8 +76,7 @@ public DefaultRemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteS
@Override
public void send(String hashKey, Event event) {
- Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
- event.setAppId(-1);
+ Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamId());
for (StreamConsumer consumer : consumers) {
// NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
// represented by a single stream consumer
View
27 subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -54,7 +54,7 @@
final private Listener listener;
final private SerializerDeserializer serDeser;
- private final Map<Integer, Map<String, Stream<? extends Event>>> streams;
+ private final Map<String, Stream<? extends Event>> streams;
@Inject
S4Metrics metrics;
@@ -74,40 +74,27 @@ public int getPartitionId() {
/** Save stream keyed by app id and stream id. */
void addStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- appMap = new MapMaker().makeMap();
- streams.put(appId, appMap);
- }
- appMap.put(stream.getName(), stream);
+ streams.put(stream.getName(), stream);
}
/** Remove stream when it is no longer needed. */
void removeStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- logger.error("Tried to remove a stream that is not registered in the receiver.");
- return;
- }
- appMap.remove(stream.getName());
+ streams.remove(stream.getName());
}
@Override
public void receive(ByteBuffer message) {
metrics.receivedEventFromCommLayer(message.array().length);
Event event = (Event) serDeser.deserialize(message);
- String streamId = event.getStreamName();
+ String streamId = event.getStreamId();
/*
- * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO: make
- * this more efficient for the case in which we send the same event to multiple PEs.
+ * Match streamId in event to the target stream and pass the event to the target stream. TODO: make this more
+ * efficient for the case in which we send the same event to multiple PEs.
*/
try {
- Map<String, Stream<? extends Event>> map = streams.get(-1);
- map.get(streamId).receiveEvent(event);
+ streams.get(streamId).receiveEvent(event);
} catch (NullPointerException e) {
logger.error("Could not find target stream for event with streamId={}", streamId);
}
View
3 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -55,14 +55,13 @@ public RemoteStream(App app, String name, KeyFinder<Event> finder, RemoteSenders
} else {
this.key = new Key<Event>(finder, DEFAULT_SEPARATOR);
}
- remoteStreams.addOutputStream(String.valueOf(app.getId()), clusterName, name);
+ remoteStreams.addOutputStream(clusterName, name);
}
@Override
public void put(Event event) {
event.setStreamId(getName());
- event.setAppId(app.getId());
if (key != null) {
remoteSenders.send(key.get(event), event);
View
1 subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -163,7 +163,6 @@ public void start() {
@SuppressWarnings("unchecked")
public void put(Event event) {
event.setStreamId(getName());
- event.setAppId(app.getId());
/*
* Events may be sent to local or remote partitions or both. The following code implements the logic.

0 comments on commit cd8f28a

Please sign in to comment.