Permalink
Browse files

% first steps in automated RTCP handling

% more unit testing
% slight API changes and improvements
  • Loading branch information...
1 parent eea6af1 commit 3de98307257c663b582925c2ca46c5290add7c72 Bruno de Carvalho committed Sep 14, 2010
Showing with 1,348 additions and 440 deletions.
  1. +1 −1 README.md
  2. +5 −0 TODO.txt
  3. +12 −0 pom.xml
  4. +187 −2 src/functionaltest/java/org/factor45/efflux/session/ControlPacketFunctionalTest.java
  5. +1 −1 src/functionaltest/java/org/factor45/efflux/session/MultiParticipantSessionFunctionalTest.java
  6. +14 −4 src/functionaltest/java/org/factor45/efflux/session/SingleParticipantSessionFunctionalTest.java
  7. +85 −0 src/main/java/org/factor45/efflux/packet/AbstractReportPacket.java
  8. +49 −0 src/main/java/org/factor45/efflux/packet/AppDataPacket.java
  9. +48 −21 src/main/java/org/factor45/efflux/packet/DataPacket.java
  10. +67 −56 src/main/java/org/factor45/efflux/packet/ReceiverReportPacket.java
  11. +1 −52 src/main/java/org/factor45/efflux/packet/SenderReportPacket.java
  12. +509 −49 src/main/java/org/factor45/efflux/session/AbstractRtpSession.java
  13. +134 −0 src/main/java/org/factor45/efflux/session/DefaultRtpParticipantContext.java
  14. +1 −142 src/main/java/org/factor45/efflux/session/MultiParticipantSession.java
  15. +134 −26 src/main/java/org/factor45/efflux/session/RtpParticipant.java
  16. +9 −35 src/main/java/org/factor45/efflux/session/RtpParticipantContext.java
  17. +6 −6 src/main/java/org/factor45/efflux/session/RtpSession.java
  18. +3 −0 src/main/java/org/factor45/efflux/session/RtpSessionControlListener.java
  19. +8 −1 src/main/java/org/factor45/efflux/session/RtpSessionEventListener.java
  20. +20 −28 src/main/java/org/factor45/efflux/session/SingleParticipantSession.java
  21. +12 −6 src/site/index.html
  22. +0 −1 src/test/java/org/factor45/efflux/packet/ByePacketTest.java
  23. +0 −1 src/test/java/org/factor45/efflux/packet/ControlPacketTest.java
  24. +31 −7 src/test/java/org/factor45/efflux/{RtpPacketTest.java → packet/DataPacketTest.java}
  25. +11 −1 src/test/java/org/factor45/efflux/session/MultiParticipantSessionTest.java
