Permalink
Browse files

first version of serial broad caster cache

  • Loading branch information...
1 parent 94458b1 commit cdd8515964b1d9528c1947cfbbdf225aeee0f75b @haed committed Jan 31, 2012
@@ -1,6 +1,5 @@
package haed.notification;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
@@ -41,8 +40,8 @@
public Response createChannel()
throws Exception {
- final String channelID = UUID.randomUUID().toString();
-// final String channelID = "1";
+// final String channelID = UUID.randomUUID().toString();
+ final String channelID = "1";
final NotificationMgr notificationMgr = NotificationMgr.getInstance();
notificationMgr.getBroadcaster(channelID, true);
@@ -60,6 +59,7 @@ public Response createChannel()
@Produces(MediaType.TEXT_PLAIN + ";charset=utf-8")
public SuspendResponse<String> openChannel(
final @Context HttpServletRequest request,
+ final @Context HttpServletResponse response,
final @QueryParam("channelID") String channelID,
final @QueryParam("outputComments") @DefaultValue("false") Boolean outputComments)
throws Exception {
@@ -104,8 +104,33 @@ public Response createChannel()
}
- // always track current serial at request
- request.setAttribute(SerialBroadcasterCache.HEADER, serial);
+
+
+ if (serial != null) {
+
+ // always track current serial at request
+ request.setAttribute(SerialBroadcasterCache.HEADER, serial);
+
+ // track X-Cache-Serial
+ response.setHeader(SerialBroadcasterCache.HEADER, "" + serial);
+
+// final HttpServletResponse response = event.getResource().getResponse();
+// if (event.getMessage() == null) {
+//
+// // no message in, simply return same serial
+// if (response.containsHeader(SerialBroadcasterCache.HEADER) == false)
+// response.setHeader(SerialBroadcasterCache.HEADER, "" + _serial.longValue());
+//
+// } else {
+//
+// // we send a message, check if a serial header is set
+// if (response.containsHeader(SerialBroadcasterCache.HEADER) == false) {
+//
+// // no header sent, set explicitly
+// response.setHeader(SerialBroadcasterCache.HEADER, "" + (_serial.longValue() + 1));
+// }
+// }
+ }
final Long _serial = serial;
@@ -133,17 +158,26 @@ public void onResume(final AtmosphereResourceEvent<HttpServletRequest, HttpServl
if (logger.isDebugEnabled())
logger.debug("channel with id '" + channelID + "' resumed");
- // set serial header (only in a serial cache environment)
- if (_serial != -1 && event.getMessage() != null) {
-
- // we send a message, check if a serial header is set
- final HttpServletResponse response = event.getResource().getResponse();
- if (response.containsHeader(SerialBroadcasterCache.HEADER) == false) {
-
- // no header sent, set explicitly
- response.setHeader(SerialBroadcasterCache.HEADER, "" + (_serial.longValue() + 1));
- }
- }
+// // set serial header (only in a serial cache environment)
+// if (_serial != -1) {
+//
+// final HttpServletResponse response = event.getResource().getResponse();
+// if (event.getMessage() == null) {
+//
+// // no message in, simply return same serial
+// if (response.containsHeader(SerialBroadcasterCache.HEADER) == false)
+// response.setHeader(SerialBroadcasterCache.HEADER, "" + _serial.longValue());
+//
+// } else {
+//
+// // we send a message, check if a serial header is set
+// if (response.containsHeader(SerialBroadcasterCache.HEADER) == false) {
+//
+// // no header sent, set explicitly
+// response.setHeader(SerialBroadcasterCache.HEADER, "" + (_serial.longValue() + 1));
+// }
+// }
+// }
}
public void onDisconnect(final AtmosphereResourceEvent<HttpServletRequest, HttpServletResponse> event) {
@@ -177,7 +177,7 @@ public void send(final Object message) {
if (broadcasterCache instanceof SerialBroadcasterCache)
// always cache if its a serial broad caster cache instance
- broadcasterCache.addToCache(null, message);
+ ((SerialBroadcasterCache) broadcasterCache)._addToCache(message);
// if (send || resources.isEmpty())
@@ -78,7 +78,6 @@ static Long parseSerial(final HttpServletRequest r) {
CachedMessage head = new CachedMessage(null, 0L, null);
CachedMessage tail = head;
- // TODO: clean up strategy
final Map<Long, CachedMessage> cache = Collections.synchronizedMap(new HashMap<Long, CachedMessage>());
@Override
@@ -122,34 +121,23 @@ CachedMessage _addToCache(final Object e) {
@Override
public void addToCache(final AtmosphereResource<HttpServletRequest, HttpServletResponse> r, final Object e) {
-// if (e instanceof List) {
-// throw new RuntimeException("???");
-// }
-
-
synchronized (serial) {
- _addToCache(e);
-
-// if (r == null) {
-// _addToCache(e);
-// } else {
-//
-// Long s = parseSerial(r.getRequest());
-// if (s == null) {
-//
-// if (logger.isDebugEnabled())
-// logger.fatal("no serial found");
-//
-// final CachedMessage cm = _addToCache(e);
-// r.getResponse().setHeader(HEADER, "" + cm.serial);
-//
-// } else {
+ if (r != null) {
+ final Long s = parseSerial(r.getRequest());
+ if (s == null)
+ r.getResponse().setHeader(SerialBroadcasterCache.HEADER, "" + tail.serial);
+ else
+ r.getResponse().setHeader(SerialBroadcasterCache.HEADER, "" + (s.longValue() + 1));
+
+// // we send a message, check if a serial header is set
+// final HttpServletResponse response = r.getResponse();
+// if (response.containsHeader(SerialBroadcasterCache.HEADER) == false) {
//
-// if (s.longValue() != this.serial.get())
-// logger.warn("found invalid serial: " + s + " (@request) != " + this.serial + " (actual)");
+// // no header sent, set explicitly
+// response.setHeader(SerialBroadcasterCache.HEADER, "" + (serial.longValue() + 1));
// }
-// }
+ }
}
}
View
@@ -21,21 +21,15 @@
<!--
<init-param>
<param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
- <param-value>haed.notification.NotificationBroadcasterCache</param-value>
- </init-param>
-
- <init-param>
- <param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
- <param-value>haed.notification.SerialBroadcasterCache</param-value>
+ <param-value>org.atmosphere.cache.HeaderBroadcasterCache</param-value>
</init-param>
-->
<init-param>
<param-name>org.atmosphere.cpr.broadcasterCacheClass</param-name>
- <param-value>org.atmosphere.cache.HeaderBroadcasterCache</param-value>
+ <param-value>haed.notification.SerialBroadcasterCache</param-value>
</init-param>
-
<load-on-startup>1</load-on-startup>
</servlet>
@@ -66,4 +60,4 @@
<url-pattern>/haed.app1/*</url-pattern>
</servlet-mapping>
-</web-app>
+</web-app>
View
@@ -3,7 +3,7 @@
<head>
<script type="text/javascript" src="/js/jquery.js"></script>
- <script type="text/javascript" src="/js/jquery.atmosphere.js"></script>
+ <script type="text/javascript" src="/js/jquery.atmosphere.patched.js"></script>
<script type="text/javascript" src="/js/haed.notification.js"></script>
</head>
@@ -69,8 +69,8 @@
jQuery(".count").text("Count: " + count);
};
-jQuery(".stress10").click(function() { incCount(10); sendMessages(10); });
-jQuery(".stress100").click(function() { incCount(100); sendMessages(100); });
+jQuery(".stress10").click(function() { sendMessages(10); });
+jQuery(".stress100").click(function() { sendMessages(100); });
jQuery(".resetCount").click(resetCount);
@@ -106,12 +106,7 @@
.done(function() {
channel.subscribe(topic, function(message) {
println("received: " + message);
- if (count > 0) {
- decCount(1);
- if (count == 0) {
- println("received all messages");
- }
- }
+ incCount(1);
});
println("subscribed to: " + topic);
});

0 comments on commit cdd8515

Please sign in to comment.