Skip to content

Commit

Permalink
Consolidated logger for AttributeStore impls. Added rtc and http to I…
Browse files Browse the repository at this point in the history
…Connection encodings. Added deleteStream protection against non-publisher. Removed extra WS timeout logic
  • Loading branch information
mondain committed Feb 23, 2023
1 parent a71dbcb commit fa13a16
Show file tree
Hide file tree
Showing 19 changed files with 100 additions and 79 deletions.
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.13</version>
<version>1.3.14</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/org/red5/client/Red5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class Red5Client {
/**
* Current server version with revision
*/
public static final String VERSION = "Red5 Client 1.3.13";
public static final String VERSION = "Red5 Client 1.3.14";

/**
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope
Expand Down
4 changes: 2 additions & 2 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.13</version>
<version>1.3.14</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down Expand Up @@ -124,7 +124,7 @@
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.13</version>
<version>1.3.14</version>
</dependency> -->
<dependency>
<groupId>junit</groupId>
Expand Down
6 changes: 5 additions & 1 deletion common/src/main/java/org/red5/server/AttributeStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@

public class AttributeStore implements ICastingAttributeStore {

protected static Logger log = LoggerFactory.getLogger(AttributeStore.class);
protected Logger log = LoggerFactory.getLogger(getClass());

protected boolean isTrace = log.isTraceEnabled();

protected boolean isDebug = log.isDebugEnabled();

/**
* Map for attributes with initialCapacity = 1, loadFactor = .9, concurrencyLevel = (# of processors)
Expand Down
4 changes: 0 additions & 4 deletions common/src/main/java/org/red5/server/BaseConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@
import org.red5.server.api.scope.IScope;
import org.red5.server.scope.Scope;
import org.red5.server.so.SharedObjectScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base abstract class for connections. Adds connection specific functionality like work with clients to AttributeStore.
*/
public abstract class BaseConnection extends AttributeStore implements IConnection {

private static final Logger log = LoggerFactory.getLogger(BaseConnection.class);

/**
* Connection type
*/
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/java/org/red5/server/api/IConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface IConnection extends ICoreObject, ICastingAttributeStore {
* Encoding type.
*/
public static enum Encoding {
AMF0, AMF3, WEBSOCKET, SOCKETIO, RTP, SRTP, BLOB, RAW, RTSP, SRT, MPEGTS, DATACHANNEL
AMF0, AMF3, WEBSOCKET, SOCKETIO, RTP, SRTP, BLOB, RAW, RTSP, SRT, MPEGTS, DATACHANNEL, WEBRTC, HTTP
};

/**
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.3.13";
public static final String VERSION = "Red5 Server 1.3.14";

/**
* Server version for fmsVer requests
*/
public static final String FMS_VERSION = "RED5/1,3,13,0";
public static final String FMS_VERSION = "RED5/1,3,14,0";

/**
* Server capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@
import org.red5.server.stream.SingleItemSubscriberStream;
import org.red5.server.stream.StreamService;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
Expand All @@ -92,12 +90,6 @@
*/
public abstract class RTMPConnection extends BaseConnection implements IStreamCapableConnection, IServiceCapableConnection, IReceivedMessageTaskQueueListener {

private static Logger log = LoggerFactory.getLogger(RTMPConnection.class);

private static boolean isTrace = log.isTraceEnabled();

private static boolean isDebug = log.isDebugEnabled();

public static final String RTMP_SESSION_ID = "rtmp.sessionid";

public static final String RTMP_HANDSHAKE = "rtmp.handshake";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,20 @@ public void deleteStream(Number streamId) {
public void deleteStream(IStreamCapableConnection conn, Number streamId) {
IClientStream stream = conn.getStreamById(streamId);
if (stream != null) {
log.debug("Delete id: {} stream: {}", streamId, stream);
if (!IConnection.Duty.PUBLISHER.equals(conn.getDuty())) {
// this is a subscriber dont close / delete the stream
log.warn("Connection is not a publisher, not closing stream");
return;
}
if (stream instanceof IClientBroadcastStream) {
IClientBroadcastStream bs = (IClientBroadcastStream) stream;
IBroadcastScope bsScope = getBroadcastScope(conn.getScope(), bs.getPublishedName());
if (bsScope != null && conn instanceof BaseConnection) {
((BaseConnection) conn).unregisterBasicScope(bsScope);
}
} else {
log.debug("Stream is not instance of IClientBroadcastStream");
}
stream.close();
}
Expand Down
2 changes: 1 addition & 1 deletion io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.13</version>
<version>1.3.14</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-io</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<name>Red5</name>
<description>The Red5 server</description>
<groupId>org.red5</groupId>
<version>1.3.13</version>
<version>1.3.14</version>
<url>https://github.com/Red5/red5-server</url>
<inceptionYear>2005</inceptionYear>
<organization>
Expand Down
2 changes: 1 addition & 1 deletion server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.13</version>
<version>1.3.14</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Stream;

import javax.websocket.Extension;
Expand Down Expand Up @@ -51,9 +52,13 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
private static final boolean isDebug = log.isDebugEnabled();

// Sending async on windows times out
private static boolean useAsync; // = !System.getProperty("os.name").contains("Windows");
private static boolean useAsync;

private static long sendTimeout = 3000L, readTimeout = 30000L;
private static long sendTimeout = 8000L, readTimeout = 30000L;

private static final AtomicLongFieldUpdater<WebSocketConnection> readBytesUpdater = AtomicLongFieldUpdater.newUpdater(WebSocketConnection.class, "readBytes");

private static final AtomicLongFieldUpdater<WebSocketConnection> writeBytesUpdater = AtomicLongFieldUpdater.newUpdater(WebSocketConnection.class, "writtenBytes");

private AtomicBoolean connected = new AtomicBoolean(false);

Expand Down Expand Up @@ -96,9 +101,6 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
// stats
private volatile long readBytes, writtenBytes;

// last read time will be set when we've received; last write will be on any write, not just ping
private long lastReadTime, lastWriteTime;

// send future for when async is enabled
private Future<Void> sendFuture;

Expand Down Expand Up @@ -158,9 +160,16 @@ public WebSocketConnection(WebSocketScope scope, Session session) {
}
// get user props
Map<String, Object> userProps = session.getUserProperties();
// add the timeouts to the user props
userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout);
userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout);
if (isDebug) {
log.debug("userProps: {}", userProps);
}
// set maximum messages size to 10,000 bytes
session.setMaxTextMessageBufferSize(10000);
// set maximum idle timeout to 30 seconds (read timeout)
session.setMaxIdleTimeout(readTimeout);
}

/**
Expand Down Expand Up @@ -352,19 +361,28 @@ public void close() {
}
}

/*
WsSession uses these userProperties for checkExpiration along with maxIdleTimeout
configuration for read idle timeout on WebSocket session
READ_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.READ_IDLE_TIMEOUT_MS";
configuration for write idle timeout on WebSocket session
WRITE_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.WRITE_IDLE_TIMEOUT_MS";
*/
public void timeoutAsync(long now) {
long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime);
log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta);
if (isConnected()) {
// if the delta is less than now, then the last time isn't 0
if (readDelta != now && readDelta > readTimeout) {
log.warn("Read timeout: {} on id: {}", readDelta, wsSessionId);
close();
} else if (writeDelta != now && writeDelta > sendTimeout) {
log.warn("Write timeout: {} on id: {}", writeDelta, wsSessionId);
close();
}
}
// XXX(paul) only logging here as we should more than likely rely upon the container checking expiration
log.trace("timeoutAsync: {} on session id: {} read: {} written: {}", now, wsSessionId, readBytes, writtenBytes);
/*
WsSession session = wsSession.get();
Map<String, Object> props = session.getUserProperties();
log.debug("Session properties: {}", props);
long maxIdleTimeout = session.getMaxIdleTimeout();
long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS);
long sendTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS);
log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, sendTimeout);
//long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime);
//log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta);
*/
}

/**
Expand Down Expand Up @@ -656,25 +674,19 @@ public long getReadBytes() {
}

public void updateReadBytes(long read) {
readBytes += read;
lastReadTime = System.currentTimeMillis();
log.debug("updateReadBytes: {} by: {}", readBytes, read);
readBytesUpdater.addAndGet(this, read);
// read time is updated on WsSession by WsFrameBase when the read is performed
}

public long getWrittenBytes() {
return writtenBytes;
}

public void updateWriteBytes(long wrote) {
writtenBytes += wrote;
lastWriteTime = System.currentTimeMillis();
}

public long getLastReadTime() {
return lastReadTime;
}

public long getLastWriteTime() {
return lastWriteTime;
log.debug("updateWriteBytes: {} by: {}", writtenBytes, wrote);
writeBytesUpdater.addAndGet(this, wrote);
// write time is updated on WsSession by WsRemoteEndpointImplBase when the write is performed
}

public String getWsSessionId() {
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/red5/net/websocket/model/WSMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public WSMessage(String message) throws UnsupportedEncodingException {
payload = IoBuffer.wrap(message.getBytes("UTF8"));
}

public WSMessage(String message, WebSocketConnection conn) throws UnsupportedEncodingException {
setPayload(IoBuffer.wrap(message.getBytes("UTF8")));
setConnection(conn);
}

public WSMessage(IoBuffer payload, WebSocketConnection conn) {
setPayload(payload);
setConnection(conn);
}

/**
* Returns the payload data as a UTF8 string.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public void onOpen(Session session, EndpointConfig config) {
if (conn == null) {
log.warn("WebSocketConnection null at onOpen for {}", session.getId());
}
// Set maximum messages size to 10,000 bytes
session.setMaxTextMessageBufferSize(10000);
session.addMessageHandler(new WholeMessageHandler(conn));
session.addMessageHandler(new WholeBinaryHandler(conn));
session.addMessageHandler(new WholePongHandler(conn));
Expand Down Expand Up @@ -132,13 +130,12 @@ public void onMessage(String message) {
if (isTrace) {
log.trace("Message received {}", message);
}
if (conn != null && conn.isConnected()) {
if (conn != null) {
// update the byte received counter
conn.updateReadBytes(message.getBytes().length);
try {
// update the byte received counter
conn.updateReadBytes(message.getBytes().length);
// create a websocket message and add the current connection for listener access
WSMessage wsMessage = new WSMessage(message);
wsMessage.setConnection(conn);
WSMessage wsMessage = new WSMessage(message, conn);
// fire the message off to the scope for handling
scope.onMessage(wsMessage);
} catch (UnsupportedEncodingException e) {
Expand All @@ -164,13 +161,11 @@ public void onMessage(ByteBuffer message) {
if (isTrace) {
log.trace("Message received {}", message);
}
if (conn != null && conn.isConnected()) {
if (conn != null) {
// update the byte received counter
conn.updateReadBytes(message.limit());
// create a websocket message and add the current connection for listener access
WSMessage wsMessage = new WSMessage();
wsMessage.setPayload(IoBuffer.wrap(message));
wsMessage.setConnection(conn);
WSMessage wsMessage = new WSMessage(IoBuffer.wrap(message), conn);
// fire the message off to the scope for handling
scope.onMessage(wsMessage);
} else {
Expand All @@ -194,7 +189,7 @@ public void onMessage(PongMessage message) {
log.trace("Pong received {}", message);
}
// update the byte received counter
if (conn != null && conn.isConnected()) {
if (conn != null) {
conn.updateReadBytes(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,23 @@ public Set<String> getRegisteredEndpointPaths() {
return Collections.unmodifiableSet(registeredEndpointPaths);
}

@Override
public void backgroundProcess() {
// some comments say 1s others say 10s
//log.debug("backgroundProcess - period: {}", getProcessPeriod());
/*
This method gets called once a second (this is super class content)
backgroundProcessCount ++;
if (backgroundProcessCount >= processPeriod) {
backgroundProcessCount = 0;
for (WsSession wsSession : sessions.keySet()) {
wsSession.checkExpiration();
}
}
*/
super.backgroundProcess();
}

/**
* {@inheritDoc}
*
Expand Down

0 comments on commit fa13a16

Please sign in to comment.