View
2 README.md
@@ -3,7 +3,7 @@ efflux
A Java RTP stack
----------------
-**efflux** is a simple Java RTP stack, whose purpose are applications who do not directly generate RTP content themselves but need to send or receive data using this protocol.
+**efflux** is a simple Java RTP stack, whose target are applications who do not directly generate RTP content themselves but need to send or receive data using this protocol.
It aims to be fully RFC compliant but has utilities for those special cases where the other end doesn't quite follow the RFC (which, sadly, happens often).
Project page: [http://efflux.factor45.org](http://efflux.factor45.org)
View
5 TODO.txt
@@ -0,0 +1,5 @@
+* Finish off automated RTCP handling
+* Update local sender statistics
+* Process receiver reports and fire events related to traffic conditions changing
+* RTCP bandwidth adjustment
+* Automated sending of SR/RR packets
View
12 pom.xml
@@ -101,6 +101,18 @@
</configuration>
</plugin>
+ <!-- jar code -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.3.1</version>
+ <configuration>
+ <excludes>
+ <exclude>**/log4j.properties</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
<!-- tests -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
View
189 src/functionaltest/java/org/factor45/efflux/session/ControlPacketFunctionalTest.java
@@ -16,8 +16,10 @@
package org.factor45.efflux.session;
+import org.factor45.efflux.packet.AppDataPacket;
import org.factor45.efflux.packet.ByePacket;
import org.factor45.efflux.packet.CompoundControlPacket;
+import org.factor45.efflux.packet.DataPacket;
import org.factor45.efflux.packet.SdesChunk;
import org.factor45.efflux.packet.SdesChunkItems;
import org.factor45.efflux.packet.SourceDescriptionPacket;
@@ -27,8 +29,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
/**
* @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
@@ -56,18 +58,25 @@ public void testSendAndReceive() {
RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
+ this.session1.setAutomatedRtcpHandling(false);
assertTrue(this.session1.init());
this.session1.addControlListener(new RtpSessionControlListener() {
@Override
public void controlPacketReceived(RtpSession session, CompoundControlPacket packet) {
System.err.println("Session 1 received rtcp packet:\n" + packet);
latch.countDown();
}
+
+ @Override
+ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
+ fail("Unexpected APP_DATA packet received");
+ }
});
RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
+ this.session2.setAutomatedRtcpHandling(false);
assertTrue(this.session2.init());
SourceDescriptionPacket sdesPacket = new SourceDescriptionPacket();
@@ -92,4 +101,180 @@ public void controlPacketReceived(RtpSession session, CompoundControlPacket pack
fail("Exception caught: " + e.getClass().getSimpleName() + " - " + e.getMessage());
}
}
+
+ @Test
+ public void testSendAndNotReceiveForAutomatedRtcpSession() {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ this.session1 = new MultiParticipantSession("Session1", 8, local1);
+ this.session1.setAutomatedRtcpHandling(true);
+ assertTrue(this.session1.init());
+ this.session1.addControlListener(new RtpSessionControlListener() {
+ @Override
+ public void controlPacketReceived(RtpSession session, CompoundControlPacket packet) {
+ fail("Shouldn't have received this packet...");
+ }
+
+ @Override
+ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
+ fail("Unexpected APP_DATA packet received");
+ }
+ });
+
+ RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
+ this.session2.setAutomatedRtcpHandling(false);
+ assertTrue(this.session2.init());
+
+ SourceDescriptionPacket sdesPacket = new SourceDescriptionPacket();
+ SdesChunk chunk = new SdesChunk();
+ chunk.setSsrc(2);
+ chunk.addItem(SdesChunkItems.createCnameItem("session2@127.0.0.1:7000"));
+ chunk.addItem(SdesChunkItems.createNameItem("session2"));
+ sdesPacket.addItem(chunk);
+
+ assertTrue(this.session2.sendControlPacket(sdesPacket));
+
+ ByePacket byePacket = new ByePacket();
+ byePacket.addSsrc(2);
+ byePacket.setReasonForLeaving("weeeeeeell it's about time to be hittin' the old dusty trail.");
+ CompoundControlPacket compoundPacket = new CompoundControlPacket(sdesPacket, byePacket);
+
+ assertTrue(this.session2.sendControlPacket(compoundPacket));
+
+ try {
+ assertFalse(latch.await(2000, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ fail("Exception caught: " + e.getClass().getSimpleName() + " - " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSendSdesAndByePackets() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ this.session1 = new MultiParticipantSession("Session1", 8, local1);
+ this.session1.addEventListener(new RtpSessionEventListener() {
+ @Override
+ public void participantJoinedFromData(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ fail("Unexpected event triggered.");
+ }
+
+ @Override
+ public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ System.err.println("Participant joined from SDES chunk: " + chunk);
+ latch.countDown();
+ }
+
+ @Override
+ public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
+ fail("Unexpected event triggered.");
+ }
+
+ @Override
+ public void participantLeft(RtpSession session, RtpParticipant participant) {
+ System.err.println("Participant left: " + participant);
+ latch.countDown();
+ }
+
+ @Override
+ public void resolvedSsrcConflict(RtpSession session, long oldSsrc, long newSsrc) {
+ fail("Unexpected event triggered.");
+ }
+
+ @Override
+ public void sessionTerminated(RtpSession session, Throwable cause) {
+ System.err.println("Session terminated: " + cause.getMessage());
+ }
+ });
+ assertTrue(this.session1.init());
+
+ RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ this.session2 = new MultiParticipantSession("Session2", 8, local2);
+ this.session2.addParticipant(local1);
+ assertTrue(this.session2.init());
+
+ this.session2.terminate();
+
+ try {
+ assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ fail("Exception caught: " + e.getClass().getSimpleName() + " - " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testUpdateSdes() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(2);
+
+ RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ this.session1 = new MultiParticipantSession("Session1", 8, local1);
+ this.session1.addEventListener(new RtpSessionEventListener() {
+ @Override
+ public void participantJoinedFromData(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ System.err.println("Participant joined from DataPacket: " + packet);
+ latch.countDown();
+ }
+
+ @Override
+ public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ fail("Unexpected packet received");
+ }
+
+ @Override
+ public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
+ System.err.println("Participant information updated: " + participant);
+ latch.countDown();
+ }
+
+ @Override
+ public void participantLeft(RtpSession session, RtpParticipant participant) {
+ System.err.println("Participant left: " + participant);
+ }
+
+ @Override
+ public void resolvedSsrcConflict(RtpSession session, long oldSsrc, long newSsrc) {
+ }
+
+ @Override
+ public void sessionTerminated(RtpSession session, Throwable cause) {
+ System.err.println("Session terminated: " + cause.getMessage());
+ }
+ });
+ assertTrue(this.session1.init());
+
+ RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ this.session2 = new MultiParticipantSession("Session2", 8, local2);
+ this.session2.addParticipant(local1);
+ this.session2.setAutomatedRtcpHandling(false);
+ assertTrue(this.session2.init());
+
+ assertTrue(this.session2.sendData(new byte[]{0x45, 0x45, 0x45}, 0, false));
+ SourceDescriptionPacket sdesPacket = new SourceDescriptionPacket();
+ SdesChunk chunk = new SdesChunk();
+ chunk.setSsrc(2);
+ chunk.addItem(SdesChunkItems.createCnameItem("session2@127.0.0.1:7000"));
+ chunk.addItem(SdesChunkItems.createNameItem("session2"));
+ sdesPacket.addItem(chunk);
+
+ assertTrue(this.session2.sendControlPacket(sdesPacket));
+
+ this.session2.terminate();
+
+ try {
+ assertTrue(latch.await(2000, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ fail("Exception caught: " + e.getClass().getSimpleName() + " - " + e.getMessage());
+ }
+
+ RtpParticipantContext context = this.session1.getRemoteParticipant(2);
+ assertNotNull(context);
+ RtpParticipant participant = context.getParticipant();
+ assertEquals(2, participant.getSsrc());
+ assertEquals("session2@127.0.0.1:7000", participant.getCname());
+ assertEquals("session2", participant.getName());
+ }
}
View
2 ...unctionaltest/java/org/factor45/efflux/session/MultiParticipantSessionFunctionalTest.java
@@ -86,7 +86,7 @@ public void dataPacketReceived(RtpSession session, RtpParticipant participant, D
}
for (byte i = 0; i < N; i++) {
- this.sessions[i].sendData(new byte[]{(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef}, i);
+ this.sessions[i].sendData(new byte[]{(byte) 0xde, (byte) 0xad, (byte) 0xbe, (byte) 0xef}, i, false);
}
latch.await(5000L, TimeUnit.MILLISECONDS);
View
18 ...nctionaltest/java/org/factor45/efflux/session/SingleParticipantSessionFunctionalTest.java
@@ -17,6 +17,7 @@
package org.factor45.efflux.session;
import org.factor45.efflux.packet.DataPacket;
+import org.factor45.efflux.packet.SdesChunk;
import org.junit.After;
import org.junit.Test;
@@ -167,7 +168,7 @@ public void dataPacketReceived(RtpSession session, RtpParticipant participant, D
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2) {
@Override
public boolean sendDataPacket(DataPacket packet) {
- if (!this.initialised) {
+ if (!this.running) {
return false;
}
@@ -214,7 +215,11 @@ public void participantJoinedFromData(RtpSession session, RtpParticipant partici
}
@Override
- public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ }
+
+ @Override
+ public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
}
@Override
@@ -226,6 +231,11 @@ public void resolvedSsrcConflict(RtpSession session, long oldSsrc, long newSsrc)
System.err.println("Resolved SSRC conflict, local SSRC was " + oldSsrc + " and now is " + newSsrc);
latch.countDown();
}
+
+ @Override
+ public void sessionTerminated(RtpSession session, Throwable cause) {
+ System.err.println("Session terminated: " + cause.getMessage());
+ }
});
RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
@@ -241,14 +251,14 @@ public void dataPacketReceived(RtpSession session, RtpParticipant participant, D
});
long oldSsrc = this.session1.getLocalParticipant().getSsrc();
- assertTrue(this.session2.sendData(new byte[]{0x45, 0x45, 0x45, 0x45}, 6969));
+ assertTrue(this.session2.sendData(new byte[]{0x45, 0x45, 0x45, 0x45}, 6969, false));
assertTrue(latch.await(1000L, TimeUnit.MILLISECONDS));
// Make sure SSRC was updated and send it to S1 to ensure it received the expected SSRC
assertTrue(oldSsrc != this.session1.getLocalParticipant().getSsrc());
assertEquals(1, this.session2.getRemoteParticipant().getSsrc());
- assertTrue(this.session1.sendData(new byte[]{0x45, 0x45, 0x45, 0x45}, 6969));
+ assertTrue(this.session1.sendData(new byte[]{0x45, 0x45, 0x45, 0x45}, 6969, false));
assertTrue(latch2.await(1000L, TimeUnit.MILLISECONDS));
View
85 src/main/java/org/factor45/efflux/packet/AbstractReportPacket.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 Bruno de Carvalho
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.factor45.efflux.packet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
+ */
+public abstract class AbstractReportPacket extends ControlPacket {
+
+ // internal vars --------------------------------------------------------------------------------------------------
+
+ protected long senderSsrc;
+ protected List<ReceptionReport> receptionReports;
+
+ // constructors ---------------------------------------------------------------------------------------------------
+
+ protected AbstractReportPacket(Type type) {
+ super(type);
+ }
+
+ // public methods -------------------------------------------------------------------------------------------------
+
+ public boolean addReceptionReportBlock(ReceptionReport block) {
+ if (this.receptionReports == null) {
+ this.receptionReports = new ArrayList<ReceptionReport>();
+ return this.receptionReports.add(block);
+ }
+
+ // 5 bits is the limit
+ return (this.receptionReports.size() < 31) && this.receptionReports.add(block);
+ }
+
+ public byte getReceptionReportCount() {
+ if (this.receptionReports == null) {
+ return 0;
+ }
+
+ return (byte) this.receptionReports.size();
+ }
+
+ // getters & setters ----------------------------------------------------------------------------------------------
+
+ public long getSenderSsrc() {
+ return senderSsrc;
+ }
+
+ public void setSenderSsrc(long senderSsrc) {
+ if ((senderSsrc < 0) || (senderSsrc > 0xffffffffL)) {
+ throw new IllegalArgumentException("Valid range for SSRC is [0;0xffffffff]");
+ }
+ this.senderSsrc = senderSsrc;
+ }
+
+ public List<ReceptionReport> getReceptionReports() {
+ if (this.receptionReports == null) {
+ return null;
+ }
+ return Collections.unmodifiableList(this.receptionReports);
+ }
+
+ public void setReceptionReports(List<ReceptionReport> receptionReports) {
+ if (receptionReports.size() >= 31) {
+ throw new IllegalArgumentException("At most 31 report blocks can be sent in a *ReportPacket");
+ }
+ this.receptionReports = receptionReports;
+ }
+}
View
49 src/main/java/org/factor45/efflux/packet/AppDataPacket.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 Bruno de Carvalho
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.factor45.efflux.packet;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
+ */
+public class AppDataPacket extends ControlPacket {
+
+ // constructors ---------------------------------------------------------------------------------------------------
+
+ public AppDataPacket(Type type) {
+ super(type);
+ }
+
+ // public static methods ------------------------------------------------------------------------------------------
+
+ public static ChannelBuffer encode(int currentCompoundLength, int fixedBlockSize, AppDataPacket packet) {
+ return null;
+ }
+
+ // ControlPacket --------------------------------------------------------------------------------------------------
+
+ @Override
+ public ChannelBuffer encode(int currentCompoundLength, int fixedBlockSize) {
+ return encode(currentCompoundLength, fixedBlockSize, this);
+ }
+
+ @Override
+ public ChannelBuffer encode() {
+ return encode(0, 0, this);
+ }
+}
View
69 src/main/java/org/factor45/efflux/packet/DataPacket.java
@@ -44,16 +44,13 @@
* | header extension |
* | .... |
*
- * // TODO padding, like RTCP.
- *
* @author <a href="http://bruno.factor45.org/">Bruno de Carvalho</a>
*/
public class DataPacket {
// internal vars --------------------------------------------------------------------------------------------------
private RtpVersion version;
- private boolean padding;
private boolean marker;
private int payloadType;
private int sequenceNumber;
@@ -68,7 +65,7 @@
private ChannelBuffer data;
// constructors ---------------------------------------------------------------------------------------------------
-
+
public DataPacket() {
this.version = RtpVersion.V2;
}
@@ -88,7 +85,7 @@ public static DataPacket decode(ChannelBuffer buffer) throws IndexOutOfBoundsExc
DataPacket packet = new DataPacket();
byte b = buffer.readByte();
packet.version = RtpVersion.fromByte(b);
- packet.padding = (b & 0x20) > 0; // mask 0010 0000
+ boolean padding = (b & 0x20) > 0; // mask 0010 0000
boolean extension = (b & 0x10) > 0; // mask 0001 0000
int contributingSourcesCount = b & 0x0f; // mask 0000 1111
@@ -117,27 +114,51 @@ public static DataPacket decode(ChannelBuffer buffer) throws IndexOutOfBoundsExc
}
}
- // Assume remaining data is the packet
- byte[] remainingBytes = new byte[buffer.readableBytes()];
- buffer.readBytes(remainingBytes);
- packet.setData(remainingBytes);
+ if (!padding) {
+ // No padding used, assume remaining data is the packet
+ byte[] remainingBytes = new byte[buffer.readableBytes()];
+ buffer.readBytes(remainingBytes);
+ packet.setData(remainingBytes);
+ } else {
+ // Padding bit was set, so last byte contains the number of padding octets that should be discarded.
+ short lastByte = buffer.getUnsignedByte(buffer.readerIndex() + buffer.readableBytes() - 1);
+ byte[] dataBytes = new byte[buffer.readableBytes() - lastByte];
+ buffer.readBytes(dataBytes);
+ packet.setData(dataBytes);
+ // Discard rest of buffer.
+ buffer.skipBytes(buffer.readableBytes());
+ }
return packet;
}
- public static ChannelBuffer encode(DataPacket packet) {
+ public static ChannelBuffer encode(int fixedBlockSize, DataPacket packet) {
int size = 12; // Fixed width
if (packet.hasExtension()) {
size += 4 + packet.getExtensionDataSize();
}
size += packet.getContributingSourcesCount() * 4;
size += packet.getDataSize();
+ // If packet was configured to have padding (fixed block size), calculate padding and add it.
+ int padding = 0;
+ if (fixedBlockSize > 0) {
+ // If padding modulus is > 0 then the padding is equal to:
+ // (global size of the compound RTCP packet) mod (block size)
+ // Block size alignment might be necessary for some encryption algorithms
+ // RFC section 6.4.1
+ padding = fixedBlockSize - (size % fixedBlockSize);
+ if (padding == fixedBlockSize) {
+ padding = 0;
+ }
+ }
+ size += padding;
+
ChannelBuffer buffer = ChannelBuffers.buffer(size);
// Version, Padding, eXtension, CSRC Count
byte b = packet.getVersion().getByte();
- if (packet.hasPadding()) {
+ if (padding > 0) {
b |= 0x20;
}
if (packet.hasExtension()) {
@@ -176,13 +197,28 @@ public static ChannelBuffer encode(DataPacket packet) {
buffer.writeBytes(packet.data.array());
}
+ if (padding > 0) {
+ // Final bytes: padding
+ for (int i = 0; i < (padding - 1); i++) {
+ buffer.writeByte(0x00);
+ }
+
+ // Final byte: the amount of padding bytes that should be discarded.
+ // Unless something's wrong, it will be a multiple of 4.
+ buffer.writeByte(padding);
+ }
+
return buffer;
}
// public methods -------------------------------------------------------------------------------------------------
+ public ChannelBuffer encode(int fixedBlockSize) {
+ return encode(fixedBlockSize, this);
+ }
+
public ChannelBuffer encode() {
- return encode(this);
+ return encode(0, this);
}
public void addContributingSourceId(long contributingSourceId) {
@@ -238,14 +274,6 @@ public void setVersion(RtpVersion version) {
this.version = version;
}
- public boolean hasPadding() {
- return padding;
- }
-
- public void setPadding(boolean padding) {
- this.padding = padding;
- }
-
public boolean hasExtension() {
return this.extensionData != null;
}
@@ -334,7 +362,6 @@ public void setData(byte[] data) {
public String toString() {
return new StringBuilder()
.append("DataPacket{V=").append(this.version)
- .append(", P=").append(this.padding)
.append(", X=").append(this.hasExtension())
.append(", CC=").append(this.getContributingSourcesCount())
.append(", M=").append(this.marker)
View
123 src/main/java/org/factor45/efflux/packet/ReceiverReportPacket.java
@@ -17,20 +17,12 @@
package org.factor45.efflux.packet;
import org.jboss.netty.buffer.ChannelBuffer;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import org.jboss.netty.buffer.ChannelBuffers;
/**
* @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
*/
-public class ReceiverReportPacket extends ControlPacket {
-
- // internal vars --------------------------------------------------------------------------------------------------
-
- private long senderSsrc;
- private List<ReceptionReport> receptionReports;
+public class ReceiverReportPacket extends AbstractReportPacket {
// constructors ---------------------------------------------------------------------------------------------------
@@ -47,7 +39,7 @@ public static ReceiverReportPacket decode(ChannelBuffer buffer, boolean hasPaddi
int read = 4;
for (int i = 0; i < innerBlocks; i++) {
- packet.addSenderReport(ReceptionReport.decode(buffer));
+ packet.addReceptionReportBlock(ReceptionReport.decode(buffer));
read += 24; // Each SR/RR block has 24 bytes (6 32bit words)
}
@@ -63,63 +55,82 @@ public static ReceiverReportPacket decode(ChannelBuffer buffer, boolean hasPaddi
return packet;
}
- // ControlPacket --------------------------------------------------------------------------------------------------
-
- @Override
- public ChannelBuffer encode(int currentCompoundLength, int fixedBlockSize) {
- return null;
- }
-
- @Override
- public ChannelBuffer encode() {
- return null;
- }
-
- // public methods -------------------------------------------------------------------------------------------------
-
- public boolean addSenderReport(ReceptionReport block) {
- if (this.receptionReports == null) {
- this.receptionReports = new ArrayList<ReceptionReport>();
- return this.receptionReports.add(block);
+ public static ChannelBuffer encode(int currentCompoundLength, int fixedBlockSize, ReceiverReportPacket packet) {
+ if ((currentCompoundLength < 0) || ((currentCompoundLength % 4) > 0)) {
+ throw new IllegalArgumentException("Current compound length must be a non-negative multiple of 4");
+ }
+ if ((fixedBlockSize < 0) || ((fixedBlockSize % 4) > 0)) {
+ throw new IllegalArgumentException("Padding modulus must be a non-negative multiple of 4");
}
- // 5 bits is the limit
- return (this.receptionReports.size() < 31) && this.receptionReports.add(block);
- }
+ // Common header + sender ssrc
+ int size = 4 + 4;
+ ChannelBuffer buffer;
+ if (packet.receptionReports != null) {
+ size += packet.receptionReports.size() * 24;
+ }
- public byte getReceptionReportCount() {
- if (this.receptionReports == null) {
- return 0;
+ // If packet was configured to have padding, calculate padding and add it.
+ int padding = 0;
+ if (fixedBlockSize > 0) {
+ // If padding modulus is > 0 then the padding is equal to:
+ // (global size of the compound RTCP packet) mod (block size)
+ // Block size alignment might be necessary for some encryption algorithms
+ // RFC section 6.4.1
+ padding = fixedBlockSize - ((size + currentCompoundLength) % fixedBlockSize);
+ if (padding == fixedBlockSize) {
+ padding = 0;
+ }
+ }
+ size += padding;
+
+ // Allocate the buffer and write contents
+ buffer = ChannelBuffers.buffer(size);
+ // First byte: Version (2b), Padding (1b), RR count (5b)
+ byte b = packet.getVersion().getByte();
+ if (padding > 0) {
+ b |= 0x20;
+ }
+ b |= packet.getReceptionReportCount();
+ buffer.writeByte(b);
+ // Second byte: Packet Type
+ buffer.writeByte(packet.type.getByte());
+ // Third byte: total length of the packet, in multiples of 4 bytes (32bit words) - 1
+ int sizeInOctets = (size / 4) - 1;
+ buffer.writeShort(sizeInOctets);
+ // Next 24 bytes: ssrc, ntp timestamp, rtp timestamp, octet count, packet count
+ buffer.writeInt((int) packet.senderSsrc);
+ // Payload: report blocks
+ if (packet.getReceptionReportCount() > 0) {
+ for (ReceptionReport block : packet.receptionReports) {
+ buffer.writeBytes(block.encode());
+ }
}
- return (byte) this.receptionReports.size();
- }
+ if (padding > 0) {
+ // Final bytes: padding
+ for (int i = 0; i < (padding - 1); i++) {
+ buffer.writeByte(0x00);
+ }
- // getters & setters ----------------------------------------------------------------------------------------------
+ // Final byte: the amount of padding bytes that should be discarded.
+ // Unless something's wrong, it will be a multiple of 4.
+ buffer.writeByte(padding);
+ }
- public long getSenderSsrc() {
- return senderSsrc;
+ return buffer;
}
- public void setSenderSsrc(long senderSsrc) {
- if ((senderSsrc < 0) || (senderSsrc > 0xffffffffL)) {
- throw new IllegalArgumentException("Valid range for SSRC is [0;0xffffffff]");
- }
- this.senderSsrc = senderSsrc;
- }
+ // ControlPacket --------------------------------------------------------------------------------------------------
- public List<ReceptionReport> getReceptionReports() {
- if (this.receptionReports == null) {
- return null;
- }
- return Collections.unmodifiableList(this.receptionReports);
+ @Override
+ public ChannelBuffer encode(int currentCompoundLength, int fixedBlockSize) {
+ return encode(currentCompoundLength, fixedBlockSize, this);
}
- public void setReceptionReports(List<ReceptionReport> receptionReports) {
- if (receptionReports.size() >= 31) {
- throw new IllegalArgumentException("At most 31 report blocks can be sent in a ReceiverReportPacket");
- }
- this.receptionReports = receptionReports;
+ @Override
+ public ChannelBuffer encode() {
+ return encode(0, 0, this);
}
// low level overrides --------------------------------------------------------------------------------------------
View
53 src/main/java/org/factor45/efflux/packet/SenderReportPacket.java
@@ -19,24 +19,18 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
/**
* @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
*/
-public class SenderReportPacket extends ControlPacket {
+public class SenderReportPacket extends AbstractReportPacket {
// internal vars --------------------------------------------------------------------------------------------------
- private long senderSsrc;
// TODO this might not be a long...
private long ntpTimestamp;
private long rtpTimestamp;
private long senderPacketCount;
private long senderOctetCount;
- private List<ReceptionReport> receptionReports;
// constructors ---------------------------------------------------------------------------------------------------
@@ -155,39 +149,8 @@ public ChannelBuffer encode() {
return encode(0, 0, this);
}
- // public methods -------------------------------------------------------------------------------------------------
-
- public boolean addReceptionReportBlock(ReceptionReport block) {
- if (this.receptionReports == null) {
- this.receptionReports = new ArrayList<ReceptionReport>();
- return this.receptionReports.add(block);
- }
-
- // 5 bits is the limit
- return (this.receptionReports.size() < 31) && this.receptionReports.add(block);
- }
-
- public byte getReceptionReportCount() {
- if (this.receptionReports == null) {
- return 0;
- }
-
- return (byte) this.receptionReports.size();
- }
-
// getters & setters ----------------------------------------------------------------------------------------------
- public long getSenderSsrc() {
- return senderSsrc;
- }
-
- public void setSenderSsrc(long senderSsrc) {
- if ((senderSsrc < 0) || (senderSsrc > 0xffffffffL)) {
- throw new IllegalArgumentException("Valid range for SSRC is [0;0xffffffff]");
- }
- this.senderSsrc = senderSsrc;
- }
-
public long getNtpTimestamp() {
return ntpTimestamp;
}
@@ -233,20 +196,6 @@ public void setSenderOctetCount(long senderOctetCount) {
this.senderOctetCount = senderOctetCount;
}
- public List<ReceptionReport> getReceptionReports() {
- if (this.receptionReports == null) {
- return null;
- }
- return Collections.unmodifiableList(this.receptionReports);
- }
-
- public void setReceptionReports(List<ReceptionReport> receptionReports) {
- if (receptionReports.size() >= 31) {
- throw new IllegalArgumentException("At most 31 report blocks can be sent in a SenderReportPacket");
- }
- this.receptionReports = receptionReports;
- }
-
// low level overrides --------------------------------------------------------------------------------------------
@Override
View
558 src/main/java/org/factor45/efflux/session/AbstractRtpSession.java
@@ -23,9 +23,18 @@
import org.factor45.efflux.network.DataHandler;
import org.factor45.efflux.network.DataPacketDecoder;
import org.factor45.efflux.network.DataPacketEncoder;
+import org.factor45.efflux.packet.AbstractReportPacket;
+import org.factor45.efflux.packet.AppDataPacket;
+import org.factor45.efflux.packet.ByePacket;
import org.factor45.efflux.packet.CompoundControlPacket;
import org.factor45.efflux.packet.ControlPacket;
import org.factor45.efflux.packet.DataPacket;
+import org.factor45.efflux.packet.ReceiverReportPacket;
+import org.factor45.efflux.packet.ReceptionReport;
+import org.factor45.efflux.packet.SdesChunk;
+import org.factor45.efflux.packet.SdesChunkItems;
+import org.factor45.efflux.packet.SenderReportPacket;
+import org.factor45.efflux.packet.SourceDescriptionPacket;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -36,12 +45,19 @@
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
@@ -51,13 +67,17 @@
// constants ------------------------------------------------------------------------------------------------------
protected static final Logger LOG = Logger.getLogger(SingleParticipantSession.class);
+ protected static final String VERSION = "efflux_0.2_13092010";
// configuration defaults -----------------------------------------------------------------------------------------
- protected static final boolean USE_NIO = false;
+ // TODO not working with USE_NIO = false
+ protected static final boolean USE_NIO = true;
protected static final boolean DISCARD_OUT_OF_ORDER = true;
protected static final int SEND_BUFFER_SIZE = 1500;
protected static final int RECEIVE_BUFFER_SIZE = 1500;
+ protected static final int MAX_COLLISIONS_BEFORE_CONSIDERING_LOOP = 3;
+ protected static final boolean AUTOMATED_RTCP_HANDLING = true;
// configuration --------------------------------------------------------------------------------------------------
@@ -67,21 +87,26 @@
protected boolean discardOutOfOrder;
protected int sendBufferSize;
protected int receiveBufferSize;
+ protected int maxCollisionsBeforeConsideringLoop;
+ protected boolean automatedRtcpHandling;
// internal vars --------------------------------------------------------------------------------------------------
protected final RtpParticipant localParticipant;
+ protected final Map<Long, DefaultRtpParticipantContext> participantTable;
protected final List<RtpSessionDataListener> dataListeners;
protected final List<RtpSessionControlListener> controlListeners;
protected final List<RtpSessionEventListener> eventListeners;
protected boolean useNio;
- protected boolean initialised;
+ protected boolean running;
protected ConnectionlessBootstrap dataBootstrap;
protected ConnectionlessBootstrap controlBootstrap;
protected DatagramChannel dataChannel;
protected DatagramChannel controlChannel;
protected final AtomicInteger sequence;
protected final AtomicBoolean sentOrReceivedPackets;
+ protected final ReentrantReadWriteLock lock;
+ protected final AtomicInteger collisions;
// constructors ---------------------------------------------------------------------------------------------------
@@ -93,17 +118,22 @@ public AbstractRtpSession(String id, int payloadType, RtpParticipant local) {
this.id = id;
this.payloadType = payloadType;
this.localParticipant = local;
+ this.participantTable = new HashMap<Long, DefaultRtpParticipantContext>();
this.dataListeners = new CopyOnWriteArrayList<RtpSessionDataListener>();
this.controlListeners = new CopyOnWriteArrayList<RtpSessionControlListener>();
this.eventListeners = new CopyOnWriteArrayList<RtpSessionEventListener>();
this.sequence = new AtomicInteger(0);
this.sentOrReceivedPackets = new AtomicBoolean(false);
+ this.lock = new ReentrantReadWriteLock();
+ this.collisions = new AtomicInteger(0);
this.useNio = USE_NIO;
this.discardOutOfOrder = DISCARD_OUT_OF_ORDER;
this.sendBufferSize = SEND_BUFFER_SIZE;
this.receiveBufferSize = RECEIVE_BUFFER_SIZE;
+ this.maxCollisionsBeforeConsideringLoop = MAX_COLLISIONS_BEFORE_CONSIDERING_LOOP;
+ this.automatedRtcpHandling = AUTOMATED_RTCP_HANDLING;
}
// RtpSession -----------------------------------------------------------------------------------------------------
@@ -174,41 +204,34 @@ public ChannelPipeline getPipeline() throws Exception {
}
LOG.debug("Data & Control channels bound for RtpSession with id {}.", this.id);
- return (this.initialised = true);
+ // Send first RTCP packet.
+ this.joinSession(this.localParticipant.getSsrc());
+ return (this.running = true);
}
@Override
- public synchronized void terminate() {
- if (!this.initialised) {
- return;
- }
-
- this.dataChannel.close();
- this.controlChannel.close();
- this.dataBootstrap.releaseExternalResources();
- this.controlBootstrap.releaseExternalResources();
- LOG.debug("RtpSession with id {} terminated.", this.id);
-
- this.initialised = false;
+ public void terminate() {
+ this.terminate(RtpSessionEventListener.TERMINATE_CALLED);
}
@Override
- public boolean sendData(byte[] data, long timestamp) {
- if (!this.initialised) {
+ public boolean sendData(byte[] data, long timestamp, boolean marked) {
+ if (!this.running) {
return false;
}
DataPacket packet = new DataPacket();
// Other fields will be set by sendDataPacket()
packet.setTimestamp(timestamp);
packet.setData(data);
+ packet.setMarker(marked);
return this.sendDataPacket(packet);
}
@Override
public boolean sendDataPacket(DataPacket packet) {
- if (!this.initialised) {
+ if (!this.running) {
return false;
}
@@ -220,12 +243,23 @@ public boolean sendDataPacket(DataPacket packet) {
@Override
public boolean sendControlPacket(ControlPacket packet) {
- return this.initialised && this.internalSendControl(packet);
+ // Only allow sending explicit RTCP packets if all the following conditions are met:
+ // 1. session is running
+ // 2. automated rtcp handling is disabled (except for APP_DATA packets)
+ if (!this.running) {
+ return false;
+ }
+
+ if (ControlPacket.Type.APP_DATA.equals(packet.getType())) {
+ return this.internalSendControl(packet);
+ }
+
+ return !this.automatedRtcpHandling && this.internalSendControl(packet);
}
@Override
public boolean sendControlPacket(CompoundControlPacket packet) {
- return this.initialised && this.internalSendControl(packet);
+ return this.running && !this.automatedRtcpHandling && this.internalSendControl(packet);
}
@Override
@@ -234,6 +268,67 @@ public RtpParticipant getLocalParticipant() {
}
@Override
+ public boolean addParticipant(RtpParticipant remoteParticipant) {
+ if (remoteParticipant.getSsrc() == this.localParticipant.getSsrc()) {
+ return false;
+ }
+
+ this.lock.writeLock().lock();
+ try {
+ DefaultRtpParticipantContext context = this.participantTable.get(remoteParticipant.getSsrc());
+ if (context == null) {
+ context = new DefaultRtpParticipantContext(remoteParticipant);
+ this.participantTable.put(remoteParticipant.getSsrc(), context);
+ return true;
+ }
+ return false;
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+
+ }
+
+ @Override
+ public RtpParticipantContext removeParticipant(long ssrc) {
+ this.lock.writeLock().lock();
+ try {
+ DefaultRtpParticipantContext context = this.participantTable.remove(ssrc);
+ if (context == null) {
+ return null;
+ }
+
+ return context;
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public RtpParticipantContext getRemoteParticipant(long ssrc) {
+ this.lock.readLock().lock();
+ try {
+ DefaultRtpParticipantContext context = this.participantTable.get(ssrc);
+ if (context == null) {
+ return null;
+ }
+
+ return context;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Collection<RtpParticipantContext> getRemoteParticipants() {
+ this.lock.readLock().lock();
+ try {
+ return Collections.<RtpParticipantContext>unmodifiableCollection(this.participantTable.values());
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ @Override
public void addDataListener(RtpSessionDataListener listener) {
this.dataListeners.add(listener);
}
@@ -267,13 +362,27 @@ public void removeEventListener(RtpSessionEventListener listener) {
@Override
public void dataPacketReceived(SocketAddress origin, DataPacket packet) {
+ if (!this.running) {
+ return;
+ }
+
if (packet.getPayloadType() != this.payloadType) {
// Silently discard packets of wrong payload.
return;
}
if (packet.getSsrc() == this.localParticipant.getSsrc()) {
+ // Sending data to ourselves? Consider this a loop and bail out!
+ if (origin.equals(this.localParticipant.getDataAddress())) {
+ this.terminate(new Throwable("Loop detected: session is directly receiving its own packets"));
+ return;
+ } else if (this.collisions.incrementAndGet() > this.maxCollisionsBeforeConsideringLoop) {
+ this.terminate(new Throwable("Loop detected after " + this.collisions.get() + " SSRC collisions"));
+ return;
+ }
+
long oldSsrc = this.localParticipant.getSsrc();
+ long newSsrc = this.localParticipant.resolveSsrcConflict(packet.getSsrc());
// A collision has been detected after packets were sent, resolve by updating the local SSRC and sending
// a BYE RTCP packet for the old SSRC.
@@ -283,25 +392,21 @@ public void dataPacketReceived(SocketAddress origin, DataPacket packet) {
// and avoided).
// http://tools.ietf.org/html/rfc3550#section-8.1, last paragraph
if (this.sentOrReceivedPackets.getAndSet(true)) {
- // TODO create and send RTCP BYE
+ this.leaveSession(oldSsrc, "SSRC collision detected; rejoining with new SSRC.");
+ this.joinSession(newSsrc);
}
LOG.warn("SSRC collision with remote end detected on session with id {}; updating SSRC from {} to {}.",
- this.id, oldSsrc, this.localParticipant.resolveSsrcConflict(packet.getSsrc()));
+ this.id, oldSsrc, newSsrc);
for (RtpSessionEventListener listener : this.eventListeners) {
- listener.resolvedSsrcConflict(this, oldSsrc, this.localParticipant.getSsrc());
+ listener.resolvedSsrcConflict(this, oldSsrc, newSsrc);
}
}
// Associate the packet with a participant or create one.
- RtpParticipantContext context = this.getContext(origin, packet);
+ DefaultRtpParticipantContext context = this.getOrCreateContextFromDataPacket(origin, packet);
if (context == null) {
- // Implementations of this class SHOULD never return null here...
- return;
- }
-
- if (!this.doBeforeDataReceivedValidation(packet)) {
- // Subclass does not want to proceed due to some check failing.
+ // Subclasses may chose not to create anything, in which case this packet must be discarded.
return;
}
@@ -316,34 +421,224 @@ public void dataPacketReceived(SocketAddress origin, DataPacket packet) {
// We trust the SSRC rather than the ip/port to identify participants (mostly because of NAT).
context.setLastSequenceNumber(packet.getSequenceNumber());
if (!origin.equals(context.getParticipant().getDataAddress())) {
- context.getParticipant().updateRtpAddress(origin);
+ // I know. RFC mandates against this. Experience, however, tells me otherwise.
+ context.getParticipant().updateDataAddress(origin);
LOG.debug("Updated RTP address for {} to {} (session id: {}).", context.getParticipant(), origin, this.id);
}
- if (!this.doAfterDataReceivedValidation(origin)) {
- // Subclass does not want to proceed due to some check failing.
- return;
- }
-
// Finally, dispatch the event to the data listeners.
for (RtpSessionDataListener listener : this.dataListeners) {
listener.dataPacketReceived(this, context.getParticipant(), packet);
}
}
+ // ControlPacketReceiver ------------------------------------------------------------------------------------------
+
+ @Override
+ public void controlPacketReceived(SocketAddress origin, CompoundControlPacket packet) {
+ if (!this.running) {
+ return;
+ }
+
+ if (!this.automatedRtcpHandling) {
+ for (RtpSessionControlListener listener : this.controlListeners) {
+ listener.controlPacketReceived(this, packet);
+ }
+
+ return;
+ }
+
+ for (ControlPacket controlPacket : packet.getControlPackets()) {
+ switch (controlPacket.getType()) {
+ case SENDER_REPORT:
+ case RECEIVER_REPORT:
+ this.handleReportPacket(origin, (AbstractReportPacket) controlPacket);
+ break;
+ case SOURCE_DESCRIPTION:
+ this.handleSdesPacket(origin, (SourceDescriptionPacket) controlPacket);
+ break;
+ case BYE:
+ this.handleByePacket(origin, (ByePacket) controlPacket);
+ break;
+ case APP_DATA:
+ for (RtpSessionControlListener listener : this.controlListeners) {
+ listener.appDataReceived(this, (AppDataPacket) controlPacket);
+ }
+ default:
+ // do nothing, unknown case
+ }
+ }
+ }
+
+ protected void handleReportPacket(SocketAddress origin, AbstractReportPacket abstractReportPacket) {
+ if (abstractReportPacket.getReceptionReportCount() == 0) {
+ return;
+ }
+
+ DefaultRtpParticipantContext context = this.getExistingContext(abstractReportPacket.getSenderSsrc());
+ if (context == null) {
+ // Ignore; RTCP-SDES or RTP packet must first be received.
+ return;
+ }
+
+ for (ReceptionReport receptionReport : abstractReportPacket.getReceptionReports()) {
+ // Ignore all reception reports except for the one who pertains to the local participant (only data that
+ // matters here is the link between this participant and ourselves).
+ if (receptionReport.getSsrc() == this.localParticipant.getSsrc()) {
+ // TODO
+ }
+ }
+
+ // For sender reports, also handle the sender information.
+ if (abstractReportPacket.getType().equals(ControlPacket.Type.SENDER_REPORT)) {
+ SenderReportPacket senderReport = (SenderReportPacket) abstractReportPacket;
+ // TODO
+ }
+ }
+
+ protected void handleSdesPacket(SocketAddress origin, SourceDescriptionPacket packet) {
+ for (SdesChunk chunk : packet.getChunks()) {
+ DefaultRtpParticipantContext context = this.getOrCreateContextFromSdesChunk(origin, chunk);
+ if (context == null) {
+ continue;
+ }
+ if (!context.hasReceivedSdes()) {
+ // If this participant wasn't created from an SDES packet, then update its participant's description.
+ if (context.getParticipant().updateFromSdesChunk(chunk)) {
+ for (RtpSessionEventListener listener : this.eventListeners) {
+ listener.participantDataUpdated(this, context.getParticipant());
+ }
+ }
+ }
+ // I know. RFC mandates against this. Experience, however, tells me otherwise.
+ if (!origin.equals(context.getParticipant().getControlAddress())) {
+ context.getParticipant().updateControlAddress(origin);
+ LOG.debug("Updated RTCP address for {} to {} (session id: {}).",
+ context.getParticipant(), origin, this.id);
+ }
+ }
+ }
+
+ protected void handleByePacket(SocketAddress origin, ByePacket packet) {
+ for (Long ssrc : packet.getSsrcList()) {
+ DefaultRtpParticipantContext context = this.getExistingContext(ssrc);
+ if (context != null) {
+ context.byeReceived();
+ for (RtpSessionEventListener listener : eventListeners) {
+ listener.participantLeft(this, context.getParticipant());
+ }
+ }
+ }
+ LOG.trace("Participants with SSRCs {} sent BYE for RtpSession with id {} with reason '{}'.",
+ packet.getSsrcList(), packet. getReasonForLeaving());
+ }
+
// protected helpers ----------------------------------------------------------------------------------------------
- protected abstract boolean internalSendData(DataPacket packet);
+ protected boolean internalSendData(DataPacket packet) {
+ this.lock.readLock().lock();
+ try {
+ for (RtpParticipantContext context : this.participantTable.values()) {
+ if (context.receivedBye()) {
+ continue;
+ }
+ this.writeToData(packet, context.getParticipant().getDataAddress());
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("Failed to send RTP packet to participants in session with id {}.", this.id);
+ return false;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
- protected abstract boolean internalSendControl(ControlPacket packet);
+ protected boolean internalSendControl(ControlPacket packet) {
+ this.lock.readLock().lock();
+ try {
+ for (RtpParticipantContext context : this.participantTable.values()) {
+ if (context.receivedBye()) {
+ continue;
+ }
+ this.writeToControl(packet, context.getParticipant().getControlAddress());
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("Failed to send RTCP packet to participants in session with id {}.", this.id);
+ return false;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
- protected abstract boolean internalSendControl(CompoundControlPacket packet);
+ protected boolean internalSendControl(CompoundControlPacket packet) {
+ this.lock.readLock().lock();
+ try {
+ for (RtpParticipantContext context : this.participantTable.values()) {
+ if (context.receivedBye()) {
+ continue;
+ }
+ this.writeToControl(packet, context.getParticipant().getControlAddress());
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("Failed to send RTCP compound packet to participants in session with id {}.", this.id);
+ return false;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
- protected abstract RtpParticipantContext getContext(SocketAddress origin, DataPacket packet);
+ protected DefaultRtpParticipantContext getOrCreateContextFromDataPacket(SocketAddress origin, DataPacket packet) {
+ // Get or create.
+ this.lock.writeLock().lock();
+ try {
+ DefaultRtpParticipantContext context = this.participantTable.get(packet.getSsrc());
+ if (context == null) {
+ // New participant
+ RtpParticipant participant = RtpParticipant
+ .createFromUnexpectedDataPacket((InetSocketAddress) origin, packet);
+ context = new DefaultRtpParticipantContext(participant);
+ this.participantTable.put(participant.getSsrc(), context);
+
+ LOG.debug("New participant joined session with id {} (from data packet): {}.", this.id, participant);
+ for (RtpSessionEventListener listener : this.eventListeners) {
+ listener.participantJoinedFromData(this, participant, packet);
+ }
+ }
- protected abstract boolean doBeforeDataReceivedValidation(DataPacket packet);
+ return context;
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
- protected abstract boolean doAfterDataReceivedValidation(SocketAddress origin);
+ protected DefaultRtpParticipantContext getOrCreateContextFromSdesChunk(SocketAddress origin, SdesChunk chunk) {
+ this.lock.writeLock().lock();
+ try {
+ DefaultRtpParticipantContext context = this.participantTable.get(chunk.getSsrc());
+ if (context == null) {
+ RtpParticipant participant = RtpParticipant.createFromSdesChunk((InetSocketAddress) origin, chunk);
+ context = new DefaultRtpParticipantContext(participant);
+ // Mark SDES packet as received, in order not to update SDES info in the future.
+ context.receivedSdes();
+ this.participantTable.put(participant.getSsrc(), context);
+
+ LOG.debug("New participant joined session with id {} (from SDES chunk): {}.", this.id, participant);
+ for (RtpSessionEventListener listener : this.eventListeners) {
+ listener.participantJoinedFromControl(this, participant, chunk);
+ }
+ }
+
+ return context;
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
+ protected DefaultRtpParticipantContext getExistingContext(long ssrc) {
+ return this.participantTable.get(ssrc);
+ }
protected void writeToData(DataPacket packet, SocketAddress destination) {
this.dataChannel.write(packet, destination);
@@ -357,14 +652,157 @@ protected void writeToControl(CompoundControlPacket packet, SocketAddress destin
this.controlChannel.write(packet, destination);
}
+ protected void joinSession(long currentSsrc) {
+ if (!this.automatedRtcpHandling) {
+ return;
+ }
+ // Joining a session, so send an empty receiver report.
+ ReceiverReportPacket emptyReceiverReport = new ReceiverReportPacket();
+ emptyReceiverReport.setSenderSsrc(currentSsrc);
+ // Send also an SDES packet in the compound RTCP packet.
+ SourceDescriptionPacket sdesPacket = this.buildSdesPacket(currentSsrc);
+
+ CompoundControlPacket compoundPacket = new CompoundControlPacket(emptyReceiverReport, sdesPacket);
+ this.internalSendControl(compoundPacket);
+ }
+
+ protected void leaveSession(long currentSsrc, String motive) {
+ if (!this.automatedRtcpHandling) {
+ return;
+ }
+ List<CompoundControlPacket> byePackets = this.buildLeavePackets(currentSsrc, motive);
+ for (CompoundControlPacket byePacket : byePackets) {
+ this.internalSendControl(byePacket);
+ }
+ }
+
+ protected List<CompoundControlPacket> buildLeavePackets(long currentSsrc, String motive) {
+ SourceDescriptionPacket sdesPacket = this.buildSdesPacket(currentSsrc);
+ ByePacket byePacket = new ByePacket();
+ byePacket.addSsrc(currentSsrc);
+ byePacket.setReasonForLeaving(motive);
+
+ this.lock.readLock().lock();
+ try {
+ // Strong estimate.
+ int participantCount = this.participantTable.size();
+ List<CompoundControlPacket> compoundPackets = new ArrayList<CompoundControlPacket>(participantCount);
+ for (DefaultRtpParticipantContext context : this.participantTable.values()) {
+ AbstractReportPacket reportPacket = this.buildReportPacket(currentSsrc, context);
+ compoundPackets.add(new CompoundControlPacket(reportPacket, sdesPacket, byePacket));
+ }
+
+ return compoundPackets;
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ private AbstractReportPacket buildReportPacket(long currentSsrc, DefaultRtpParticipantContext context) {
+ AbstractReportPacket packet;
+ if (context.getSentPackets() == 0) {
+ // If no packets were sent to this source, then send a receiver report.
+ packet = new ReceiverReportPacket();
+ } else {
+ // Otherwise, build a sender report.
+ SenderReportPacket senderPacket = new SenderReportPacket();
+ senderPacket.setNtpTimestamp(0); // FIXME
+ senderPacket.setRtpTimestamp(System.currentTimeMillis()); // FIXME
+ senderPacket.setSenderPacketCount(context.getSentPackets());
+ senderPacket.setSenderOctetCount(context.getSentBytes());
+ context.resetSendStats();
+ packet = senderPacket;
+ }
+ packet.setSenderSsrc(currentSsrc);
+
+ // If this source sent data, then calculate the link quality to build a reception report block.
+ if (context.getReceivedPackets() > 0) {
+ ReceptionReport block = new ReceptionReport();
+ block.setSsrc(context.getParticipant().getSsrc());
+ block.setDelaySinceLastSenderReport(0); // FIXME
+ block.setFractionLost((short) 0); // FIXME
+ block.setExtendedHighestSequenceNumberReceived(0); // FIXME
+ block.setInterArrivalJitter(0); // FIXME
+ block.setCumulativeNumberOfPacketsLost(0); // FIXME
+ packet.addReceptionReportBlock(block);
+ }
+
+ return packet;
+ }
+
+ protected SourceDescriptionPacket buildSdesPacket(long currentSsrc) {
+ SourceDescriptionPacket sdesPacket = new SourceDescriptionPacket();
+ SdesChunk chunk = new SdesChunk(currentSsrc);
+
+ if (this.localParticipant.getCname() == null) {
+ this.localParticipant.setCname(new StringBuilder()
+ .append("efflux/").append(this.id).append('@')
+ .append(this.dataChannel.getLocalAddress()).toString());
+ }
+ chunk.addItem(SdesChunkItems.createCnameItem(this.localParticipant.getCname()));
+
+ if (this.localParticipant.getName() != null) {
+ chunk.addItem(SdesChunkItems.createNameItem(this.localParticipant.getName()));
+ }
+
+ if (this.localParticipant.getEmail() != null) {
+ chunk.addItem(SdesChunkItems.createEmailItem(this.localParticipant.getEmail()));
+ }
+
+ if (this.localParticipant.getPhone() != null) {
+ chunk.addItem(SdesChunkItems.createPhoneItem(this.localParticipant.getPhone()));
+ }
+
+ if (this.localParticipant.getLocation() != null) {
+ chunk.addItem(SdesChunkItems.createLocationItem(this.localParticipant.getLocation()));
+ }
+
+ if (this.localParticipant.getTool() == null) {
+ this.localParticipant.setTool(VERSION);
+ }
+ chunk.addItem(SdesChunkItems.createToolItem(this.localParticipant.getTool()));
+
+ if (this.localParticipant.getNote() != null) {
+ chunk.addItem(SdesChunkItems.createLocationItem(this.localParticipant.getNote()));
+ }
+ sdesPacket.addItem(chunk);
+
+ return sdesPacket;
+ }
+
+ protected synchronized void terminate(Throwable cause) {
+ if (!this.running) {
+ return;
+ }
+
+ this.dataListeners.clear();
+ this.controlListeners.clear();
+
+ // Close data channel, send BYE RTCP packets and close control channel.
+ this.dataChannel.close();
+ this.leaveSession(this.localParticipant.getSsrc(), "Session terminated.");
+ this.controlChannel.close();
+
+ this.dataBootstrap.releaseExternalResources();
+ this.controlBootstrap.releaseExternalResources();
+ LOG.debug("RtpSession with id {} terminated.", this.id);
+
+ for (RtpSessionEventListener listener : this.eventListeners) {
+ listener.sessionTerminated(this, cause);
+ }
+ this.eventListeners.clear();
+
+ this.running = false;
+ }
+
// getters & setters ----------------------------------------------------------------------------------------------
public String getHost() {
return host;
}
public void setHost(String host) {
- if (this.initialised) {
+ if (this.running) {
throw new IllegalArgumentException("Cannot modify property after initialisation");
}
this.host = host;
@@ -375,22 +813,22 @@ public boolean useNio() {
}
public void setUseNio(boolean useNio) {
- if (this.initialised) {
+ if (this.running) {
throw new IllegalArgumentException("Cannot modify property after initialisation");
}
this.useNio = useNio;
}
- public boolean isInitialised() {
- return initialised;
+ public boolean isRunning() {
+ return running;
}
public boolean isDiscardOutOfOrder() {
return discardOutOfOrder;
}
public void setDiscardOutOfOrder(boolean discardOutOfOrder) {
- if (this.initialised) {
+ if (this.running) {
throw new IllegalArgumentException("Cannot modify property after initialisation");
}
this.discardOutOfOrder = discardOutOfOrder;
@@ -401,7 +839,7 @@ public int getSendBufferSize() {
}
public void setSendBufferSize(int sendBufferSize) {
- if (this.initialised) {
+ if (this.running) {
throw new IllegalArgumentException("Cannot modify property after initialisation");
}
this.sendBufferSize = sendBufferSize;
@@ -412,9 +850,31 @@ public int getReceiveBufferSize() {
}
public void setReceiveBufferSize(int receiveBufferSize) {
- if (this.initialised) {
+ if (this.running) {
throw new IllegalArgumentException("Cannot modify property after initialisation");
}
this.receiveBufferSize = receiveBufferSize;
}
+
+ public int getMaxCollisionsBeforeConsideringLoop() {
+ return maxCollisionsBeforeConsideringLoop;
+ }
+
+ public void setMaxCollisionsBeforeConsideringLoop(int maxCollisionsBeforeConsideringLoop) {
+ if (this.running) {
+ throw new IllegalArgumentException("Cannot modify property after initialisation");
+ }
+ this.maxCollisionsBeforeConsideringLoop = maxCollisionsBeforeConsideringLoop;
+ }
+
+ public boolean isAutomatedRtcpHandling() {
+ return automatedRtcpHandling;
+ }
+
+ public void setAutomatedRtcpHandling(boolean automatedRtcpHandling) {
+ if (this.running) {
+ throw new IllegalArgumentException("Cannot modify property after initialisation");
+ }
+ this.automatedRtcpHandling = automatedRtcpHandling;
+ }
}
View
134 src/main/java/org/factor45/efflux/session/DefaultRtpParticipantContext.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2010 Bruno de Carvalho
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.factor45.efflux.session;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * @author <a:mailto="bruno.carvalho@wit-software.com" />Bruno de Carvalho</a>
+ */
+public class DefaultRtpParticipantContext implements RtpParticipantContext {
+
+ // configuration --------------------------------------------------------------------------------------------------
+
+ private final RtpParticipant participant;
+
+ // internal vars --------------------------------------------------------------------------------------------------
+
+ private long byeReceptionInstant;
+ private int lastSequenceNumber;
+ private boolean receivedSdes;
+ private final AtomicLong sentByteCounter;
+ private final AtomicLong sentPacketCounter;
+ private final AtomicLong receivedByteCounter;
+ private final AtomicLong receivedPacketCounter;
+
+ // constructors ---------------------------------------------------------------------------------------------------
+
+ public DefaultRtpParticipantContext(RtpParticipant participant) {
+ this.participant = participant;
+
+ this.lastSequenceNumber = -1;
+ this.byeReceptionInstant = -1;
+
+ this.sentByteCounter = new AtomicLong();
+ this.sentPacketCounter = new AtomicLong();
+ this.receivedByteCounter = new AtomicLong();
+ this.receivedPacketCounter = new AtomicLong();
+ }
+
+ // public methods -------------------------------------------------------------------------------------------------
+
+ public void byeReceived() {
+ this.byeReceptionInstant = System.currentTimeMillis();
+ }
+
+ // getters & setters ----------------------------------------------------------------------------------------------
+
+ @Override
+ public RtpParticipant getParticipant() {
+ return participant;
+ }
+
+ @Override
+ public long getByeReceptionInstant() {
+ return byeReceptionInstant;
+ }
+
+ @Override
+ public int getLastSequenceNumber() {
+ return lastSequenceNumber;
+ }
+
+ public void setLastSequenceNumber(int lastSequenceNumber) {
+ this.lastSequenceNumber = lastSequenceNumber;
+ }
+
+ @Override
+ public boolean receivedBye() {
+ return this.byeReceptionInstant > -1;
+ }
+
+ @Override
+ public long getSentPackets() {
+ return this.sentPacketCounter.get();
+ }
+
+ @Override
+ public long getSentBytes() {
+ return this.sentByteCounter.get();
+ }
+
+ @Override
+ public long getReceivedPackets() {
+ return this.receivedPacketCounter.get();
+ }
+
+ @Override
+ public long getReceivedBytes() {
+ return this.receivedByteCounter.get();
+ }
+
+ // public methods -------------------------------------------------------------------------------------------------
+
+ public void resetSendStats() {
+ this.sentByteCounter.set(0);
+ this.sentPacketCounter.set(0);
+ }
+
+ public long incrementSentBytes(int delta) {
+ if (delta < 0) {
+ return this.sentByteCounter.get();
+ }
+
+ return this.sentByteCounter.addAndGet(delta);
+ }
+
+ public long incrementSentPackets() {
+ return this.sentPacketCounter.incrementAndGet();
+ }
+
+ public void receivedSdes() {
+ this.receivedSdes = true;
+ }
+
+ // getters & setters ----------------------------------------------------------------------------------------------
+
+ public boolean hasReceivedSdes() {
+ return receivedSdes;
+ }
+}
View
143 src/main/java/org/factor45/efflux/session/MultiParticipantSession.java
@@ -16,17 +16,8 @@
package org.factor45.efflux.session;
-import org.factor45.efflux.packet.CompoundControlPacket;
-import org.factor45.efflux.packet.ControlPacket;
-import org.factor45.efflux.packet.DataPacket;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* A regular RTP session, as described in RFC3550.
@@ -46,142 +37,10 @@
*/
public class MultiParticipantSession extends AbstractRtpSession {
- // internal vars --------------------------------------------------------------------------------------------------
-
- private final Map<Long, RtpParticipantContext> participantTable;
-
// constructors ---------------------------------------------------------------------------------------------------
public MultiParticipantSession(String id, int payloadType, RtpParticipant localParticipant) {
super(id, payloadType, localParticipant);
-
- this.participantTable = new ConcurrentHashMap<Long, RtpParticipantContext>();
- }
-
- // RtpSession -----------------------------------------------------------------------------------------------------
-
- @Override
- public boolean addParticipant(RtpParticipant remoteParticipant) {
- if (remoteParticipant.getSsrc() == this.localParticipant.getSsrc()) {
- return false;
- }
-
- RtpParticipantContext context = this.participantTable.get(remoteParticipant.getSsrc());
- if (context == null) {
- context = new RtpParticipantContext(remoteParticipant);
- this.participantTable.put(remoteParticipant.getSsrc(), context);
- return true;
- }
-
- return false;
- }
-
- @Override
- public RtpParticipant removeParticipant(long ssrc) {
- RtpParticipantContext context = this.participantTable.remove(ssrc);
- if (context == null) {
- return null;
- }
-
- return context.getParticipant();
- }
-
- @Override
- public RtpParticipant getRemoteParticipant(long ssrc) {
- RtpParticipantContext context = this.participantTable.get(ssrc);
- if (context == null) {
- return null;
- }
-
- return context.getParticipant();
- }
-
- @Override
- public Collection<RtpParticipant> getRemoteParticipants() {
- Collection<RtpParticipant> participants = new ArrayList<RtpParticipant>(this.participantTable.size());
- for (RtpParticipantContext context : this.participantTable.values()) {
- participants.add(context.getParticipant());
- }
-
- return participants;
- }
-
- // AbstractRtpSession ---------------------------------------------------------------------------------------------
-
- @Override
- protected boolean internalSendData(DataPacket packet) {
- try {
- for (RtpParticipantContext context : this.participantTable.values()) {
- this.writeToData(packet, context.getParticipant().getDataAddress());
- }
- return true;
- } catch (Exception e) {
- LOG.error("Failed to send RTP packet to participants in session with id {}.", this.id);
- return false;
- }
- }
-
- @Override
- protected boolean internalSendControl(ControlPacket packet) {
- try {
- for (RtpParticipantContext context : this.participantTable.values()) {
- this.writeToControl(packet, context.getParticipant().getDataAddress());
- }
- return true;
- } catch (Exception e) {
- LOG.error("Failed to send RTCP packet to participants in session with id {}.", this.id);
- return false;
- }
- }
-
-
- @Override
- protected boolean internalSendControl(CompoundControlPacket packet) {
- try {
- for (RtpParticipantContext context : this.participantTable.values()) {
- this.writeToControl(packet, context.getParticipant().getDataAddress());
- }
- return true;
- } catch (Exception e) {
- LOG.error("Failed to send RTCP compound packet to participants in session with id {}.", this.id);
- return false;
- }
- }
-
- @Override
- protected RtpParticipantContext getContext(SocketAddress origin, DataPacket packet) {
- // Get or create.
- RtpParticipantContext context = this.participantTable.get(packet.getSsrc());
- if (context == null) {
- // New participant
- RtpParticipant participant = RtpParticipant.createFromUnexpectedDataPacket((InetSocketAddress) origin, packet);
- context = new RtpParticipantContext(participant);
- this.participantTable.put(participant.getSsrc(), context);
-
- LOG.debug("New participant joined session with id {} (from data packet): {}.", this.id, participant);
- for (RtpSessionEventListener listener : this.eventListeners) {
- listener.participantJoinedFromData(this, participant, packet);
- }
- }
-
- return context;
- }
-
- @Override
- protected boolean doBeforeDataReceivedValidation(DataPacket packet) {
- return true;
- }
-
- @Override
- protected boolean doAfterDataReceivedValidation(SocketAddress origin) {
- return true;
- }
-
- // ControlPacketReceiver ------------------------------------------------------------------------------------------
-
- @Override
- public void controlPacketReceived(SocketAddress origin, CompoundControlPacket packet) {
-
}
// getters & setters ----------------------------------------------------------------------------------------------
@@ -191,6 +50,6 @@ public RtpParticipantContext getParticipantContext(long ssrc) {
}
public Map<Long, RtpParticipantContext> getParticipantTable() {
- return Collections.unmodifiableMap(this.participantTable);
+ return Collections.<Long, RtpParticipantContext>unmodifiableMap(this.participantTable);
}
}
View
160 src/main/java/org/factor45/efflux/session/RtpParticipant.java
@@ -17,6 +17,9 @@
package org.factor45.efflux.session;
import org.factor45.efflux.packet.DataPacket;
+import org.factor45.efflux.packet.SdesChunk;
+import org.factor45.efflux.packet.SdesChunkItem;
+import org.factor45.efflux.packet.SdesChunkPrivItem;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -40,9 +43,11 @@
private String name;
private String cname;
private String email;
+ private String phone;
private String location;
private String tool;
private String note;
+ private String privPrefix;
private String priv;
// constructors ---------------------------------------------------------------------------------------------------
@@ -73,12 +78,25 @@ private RtpParticipant() {
public static RtpParticipant createFromUnexpectedDataPacket(InetSocketAddress origin, DataPacket packet) {
RtpParticipant participant = new RtpParticipant();
+ // I know RFC states that we "MUST NOT" consider the origin IP as the destination for future packets, but it
+ // doesn't provide an alternative so yeah, I'll pretty much disregard the RFC here.
participant.dataAddress = origin;
participant.controlAddress = new InetSocketAddress(origin.getAddress(), origin.getPort() + 1);
participant.ssrc = packet.getSsrc();
return participant;
}
+ public static RtpParticipant createFromSdesChunk(InetSocketAddress origin, SdesChunk chunk) {
+ RtpParticipant participant = new RtpParticipant();
+ // I know RFC states that we "MUST NOT" consider the origin IP as the destination for future packets, but it
+ // doesn't provide an alternative so yeah, I'll pretty much disregard the RFC here.
+ participant.dataAddress = new InetSocketAddress(origin.getAddress(), origin.getPort() - 1);
+ participant.controlAddress = origin;
+ participant.updateFromSdesChunk(chunk);
+
+ return participant;
+ }
+
/**
* Randomly generates a new SSRC.
* <p/>
@@ -97,11 +115,77 @@ public static long generateNewSsrc() {
// public methods -------------------------------------------------------------------------------------------------
- public void updateRtpAddress(SocketAddress address) {
+ public boolean updateFromSdesChunk(SdesChunk chunk) {
+ boolean modified = false;
+ if (this.ssrc != chunk.getSsrc()) {
+ modified = true;
+ }
+ this.ssrc = chunk.getSsrc();
+ for (SdesChunkItem item : chunk.getItems()) {
+ switch (item.getType()) {
+ case CNAME:
+ if (this.willCauseModification(this.cname, item.getValue())) {
+ this.setCname(item.getValue());
+ modified = true;
+ }
+ break;
+ case NAME:
+ if (this.willCauseModification(this.name, item.getValue())) {
+ this.setName(item.getValue());
+ modified = true;
+ }
+ break;
+ case EMAIL:
+ if (this.willCauseModification(this.email, item.getValue())) {
+ this.setEmail(item.getValue());
+ modified = true;
+ }
+ break;
+ case PHONE:
+ if (this.willCauseModification(this.phone, item.getValue())) {
+ this.setPhone(item.getValue());
+ modified = true;
+ }
+ break;
+ case LOCATION:
+ if (this.willCauseModification(this.location, item.getValue())) {
+ this.setLocation(item.getValue());
+ modified = true;
+ }
+ break;
+ case TOOL:
+ if (this.willCauseModification(this.location, item.getValue())) {
+ this.setTool(item.getValue());
+ modified = true;
+ }
+ break;
+ case NOTE:
+ if (this.willCauseModification(this.location, item.getValue())) {
+ this.setNote(item.getValue());
+ modified = true;
+ }
+ break;
+ case PRIV:
+ String prefix = ((SdesChunkPrivItem) item).getPrefix();
+ if (this.willCauseModification(this.privPrefix, prefix) ||
+ this.willCauseModification(this.priv, item.getValue())) {
+ this.setPriv(prefix, item.getValue());
+ modified = true;
+ }
+ break;
+ default:
+ // Never falls here...
+ }
+ }
+
+ return modified;
+ }
+
+ public void updateDataAddress(SocketAddress address) {
this.dataAddress = address;
}
- public void updateRtcpAddress(SocketAddress address) {
+ public void updateControlAddress(SocketAddress address) {
this.controlAddress = address;
}
@@ -138,111 +222,135 @@ public void updateSsrc(long ssrc) {
this.ssrc = ssrc;
}
+ // private helpers ------------------------------------------------------------------------------------------------
+
+ private boolean willCauseModification(String originalValue, String newValue) {
+ return newValue != null && !newValue.equals(originalValue);
+ }
+
// getters & setters ----------------------------------------------------------------------------------------------
public SocketAddress getDataAddress() {
- return dataAddress;
+ return this