Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
("set user-defined headers at time of broadcast")

Allow per request filtering broadcast. Added support for Opera streaming using the new API.
  • Loading branch information
jfarcand committed Nov 19, 2010
1 parent d866c4a commit 0dae9e1
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 58 deletions.
Expand Up @@ -37,15 +37,17 @@
package org.atmosphere.client;

import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.PerRequestBroadcastFilter;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Filter that inject Javascript code to a broadcast so it can be used with the Atmosphere JQuery Plugin.
*
*
* @author Jeanfrancois Arcand
*/
public class JavascriptClientFilter implements BroadcastFilter {
public class JavascriptClientFilter implements BroadcastFilter, PerRequestBroadcastFilter {

private final AtomicInteger uniqueScriptToken = new AtomicInteger();

Expand All @@ -65,4 +67,25 @@ public BroadcastAction filter(Object originalMessage, Object message) {
return new BroadcastAction(BroadcastAction.ACTION.CONTINUE, message);
}

@Override
public BroadcastAction filter(HttpServletRequest request, Object message) {

String userAgent = request.getHeader("User-Agent").toLowerCase();
if (userAgent != null && userAgent.startsWith("opera") && message instanceof String) {
StringBuilder sb = new StringBuilder("<script id=\"atmosphere_")
.append(uniqueScriptToken.getAndIncrement())
.append("\">")
.append("window.parent.$.atmosphere.streamingCallback")
.append("('")
.append(message.toString())
.append("');</script>");
message = sb.toString();
}
return new BroadcastAction(BroadcastAction.ACTION.CONTINUE, message);
}
}





Expand Up @@ -126,7 +126,6 @@ public interface AtmosphereResource<E, F> {
*/
public E getRequest();


/**
* Return the underlying <F> Response.
*
Expand Down Expand Up @@ -155,7 +154,6 @@ public interface AtmosphereResource<E, F> {
*/
public void setBroadcaster(Broadcaster broadcaster);


/**
* Set the {@link Serializer} to use when {@link AtmosphereResource#write}
* execute the operation. By default, the {@link Serializer} is null.
Expand Down
Expand Up @@ -37,6 +37,8 @@

package org.atmosphere.cpr;

import javax.servlet.http.HttpServletRequest;

/**
* Transform a message of type 'E" before it get broadcasted to
* {@link AtmosphereHandler#onStateChange(org.atmosphere.cpr.AtmosphereResourceEvent) }
Expand All @@ -48,7 +50,7 @@
public interface BroadcastFilter {

/**
* Simple class that tells the {@link Broadcaster} to broacast or not
* Simple class that tells the {@link Broadcaster} to broadcast or not
* the transformed value.
*/
public class BroadcastAction {
Expand Down Expand Up @@ -93,8 +95,11 @@ void setOriginalMsg(Object originalMsg) {
* {@link Broadcaster} to discard the message, e.g to not broadcast it.
*
* @param originalMessage The original message which was {@link Broadcaster#broadcast(Object)};
* @param message Object a message
* @param message Object a message
* @return a transformed message.
*/
BroadcastAction filter(Object originalMessage, Object message);

}


Expand Up @@ -40,6 +40,7 @@
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;
import org.atmosphere.util.LoggerUtils;

import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -63,6 +64,9 @@ public class BroadcasterConfig {
protected final ConcurrentLinkedQueue<BroadcastFilter> filters =
new ConcurrentLinkedQueue<BroadcastFilter>();

protected final ConcurrentLinkedQueue<PerRequestBroadcastFilter> perRequestFilters =
new ConcurrentLinkedQueue<PerRequestBroadcastFilter>();

private ExecutorService executorService;

private ExecutorService defaultExecutorService;
Expand Down Expand Up @@ -153,6 +157,10 @@ public boolean addFilter(BroadcastFilter e) {
((BroadcastFilterLifecycle) e).init();
}

if (e instanceof PerRequestBroadcastFilter) {
perRequestFilters.add((PerRequestBroadcastFilter)e);
}

return filters.offer(e);
}

Expand Down Expand Up @@ -191,6 +199,11 @@ public boolean removeFilter(BroadcastFilter e) {
if (e instanceof BroadcastFilterLifecycle) {
((BroadcastFilterLifecycle) e).destroy();
}

if (e instanceof PerRequestBroadcastFilter) {
perRequestFilters.remove(e);
}

return filters.remove(e);
}

Expand Down Expand Up @@ -229,6 +242,23 @@ protected BroadcastAction filter(Object object) {
return transformed;
}

/**
* Invoke {@link BroadcastFilter} in the other they were added, with a unique {@link javax.servlet.http.HttpServletRequest}
* @param request {@link javax.servlet.http.HttpServletRequest}
* @param object the broadcasted object.
* @return BroadcastAction that tell Atmosphere to invoke the next filter or not.
*/
protected BroadcastAction filter(HttpServletRequest request, Object object) {
BroadcastAction transformed = new BroadcastAction(object);
for (PerRequestBroadcastFilter mf : perRequestFilters) {
transformed = mf.filter(request, transformed.message());
if (transformed == null || transformed.action() == BroadcastAction.ACTION.ABORT) {
return transformed;
}
}
return transformed;
}

/**
* Return the default {@link ExecutorService}.
*
Expand Down
Expand Up @@ -208,12 +208,14 @@ public class Entry {
public Object multipleAtmoResources;
public BroadcasterFuture<?> future;
public boolean writeLocally;
public Object originalMessage;

public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future) {
public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, Object originalMessage) {
this.message = message;
this.multipleAtmoResources = multipleAtmoResources;
this.future = future;
this.writeLocally = true;
this.originalMessage = originalMessage;
}

public Entry(Object message, Object multipleAtmoResources, BroadcasterFuture<?> future, boolean writeLocally) {
Expand Down Expand Up @@ -306,20 +308,21 @@ protected void push(Entry msg) {

if (msg.multipleAtmoResources == null) {
for (AtmosphereResource<?, ?> r : resources) {
trackBroadcastMessage(r, finalMsg);
finalMsg = perRequestFilter(r, msg);
if (msg.writeLocally) {
push(r, finalMsg);
}
}
} else if (msg.multipleAtmoResources instanceof AtmosphereResource<?, ?>) {
trackBroadcastMessage((AtmosphereResource<?, ?>) msg.multipleAtmoResources, finalMsg);
finalMsg = perRequestFilter((AtmosphereResource<?, ?>) msg.multipleAtmoResources, msg);

if (msg.writeLocally) {
push((AtmosphereResource<?, ?>) msg.multipleAtmoResources, finalMsg);
}
} else if (msg.multipleAtmoResources instanceof Set) {
Set<AtmosphereResource<?, ?>> sub = (Set<AtmosphereResource<?, ?>>) msg.multipleAtmoResources;
for (AtmosphereResource<?, ?> r : sub) {
trackBroadcastMessage(r, finalMsg);
finalMsg = perRequestFilter(r, msg);
if (msg.writeLocally) {
push(r, finalMsg);
}
Expand All @@ -328,6 +331,21 @@ protected void push(Entry msg) {
msg.message = prevMessage;
}

protected Object perRequestFilter(AtmosphereResource<?, ?> r, Entry msg) {
Object finalMsg = msg.originalMessage;
if (r.getRequest() instanceof HttpServletRequest) {
Object message = msg.originalMessage;
BroadcastAction a = bc.filter( (HttpServletRequest) r.getRequest(), message);
if (a.action() == BroadcastAction.ACTION.ABORT || msg == null)
finalMsg = message;
else
finalMsg = a.message();
}

trackBroadcastMessage(r, finalMsg);
return finalMsg;
}

private Object translate(Object msg) {
if (Callable.class.isAssignableFrom(msg.getClass())) {
try {
Expand Down Expand Up @@ -437,7 +455,7 @@ public <T> Future<T> broadcast(T msg) {
if (newMsg == null) return null;

BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
messages.offer(new Entry(newMsg, null, f));
messages.offer(new Entry(newMsg, null, f, msg));
return f;
}

Expand Down Expand Up @@ -468,7 +486,7 @@ public <T> Future<T> broadcast(T msg, AtmosphereResource<?, ?> r) {
if (newMsg == null) return null;

BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
messages.offer(new Entry(newMsg, r, f));
messages.offer(new Entry(newMsg, r, f, msg));
return f;
}

Expand All @@ -485,7 +503,7 @@ public <T> Future<T> broadcastOnResume(T msg) {
if (newMsg == null) return null;

BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
broadcastOnResume.offer(new Entry(newMsg, null, f));
broadcastOnResume.offer(new Entry(newMsg, null, f, msg));
return f;
}

Expand Down Expand Up @@ -516,7 +534,7 @@ public <T> Future<T> broadcast(T msg, Set<AtmosphereResource<?, ?>> subset) {
if (newMsg == null) return null;

BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(newMsg);
messages.offer(new Entry(newMsg, subset, f));
messages.offer(new Entry(newMsg, subset, f, msg));
return f;
}

Expand Down Expand Up @@ -614,7 +632,7 @@ public <T> Future<T> delayBroadcast(final T o, long delay, TimeUnit t) {
if (msg == null) return null;

final BroadcasterFuture<Object> future = new BroadcasterFuture<Object>(msg);
final Entry e = new Entry(msg, null, future);
final Entry e = new Entry(msg, null, future, o);
Future<T> f;
if (delay > 0) {
f = bc.getScheduledExecutorService().schedule(new Callable<T>() {
Expand All @@ -626,7 +644,7 @@ public T call() throws Exception {
Object r = Callable.class.cast(o).call();
final Object msg = filter(r);
if (msg != null) {
Entry entry = new Entry(msg, null, null);
Entry entry = new Entry(msg, null, null, o);
push(entry);
}
return (T) msg;
Expand All @@ -635,7 +653,7 @@ public T call() throws Exception {
}
}
final Object msg = filter(o);
final Entry e = new Entry(msg, null, null);
final Entry e = new Entry(msg, null, null, o);
push(e);
return (T) msg;
}
Expand Down Expand Up @@ -674,7 +692,7 @@ public void run() {
Object r = Callable.class.cast(o).call();
final Object msg = filter(r);
if (msg != null) {
Entry entry = new Entry(msg, null, null);
Entry entry = new Entry(msg, null, null, o);
push(entry);
}
return;
Expand All @@ -683,7 +701,7 @@ public void run() {
}
}
final Object msg = filter(o);
final Entry e = new Entry(msg, null, null);
final Entry e = new Entry(msg, null, null, o);
push(e);
}
}, waitFor, period, t);
Expand Down
@@ -0,0 +1,59 @@
/*
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code. If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/

package org.atmosphere.cpr;

import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;

import javax.servlet.http.HttpServletRequest;

/**
* A markable interface that can be used in conjonction with {@link BroadcastFilter} to filter
* message per request.
*
*/
public interface PerRequestBroadcastFilter {

/**
* Transform or Filter a message per request, with V as an indicator.
*
* @param request The {@link javax.servlet.http.HttpServletRequest} send before the response was suspended/upgraded.
* @param message Object a message
* @return a transformed message.
*/
BroadcastAction filter(HttpServletRequest request, Object message);
}
Expand Up @@ -87,6 +87,7 @@ public void configure(AtmosphereServlet.AtmosphereConfig config) {

/**
* Implement this method to broadcast message to external source like JGroups, Redis, etc.
* @param message outgoing message
*/
abstract public void outgoingBroadcast(Object message);

Expand Down Expand Up @@ -127,7 +128,7 @@ protected void broadcast(final AtmosphereResource<?, ?> r, final AtmosphereResou
protected void broadcastReceivedMessage(Object message) {
try {
Object newMsg = filter(message);
push(new Entry(newMsg, null, new BroadcasterFuture<Object>(newMsg)));
push(new Entry(newMsg, null, new BroadcasterFuture<Object>(newMsg), message));
} catch (Throwable t) {
logger.log(Level.SEVERE, "", t);
}
Expand Down

0 comments on commit 0dae9e1

Please sign in to comment.