Permalink
Browse files

% heavy API changes (after re-reading the RFC and discussing with a c…

…o-worker, I found out that senders and receivers need not be associated - in fact, they may never be!)

- removed the RtpParticipantContext concept
+ added RtpParticipantInfo (previous RtpParticipant)
% RtpParticipant is now the previous RtpParticipantContext
+ ParticipantDatabase added (helps factoring out important logic and testability)
% bumped version up to 0.4 (due to the high amount of API and internal changes)
  • Loading branch information...
1 parent d5c48e8 commit 779b20b3148850e9986d0fb5095bad6015c472f0 Bruno de Carvalho committed Sep 15, 2010
Showing with 1,458 additions and 581 deletions.
  1. +1 −1 HISTORY.md
  2. +1 −1 pom.xml
  3. +21 −20 src/functionaltest/java/org/factor45/efflux/session/ControlPacketFunctionalTest.java
  4. +4 −3 src/functionaltest/java/org/factor45/efflux/session/MultiParticipantSessionFunctionalTest.java
  5. +28 −27 src/functionaltest/java/org/factor45/efflux/session/SingleParticipantSessionFunctionalTest.java
  6. +18 −0 src/main/java/org/factor45/efflux/packet/SdesChunk.java
  7. +304 −0 src/main/java/org/factor45/efflux/participant/DefaultParticipantDatabase.java
  8. +56 −0 src/main/java/org/factor45/efflux/participant/ParticipantDatabase.java
  9. +5 −15 .../factor45/efflux/{session/RtpParticipantContext.java → participant/ParticipantEventListener.java}
  10. +25 −0 src/main/java/org/factor45/efflux/participant/ParticipantOperation.java
  11. +245 −0 src/main/java/org/factor45/efflux/participant/RtpParticipant.java
  12. +18 −90 ...main/java/org/factor45/efflux/{session/RtpParticipant.java → participant/RtpParticipantInfo.java}
  13. +129 −0 src/main/java/org/factor45/efflux/participant/SingleParticipantDatabase.java
  14. +181 −210 src/main/java/org/factor45/efflux/session/AbstractRtpSession.java
  15. +0 −134 src/main/java/org/factor45/efflux/session/DefaultRtpParticipantContext.java
  16. +30 −17 src/main/java/org/factor45/efflux/session/MultiParticipantSession.java
  17. +7 −4 src/main/java/org/factor45/efflux/session/RtpSession.java
  18. +2 −1 src/main/java/org/factor45/efflux/session/RtpSessionDataListener.java
  19. +6 −2 src/main/java/org/factor45/efflux/session/RtpSessionEventListener.java
  20. +44 −50 src/main/java/org/factor45/efflux/session/SingleParticipantSession.java
  21. +76 −0 src/main/java/org/factor45/efflux/util/TimeUtils.java
  22. +244 −0 src/test/java/org/factor45/efflux/participant/DefaultParticipantDatabaseTest.java
  23. +13 −6 src/test/java/org/factor45/efflux/session/MultiParticipantSessionTest.java
