Skip to content

Commit

Permalink
Fixed red5-server issue 39
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Dec 13, 2014
1 parent 0a2acb0 commit 21c65a2
Show file tree
Hide file tree
Showing 18 changed files with 188 additions and 159 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/red5/server/BaseConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,7 @@ public void setStreamId(int id) {
*/
@ConstructorProperties(value = { "persistent" })
public BaseConnection() {
log.debug("New BaseConnection");
this.type = PERSISTENT;
this.sessionId = RandomStringUtils.randomAlphanumeric(13).toUpperCase();
log.debug("Generated session id: {}", sessionId);
this(PERSISTENT);
}

/**
Expand Down Expand Up @@ -201,6 +198,7 @@ public BaseConnection(String type, String host, String remoteAddress, int remote
this.path = path;
this.sessionId = sessionId;
this.params = params;
log.debug("Generated session id: {}", sessionId);
}

/** {@inheritDoc} */
Expand All @@ -226,13 +224,15 @@ public Semaphore getLock() {
*/
public void initialize(IClient client) {
log.debug("initialize - client: {}", client);
if (this.client != null && this.client instanceof Client) {
if (this.client != null && this.client instanceof Client && !this.client.equals(client)) {
// unregister old client
log.trace("Unregistering previous client: {}", this.client);
((Client) this.client).unregister(this, false);
}
this.client = client;
if (this.client instanceof Client) {
if (this.client instanceof Client && !((Client) this.client).isRegistered(this)) {
// register new client
log.trace("Registering client: {}", this.client);
((Client) this.client).register(this);
}
}
Expand Down Expand Up @@ -317,7 +317,7 @@ public IClient getClient() {
* @return true if connection is bound to scope, false otherwise
*/
public boolean isConnected() {
log.debug("Connected: {}", (scope != null));
//log.debug("Connected: {}", (scope != null));
return scope != null;
}

Expand Down
92 changes: 60 additions & 32 deletions src/main/java/org/red5/server/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,47 +55,74 @@ public class Client extends AttributeStore implements IClient {
*/
protected static final String PERMISSIONS = IPersistable.TRANSIENT_PREFIX + "_red5_permissions";

/**
* Client registry where Client is registered
*/
protected transient WeakReference<ClientRegistry> registry;

/**
* Connections this client is associated with.
*/
protected CopyOnWriteArraySet<IConnection> connections = new CopyOnWriteArraySet<IConnection>();
protected transient CopyOnWriteArraySet<IConnection> connections = new CopyOnWriteArraySet<IConnection>();

/**
* Creation time as Timestamp
*/
protected long creationTime;
protected final long creationTime;

/**
* Clients identifier
*/
protected String id;

/**
* Client registry where Client is registered
*/
protected WeakReference<ClientRegistry> registry;
protected final String id;

/**
* Whether or not the bandwidth has been checked.
*/
protected boolean bandwidthChecked;

/**
* Creates client, sets creation time and registers it in ClientRegistry
* DW: nope, does not currently register it in ClientRegistry!
* Creates client, sets creation time and registers it in ClientRegistry.
*
* @param id Client id
* @param registry ClientRegistry
*/
@ConstructorProperties({ "id", "registry" })
public Client(String id, ClientRegistry registry) {
super();
this.id = id;
if (id != null) {
this.id = id;
} else {
this.id = registry.nextId();
}
this.creationTime = System.currentTimeMillis();
// use a weak reference to prevent any hard-links to the registry
this.registry = new WeakReference<ClientRegistry>(registry);
this.creationTime = System.currentTimeMillis();
}

/**
* Creates client, sets creation time and registers it in ClientRegistry.
*
* @param id Client id
* @param creationTime Creation time
* @param registry ClientRegistry
*/
@ConstructorProperties({ "id", "creationTime", "registry" })
public Client(String id, Long creationTime, ClientRegistry registry) {
super();
if (id != null) {
this.id = id;
} else {
this.id = registry.nextId();
}
if (creationTime != null) {
this.creationTime = creationTime;
} else {
this.creationTime = System.currentTimeMillis();
}
// use a weak reference to prevent any hard-links to the registry
this.registry = new WeakReference<ClientRegistry>(registry);
}

/**
* Disconnects client from Red5 application
*/
Expand Down Expand Up @@ -149,15 +176,6 @@ public Set<IConnection> getConnections(IScope scope) {
return Collections.emptySet();
}

/**
* Sets the time at which the client was created.
*
* @param creationTime
*/
public void setCreationTime(long creationTime) {
this.creationTime = creationTime;
}

/**
* Returns the time at which the client was created.
*
Expand All @@ -168,14 +186,8 @@ public long getCreationTime() {
}

/**
* Sets the client id
*/
public void setId(String id) {
this.id = id;
}

/**
* Returns the client id
* Returns the client id.
*
* @return client id
*/
public String getId() {
Expand Down Expand Up @@ -217,12 +229,28 @@ public List<String> iterateScopeNameList() {
return scopeNames;
}

/**
* Returns registration status of given connection.
*
* @param conn
* @return
*/
public boolean isRegistered(IConnection conn) {
return connections.contains(conn);
}

/**
* Associate connection with client
* @param conn Connection object
*/
protected void register(IConnection conn) {
log.debug("Registering connection for this client {}", id);
if (log.isDebugEnabled()) {
if (conn == null) {
log.debug("Register null connection, client id: {}", id);
} else {
log.debug("Register connection ({}:{}) client id: {}", conn.getRemoteAddress(), conn.getRemotePort(), id);
}
}
if (conn != null) {
IScope scope = conn.getScope();
if (scope != null) {
Expand Down Expand Up @@ -250,6 +278,7 @@ protected void unregister(IConnection conn) {
* @param deleteIfNoConns Whether to delete this client if it no longer has any connections
*/
protected void unregister(IConnection conn, boolean deleteIfNoConns) {
log.debug("Unregister connection ({}:{}) client id: {}", conn.getRemoteAddress(), conn.getRemotePort(), id);
// remove connection from connected scopes list
connections.remove(conn);
// If client is not connected to any scope any longer then remove
Expand Down Expand Up @@ -321,8 +350,7 @@ public static Client from(CompositeData cd) {
Client instance = null;
if (cd.containsKey("id")) {
String id = (String) cd.get("id");
instance = new Client(id, null);
instance.setCreationTime((Long) cd.get("creationTime"));
instance = new Client(id, (Long) cd.get("creationTime"), null);
instance.setAttribute(PERMISSIONS, cd.get(PERMISSIONS));
}
if (cd.containsKey("attributes")) {
Expand Down
34 changes: 17 additions & 17 deletions src/main/java/org/red5/server/ClientRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.red5.server.exception.ClientNotFoundException;
import org.red5.server.exception.ClientRejectedException;
import org.red5.server.jmx.mxbeans.ClientRegistryMXBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedResource;

/**
Expand All @@ -45,6 +47,8 @@
@ManagedResource(objectName="org.red5.server:type=ClientRegistry,name=default", description="ClientRegistry")
public class ClientRegistry implements IClientRegistry, ClientRegistryMXBean {

private Logger log = LoggerFactory.getLogger(ClientRegistry.class);

/**
* Clients map
*/
Expand Down Expand Up @@ -89,19 +93,11 @@ public void addClient(IClient client) {
* Add the client to the registry
*/
private void addClient(String id, IClient client) {
//check to see if the id already exists first
// check to see if the id already exists first
if (!hasClient(id)) {
clients.put(id, client);
} else {
// DW the Client object is meant to be unifying connections from a remote user. But currently the only case we
// specify this currently is when we use a remoting session. So we actually just create an arbitrary id, which means
// RTMP connections from same user are not combined.
//get the next available client id
String newId = nextId();
//update the client
client.setId(newId);
//add the client to the list
addClient(newId, client);
log.debug("Client id: {} already registered", id);
}
}

Expand Down Expand Up @@ -140,7 +136,7 @@ protected boolean hasClients() {
@SuppressWarnings("unchecked")
protected Collection<IClient> getClients() {
if (!hasClients()) {
// Avoid creating new Collection object if no clients exist.
// avoid creating new Collection object if no clients exist.
return Collections.EMPTY_SET;
}
return Collections.unmodifiableCollection(clients.values());
Expand Down Expand Up @@ -192,19 +188,23 @@ public IClient newClient(Object[] params) throws ClientNotFoundException, Client
* @return Next client id
*/
public String nextId() {
//when we reach max int, reset to zero
if (nextId.get() == Integer.MAX_VALUE) {
nextId.set(0);
}
return String.format("%s", nextId.getAndIncrement());
String id = "-1";
do {
// when we reach max int, reset to zero
if (nextId.get() == Integer.MAX_VALUE) {
nextId.set(0);
}
id = String.format("%d", nextId.getAndIncrement());
} while (hasClient(id));
return id;
}

/**
* Return previous client id
* @return Previous client id
*/
public String previousId() {
return String.format("%s", nextId.get());
return String.format("%d", nextId.get());
}

/**
Expand Down
23 changes: 9 additions & 14 deletions src/main/java/org/red5/server/api/IClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,24 @@ public interface IClient extends IAttributeStore {
*/
public static final String ID = "red5.client";

/**
* Sets the clients id
* @param id client id
*/
public void setId(String id);

/**
* Get the unique ID for this client. This will be generated by the server
* if not passed upon connection from client-side Flex/Flash app. To assign a custom ID to the client use
* <code>params</code> object of
* {@link IApplication#appConnect(IConnection, Object[])} method, that
* contains 2nd all the rest values you pass to
* <code>NetConnection.connect</code> method.
* <code>params</code> object of {@link IApplication#appConnect(IConnection, Object[])} method, that
* contains 2nd all the rest values you pass to <code>NetConnection.connect</code> method.
*
* Example:
*
* At client side:
* <code>NetConnection.connect( "http://localhost/killerapp/", "user123" );</code>
* <code>
* NetConnection.connect("http://localhost/killerapp/", "user123");
* </code>
*
* then at server side: <code>
* public boolean appConnect( IConnection connection, Object[] params ){<br/>
* then at server side:
* <code>
* public boolean appConnect( IConnection connection, Object[] params){
* try {
* connection.getClient().setId( (String) params[0] );
* connection.getClient().setAttribute("param0", (String) params[0]);
* } catch(Exception e){<br/>
* log.error("{}", e);
* }
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/org/red5/server/api/IConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@
/**
* The connection object.
*
* Each connection has an associated client and scope. Connections may be
* persistent, polling, or transient. The aim of this interface is to provide
* basic connection methods shared between different types of connections
*
* Future subclasses: RTMPConnection, RemotingConnection, AJAXConnection,
* HttpConnection, etc
* Each connection has an associated client and scope. Connections may be persistent, polling, or transient.
* The aim of this interface is to provide basic connection methods shared between different types of connections.
*
* @author The Red5 Project
* @author Luke Hubbard (luke@codegent.com)
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,15 @@ public Red5() {
* @param connection Thread local connection
*/
public static void setConnectionLocal(IConnection connection) {
log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
if (log.isDebugEnabled()) {
log.debug("Set connection: {} with thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
try {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
StackTraceElement stackTraceElement = stackTraceElements[2];
log.debug("Caller: {}.{} #{}", stackTraceElement.getClassName(), stackTraceElement.getMethodName(), stackTraceElement.getLineNumber());
} catch (Exception e) {
}
}
if (connection != null) {
connThreadLocal.set(new WeakReference<IConnection>(connection));
IScope scope = connection.getScope();
Expand All @@ -144,10 +152,11 @@ public static void setConnectionLocal(IConnection connection) {
* @return Connection object
*/
public static IConnection getConnectionLocal() {
log.debug("Get connection on thread: {}", Thread.currentThread().getName());
WeakReference<IConnection> ref = connThreadLocal.get();
if (ref != null) {
return ref.get();
IConnection connection = ref.get();
log.debug("Get connection: {} on thread: {}", (connection != null ? connection.getSessionId() : null), Thread.currentThread().getName());
return connection;
} else {
return null;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/red5/server/api/scope/IScope.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
*
* @author The Red5 Project
* @author Luke Hubbard (luke@codegent.com)
* @author Paul Gregoire (mondain@gmail.com)
*/
public interface IScope extends IBasicScope, ResourcePatternResolver, IServiceHandlerProvider {

Expand Down
Loading

0 comments on commit 21c65a2

Please sign in to comment.