View
@@ -1,7 +1,7 @@
efflux
======
-0.4.0
+0.5.0
-----
* First release!
* Multiple participant RTP session
View
@@ -4,7 +4,7 @@
<groupId>org.factor45.efflux</groupId>
<artifactId>efflux</artifactId>
- <version>0.3.0</version>
+ <version>0.4.0</version>
<packaging>jar</packaging>
<name>efflux</name>
@@ -23,6 +23,7 @@
import org.factor45.efflux.packet.SdesChunk;
import org.factor45.efflux.packet.SdesChunkItems;
import org.factor45.efflux.packet.SourceDescriptionPacket;
+import org.factor45.efflux.participant.RtpParticipantInfo;
import org.junit.After;
import org.junit.Test;
@@ -55,8 +56,8 @@ public void tearDown() {
public void testSendAndReceive() {
final CountDownLatch latch = new CountDownLatch(2);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
- RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo remote1 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
this.session1.setAutomatedRtcpHandling(false);
assertTrue(this.session1.init());
@@ -73,8 +74,8 @@ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
this.session2.setAutomatedRtcpHandling(false);
assertTrue(this.session2.init());
@@ -106,7 +107,7 @@ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
public void testSendAndNotReceiveForAutomatedRtcpSession() {
final CountDownLatch latch = new CountDownLatch(1);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session1 = new MultiParticipantSession("Session1", 8, local1);
this.session1.setAutomatedRtcpHandling(true);
assertTrue(this.session1.init());
@@ -122,8 +123,8 @@ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
this.session2.setAutomatedRtcpHandling(false);
assertTrue(this.session2.init());
@@ -155,27 +156,27 @@ public void appDataReceived(RtpSession session, AppDataPacket appDataPacket) {
public void testSendSdesAndByePackets() throws Exception {
final CountDownLatch latch = new CountDownLatch(2);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session1 = new MultiParticipantSession("Session1", 8, local1);
this.session1.addEventListener(new RtpSessionEventListener() {
@Override
- public void participantJoinedFromData(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void participantJoinedFromData(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
fail("Unexpected event triggered.");
}
@Override
- public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ public void participantJoinedFromControl(RtpSession session, RtpParticipantInfo participant, SdesChunk chunk) {
System.err.println("Participant joined from SDES chunk: " + chunk);
latch.countDown();
}
@Override
- public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
+ public void participantDataUpdated(RtpSession session, RtpParticipantInfo participant) {
fail("Unexpected event triggered.");
}
@Override
- public void participantLeft(RtpSession session, RtpParticipant participant) {
+ public void participantLeft(RtpSession session, RtpParticipantInfo participant) {
System.err.println("Participant left: " + participant);
latch.countDown();
}
@@ -192,7 +193,7 @@ public void sessionTerminated(RtpSession session, Throwable cause) {
});
assertTrue(this.session1.init());
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
this.session2 = new MultiParticipantSession("Session2", 8, local2);
this.session2.addParticipant(local1);
assertTrue(this.session2.init());
@@ -210,28 +211,28 @@ public void sessionTerminated(RtpSession session, Throwable cause) {
public void testUpdateSdes() throws Exception {
final CountDownLatch latch = new CountDownLatch(2);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session1 = new MultiParticipantSession("Session1", 8, local1);
this.session1.addEventListener(new RtpSessionEventListener() {
@Override
- public void participantJoinedFromData(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void participantJoinedFromData(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Participant joined from DataPacket: " + packet);
latch.countDown();
}
@Override
- public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ public void participantJoinedFromControl(RtpSession session, RtpParticipantInfo participant, SdesChunk chunk) {
fail("Unexpected packet received");
}
@Override
- public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
+ public void participantDataUpdated(RtpSession session, RtpParticipantInfo participant) {
System.err.println("Participant information updated: " + participant);
latch.countDown();
}
@Override
- public void participantLeft(RtpSession session, RtpParticipant participant) {
+ public void participantLeft(RtpSession session, RtpParticipantInfo participant) {
System.err.println("Participant left: " + participant);
}
@@ -246,7 +247,7 @@ public void sessionTerminated(RtpSession session, Throwable cause) {
});
assertTrue(this.session1.init());
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
this.session2 = new MultiParticipantSession("Session2", 8, local2);
this.session2.addParticipant(local1);
this.session2.setAutomatedRtcpHandling(false);
@@ -272,7 +273,7 @@ public void sessionTerminated(RtpSession session, Throwable cause) {
RtpParticipantContext context = this.session1.getRemoteParticipant(2);
assertNotNull(context);
- RtpParticipant participant = context.getParticipant();
+ RtpParticipantInfo participant = context.getParticipant();
assertEquals(2, participant.getSsrc());
assertEquals("session2@127.0.0.1:7000", participant.getCname());
assertEquals("session2", participant.getName());
@@ -17,6 +17,7 @@
package org.factor45.efflux.session;
import org.factor45.efflux.packet.DataPacket;
+import org.factor45.efflux.participant.RtpParticipantInfo;
import org.junit.After;
import org.junit.Test;
@@ -50,20 +51,20 @@ public void tearDown() {
@Test
public void testDeliveryToAllParticipants() throws Exception {
this.sessions = new MultiParticipantSession[N];
- RtpParticipant[] p = new RtpParticipant[N];
+ RtpParticipantInfo[] p = new RtpParticipantInfo[N];
final AtomicInteger[] counters = new AtomicInteger[N];
final CountDownLatch latch = new CountDownLatch(N);
for (byte i = 0; i < N; i++) {
- p[i] = new RtpParticipant("127.0.0.1", 10000 + (i * 2), 20001 + (i * 2), i);
+ p[i] = new RtpParticipantInfo("127.0.0.1", 10000 + (i * 2), 20001 + (i * 2), i);
this.sessions[i] = new MultiParticipantSession("session" + i, 8, p[i]);
assertTrue(this.sessions[i].init());
final AtomicInteger counter = new AtomicInteger();
counters[i] = counter;
this.sessions[i].addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println(session.getId() + " received data from " + participant + ": " + packet);
if (counter.incrementAndGet() == (N - 1)) {
// Release the latch once all N-1 messages (because it wont receive the message it sends) are
@@ -18,6 +18,7 @@
import org.factor45.efflux.packet.DataPacket;
import org.factor45.efflux.packet.SdesChunk;
+import org.factor45.efflux.participant.RtpParticipantInfo;
import org.junit.After;
import org.junit.Test;
@@ -50,25 +51,25 @@ public void tearDown() {
public void testSendAndReceive() {
final CountDownLatch latch = new CountDownLatch(2);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
- RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo remote1 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
assertTrue(this.session1.init());
this.session1.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 1 received packet: " + packet + "(session: " + session.getId() + ")");
latch.countDown();
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
assertTrue(this.session2.init());
this.session2.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 2 received packet: " + packet + "(session: " + session.getId() + ")");
latch.countDown();
}
@@ -94,25 +95,25 @@ public void testSendAndReceiveUpdatingRemote() throws Exception {
final CountDownLatch latch2 = new CountDownLatch(1);
String initialHost;
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
- RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo remote1 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
assertTrue(this.session1.init());
this.session1.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 1 received packet: " + packet + "(session: " + session.getId() + ")");
latch2.countDown();
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 9000, 9001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 9000, 9001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
assertTrue(this.session2.init());
this.session2.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 2 received packet: " + packet + "(session: " + session.getId() + ")");
latch.countDown();
}
@@ -151,20 +152,20 @@ public void dataPacketReceived(RtpSession session, RtpParticipant participant, D
public void testIgnoreFromUnexpectedSsrc() throws Exception {
final AtomicInteger counter = new AtomicInteger();
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001);
- RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001);
+ RtpParticipantInfo remote1 = new RtpParticipantInfo("127.0.0.1", 7000, 7001);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
assertTrue(this.session1.init());
this.session1.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 1 received packet: " + packet + "(session: " + session.getId() + ")");
counter.incrementAndGet();
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2) {
@Override
public boolean sendDataPacket(DataPacket packet) {
@@ -199,31 +200,31 @@ public void testCollisionResolution() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch2 = new CountDownLatch(1);
- RtpParticipant local1 = new RtpParticipant("127.0.0.1", 6000, 6001, 2);
- RtpParticipant remote1 = new RtpParticipant("127.0.0.1", 7000, 7001, 1);
+ RtpParticipantInfo local1 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 2);
+ RtpParticipantInfo remote1 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 1);
this.session1 = new SingleParticipantSession("Session1", 8, local1, remote1);
assertTrue(this.session1.init());
this.session1.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 1 received packet: " + packet + "(session: " + session.getId() + ")");
}
});
this.session1.addEventListener(new RtpSessionEventListener() {
@Override
- public void participantJoinedFromData(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void participantJoinedFromData(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
}
@Override
- public void participantJoinedFromControl(RtpSession session, RtpParticipant participant, SdesChunk chunk) {
+ public void participantJoinedFromControl(RtpSession session, RtpParticipantInfo participant, SdesChunk chunk) {
}
@Override
- public void participantDataUpdated(RtpSession session, RtpParticipant participant) {
+ public void participantDataUpdated(RtpSession session, RtpParticipantInfo participant) {
}
@Override
- public void participantLeft(RtpSession session, RtpParticipant participant) {
+ public void participantLeft(RtpSession session, RtpParticipantInfo participant) {
}
@Override
@@ -238,13 +239,13 @@ public void sessionTerminated(RtpSession session, Throwable cause) {
}
});
- RtpParticipant local2 = new RtpParticipant("127.0.0.1", 7000, 7001, 2);
- RtpParticipant remote2 = new RtpParticipant("127.0.0.1", 6000, 6001, 1);
+ RtpParticipantInfo local2 = new RtpParticipantInfo("127.0.0.1", 7000, 7001, 2);
+ RtpParticipantInfo remote2 = new RtpParticipantInfo("127.0.0.1", 6000, 6001, 1);
this.session2 = new SingleParticipantSession("Session2", 8, local2, remote2);
assertTrue(this.session2.init());
this.session2.addDataListener(new RtpSessionDataListener() {
@Override
- public void dataPacketReceived(RtpSession session, RtpParticipant participant, DataPacket packet) {
+ public void dataPacketReceived(RtpSession session, RtpParticipantInfo participant, DataPacket packet) {
System.err.println("Session 2 received packet: " + packet + "(session: " + session.getId() + ")");
latch2.countDown();
}
Oops, something went wrong.

0 comments on commit 779b20b

Please sign in to comment.