Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6ab2108
Work in progress for RESTCOMM-1147, RMS resources leak
gvagenas Oct 1, 2017
7a988ed
Fixes for SmsTest
gvagenas Oct 3, 2017
e9d3923
Merge branch 'master' into restcomm1147_rms_resources_leak_on_conf
gvagenas Oct 3, 2017
110a6a2
Code cleanup.
gvagenas Oct 3, 2017
2cb0698
Work in progress for MgcpMonitoringService
gvagenas Oct 3, 2017
644664b
Merge branch 'issue1155_MgcpMonitoringService' into restcomm1147_rms_…
gvagenas Oct 3, 2017
0355b20
Work in progress for MgcpMonitoringService
gvagenas Oct 4, 2017
e89b53e
Work in progress for MgcpMonitoring service
gvagenas Oct 4, 2017
e3734ae
Patch to monitor MGCP resources and be able to track MGCP resources leak
gvagenas Oct 4, 2017
e17ded6
Merge branch 'master' into restcomm1147_rms_resources_leak_on_conf
gvagenas Oct 4, 2017
fe49c02
Work in progress to remove endpoints on connection destroy
gvagenas Oct 5, 2017
aaebb11
Merge branch 'restcomm1158_removeEndpoints_onConnectionDestroy' into …
gvagenas Oct 5, 2017
b2b3ae5
Patch to remove endpoints on connection destroy
gvagenas Oct 5, 2017
804ba65
Patch for DialConferenceTest
gvagenas Oct 6, 2017
3164114
Addition to DialConferenceTest to assert MOH is properly stopped.
gvagenas Oct 6, 2017
b03a9e7
Testsuite enhancements for checking MGCP resources at the end of the …
gvagenas Oct 6, 2017
fba78b9
Patch to properly cleanup MGCP resources for Failed Calls
gvagenas Oct 6, 2017
2c69abf
Enhancements to testsuite for DialActionTest and DialForkTest to asse…
gvagenas Oct 6, 2017
5255baa
Merge branch 'master' of github.com:RestComm/Restcomm-Connect
gvagenas Oct 9, 2017
5345768
Merge branch 'master' into restcomm1147_rms_resources_leak_on_conf
gvagenas Oct 9, 2017
0e7dc36
Code cleanup and added comments
gvagenas Oct 10, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void destroy() {
system.awaitTermination();
}

private MediaServerControllerFactory mediaServerControllerFactory(final Configuration configuration, ClassLoader loader, DaoManager storage)
private MediaServerControllerFactory mediaServerControllerFactory(final Configuration configuration, ClassLoader loader, DaoManager storage, ActorRef monitoring)
throws ServletException {
Configuration settings ;
String compatibility = configuration.subset("mscontrol").getString("compatibility", "rms");
Expand All @@ -89,7 +89,7 @@ private MediaServerControllerFactory mediaServerControllerFactory(final Configur
case "rms":
try {
settings = configuration.subset("media-server-manager");
ActorRef mrb = mediaResourceBroker(settings, storage, loader);
ActorRef mrb = mediaResourceBroker(settings, storage, loader, monitoring);
factory = new MmsControllerFactory(mrb);
} catch (UnknownHostException e) {
throw new ServletException(e);
Expand Down Expand Up @@ -200,7 +200,7 @@ private SipURI outboundInterface(ServletContext context, String transport) {
return result;
}

private ActorRef mediaResourceBroker(final Configuration configuration, final DaoManager storage, final ClassLoader loader) throws UnknownHostException{
private ActorRef mediaResourceBroker(final Configuration configuration, final DaoManager storage, final ClassLoader loader, final ActorRef monitoring) throws UnknownHostException{
final Props props = new Props(new UntypedActorFactory() {
private static final long serialVersionUID = 1L;

Expand All @@ -211,7 +211,7 @@ public UntypedActor create() throws Exception {
}
});
ActorRef mrb = system.actorOf(props);
mrb.tell(new StartMediaResourceBroker(configuration, storage, loader), null);
mrb.tell(new StartMediaResourceBroker(configuration, storage, loader, monitoring), null);
return mrb;
}

Expand Down Expand Up @@ -387,7 +387,7 @@ public void servletInitialized(SipServletContextEvent event) {
// Create the media server controller factory
MediaServerControllerFactory mscontrollerFactory = null;
try {
mscontrollerFactory = mediaServerControllerFactory(xml, loader, storage);
mscontrollerFactory = mediaServerControllerFactory(xml, loader, storage, monitoring);
} catch (ServletException exception) {
logger.error("ServletException during initialization: ", exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,19 @@ protected Response getMetrics(final String accountSid, final UriInfo info, final
checkAuthenticatedAccount();
allowOnlySuperAdmin();
boolean withLiveCallDetails = false;
boolean withMgcpStats = false;
if (info != null && info.getQueryParameters().containsKey("LiveCallDetails") ) {
withLiveCallDetails = Boolean.parseBoolean(info.getQueryParameters().getFirst("LiveCallDetails"));
}

if (info != null && info.getQueryParameters().containsKey("MgcpStats") ) {
withMgcpStats = Boolean.parseBoolean(info.getQueryParameters().getFirst("MgcpStats"));
}
//Get the list of live calls from Monitoring Service
MonitoringServiceResponse monitoringServiceResponse;
try {
final Timeout expires = new Timeout(Duration.create(5, TimeUnit.SECONDS));
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, accountSid);
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, withMgcpStats, accountSid);
Future<Object> future = (Future<Object>) ask(monitoringService, getStatistics, expires);
monitoringServiceResponse = (MonitoringServiceResponse) Await.result(future, Duration.create(5, TimeUnit.SECONDS));
} catch (Exception exception) {
Expand Down Expand Up @@ -202,14 +207,19 @@ protected Response registerForUpdates(final String accountSid, final UriInfo inf
checkAuthenticatedAccount();
allowOnlySuperAdmin();
boolean withLiveCallDetails = false;
boolean withMgcpStats = false;
if (info != null && info.getQueryParameters().containsKey("LiveCallDetails") ) {
withLiveCallDetails = Boolean.parseBoolean(info.getQueryParameters().getFirst("LiveCallDetails"));
}

if (info != null && info.getQueryParameters().containsKey("MgcpStats") ) {
withMgcpStats = Boolean.parseBoolean(info.getQueryParameters().getFirst("MgcpStats"));
}
//Get the list of live calls from Monitoring Service
MonitoringServiceResponse monitoringServiceResponse;
try {
final Timeout expires = new Timeout(Duration.create(60, TimeUnit.SECONDS));
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, accountSid);
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, withMgcpStats, accountSid);
Future<Object> future = (Future<Object>) ask(monitoringService, getStatistics, expires);
monitoringServiceResponse = (MonitoringServiceResponse) Await.result(future, Duration.create(10, TimeUnit.SECONDS));
} catch (Exception exception) {
Expand Down Expand Up @@ -238,14 +248,19 @@ protected Response registerForCallUpdates(final String accountSid, final String
final String url = data.getFirst("Url");
final String refresh = data.getFirst("Refresh");
boolean withLiveCallDetails = false;
boolean withMgcpStats = false;
if (data != null && data.containsKey("LiveCallDetails")) {
withLiveCallDetails = Boolean.parseBoolean(data.getFirst("LiveCallDetails"));
}

if (data != null && data.containsKey("MgcpStats") ) {
withMgcpStats = Boolean.parseBoolean(data.getFirst("MgcpStats"));
}
//Get the list of live calls from Monitoring Service
MonitoringServiceResponse monitoringServiceResponse;
try {
final Timeout expires = new Timeout(Duration.create(60, TimeUnit.SECONDS));
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, accountSid);
GetStatistics getStatistics = new GetStatistics(withLiveCallDetails, withMgcpStats, accountSid);
Future<Object> future = (Future<Object>) ask(monitoringService, getStatistics, expires);
monitoringServiceResponse = (MonitoringServiceResponse) Await.result(future, Duration.create(10, TimeUnit.SECONDS));
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@
import org.mobicents.protocols.mgcp.jain.pkg.AUPackage;
import org.restcomm.connect.commons.faulttolerance.RestcommUntypedActor;
import org.restcomm.connect.commons.util.RevolvingCounter;
import org.restcomm.connect.mgcp.stats.MgcpConnectionAdded;
import org.restcomm.connect.mgcp.stats.MgcpConnectionDeleted;
import org.restcomm.connect.mgcp.stats.MgcpEndpointAdded;
import org.restcomm.connect.mgcp.stats.MgcpEndpointDeleted;

import javax.sdp.SdpFactory;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -89,30 +94,16 @@ public class MockMediaGateway extends RestcommUntypedActor {
private RevolvingCounter connectionIdPool;
private RevolvingCounter endpointIdPool;

private static Map<MediaSession, ActorRef> endpoints;
private static Map<MediaSession, ActorRef> links;
private static Map<MediaSession, ActorRef> connections;
private Map<String, String> connEndpointMap;

private ActorRef monitoringService;

private ActorSystem system;

public MockMediaGateway() {
super();
endpoints = new ConcurrentHashMap<MediaSession, ActorRef>();
links = new ConcurrentHashMap<MediaSession, ActorRef>();
connections = new ConcurrentHashMap<MediaSession, ActorRef>();
system = context().system();
}

public static Map<MediaSession, ActorRef> getEndpointsMap() {
return endpoints;
}

public static Map<MediaSession, ActorRef> getConnections() {
return connections;
}

public static Map<MediaSession, ActorRef> getLinks() {
return links;
connEndpointMap = new ConcurrentHashMap<String, String>();
}

private ActorRef getConnection(final Object message) {
Expand All @@ -127,7 +118,10 @@ public UntypedActor create() throws Exception {
}
});
ActorRef connection = system.actorOf(props);
connections.put(session, connection);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added new Connection, path: %s",connection.path());
logger.info(msg);
}
return connection;
}

Expand All @@ -144,7 +138,10 @@ public Actor create() throws Exception {
}
});
ActorRef bridgeEndpoint = system.actorOf(props);
endpoints.put(session, bridgeEndpoint);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added Bridge endpoint, path: %s",bridgeEndpoint.path());
logger.info(msg);
}
return bridgeEndpoint;
}

Expand All @@ -165,7 +162,10 @@ public UntypedActor create() throws Exception {
}
});
ActorRef conferenceEndpoint = system.actorOf(props);
endpoints.put(session, conferenceEndpoint);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added Conference endpoint, path: %s", conferenceEndpoint.path());
logger.info(msg);
}
return conferenceEndpoint;
}

Expand All @@ -188,7 +188,10 @@ public UntypedActor create() throws Exception {
}
});
ActorRef ivrEndpoint = system.actorOf(props);
endpoints.put(session, ivrEndpoint);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added Ivr endpoint, path: %s",ivrEndpoint.path());
logger.info(msg);
}
return ivrEndpoint;
}

Expand All @@ -205,7 +208,10 @@ public UntypedActor create() throws Exception {
}
});
ActorRef link = system.actorOf(props);
links.put(session, link);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added new Link, path: %s",link.path());
logger.info(msg);
}
return link;
}

Expand All @@ -222,7 +228,10 @@ public UntypedActor create() throws Exception {
}
});
ActorRef packetRelayEndpoint = system.actorOf(props);
endpoints.put(session, packetRelayEndpoint);
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Added PacketRelay endpoint, path: %s",packetRelayEndpoint.path());
logger.info(msg);
}
return packetRelayEndpoint;
}

Expand Down Expand Up @@ -264,6 +273,7 @@ private void powerOn(final Object message) {
requestIdPool = new RevolvingCounter(1, Integer.MAX_VALUE);
sessionIdPool = new RevolvingCounter(1, Integer.MAX_VALUE);
transactionIdPool = new RevolvingCounter(1, Integer.MAX_VALUE);
monitoringService = request.getMonitoringService();
}

@Override
Expand Down Expand Up @@ -298,15 +308,24 @@ public void onReceive(final Object message) throws Exception {
sender.tell(new MediaGatewayResponse<ActorRef>(endpoint), self);
} else if (DestroyConnection.class.equals(klass)) {
final DestroyConnection request = (DestroyConnection) message;
connections.values().remove(request.connection());
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Connection destroyed, path %s",request.connection().path());
logger.info(msg);
}
context.stop(request.connection());
} else if (DestroyLink.class.equals(klass)) {
final DestroyLink request = (DestroyLink) message;
links.values().remove(request.link());
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Link destroyed, path %s",request.link().path());
logger.info(msg);
}
context.stop(request.link());
} else if (DestroyEndpoint.class.equals(klass)) {
final DestroyEndpoint request = (DestroyEndpoint) message;
endpoints.values().remove(request.endpoint());
if (logger.isInfoEnabled()) {
String msg = String.format("MockMediaGateway, Endpoint destroyed, path %s",request.endpoint().path());
logger.info(msg);
}
context.stop(request.endpoint());
} else if (message instanceof JainMgcpCommandEvent) {
send(message, sender);
Expand Down Expand Up @@ -336,14 +355,21 @@ private void createConnection(final Object message, final ActorRef sender) {
buffer.append(endpointIdPool.get());
endpointId = new EndpointIdentifier(buffer.toString(), domain);
}
connEndpointMap.put(connId.toString(), endpointId.getLocalEndpointName());
if (logger.isInfoEnabled()) {
String msg = String.format("About to add connId %s for endpoint %s", connId.toString(), endpointId.getLocalEndpointName());
logger.info(msg);
}
monitoringService.tell(new MgcpEndpointAdded(connId.toString(), endpointId.getLocalEndpointName()), self());
monitoringService.tell(new MgcpConnectionAdded(connId.toString(), endpointId.getLocalEndpointName()), self());
response.setSpecificEndpointIdentifier(endpointId);
// Create a new secondary end point id if necessary.
EndpointIdentifier secondaryEndpointId = crcx.getSecondEndpointIdentifier();
if (secondaryEndpointId != null) {
buffer = new StringBuilder();
buffer.append(connectionIdPool.get());
connId = new ConnectionIdentifier(buffer.toString());
response.setSecondConnectionIdentifier(connId);
ConnectionIdentifier secondayConnId = new ConnectionIdentifier(buffer.toString());
response.setSecondConnectionIdentifier(secondayConnId);
endpointName = secondaryEndpointId.getLocalEndpointName();
if (endpointName.endsWith("$")) {
final String[] tokens = endpointName.split("/");
Expand All @@ -353,6 +379,13 @@ private void createConnection(final Object message, final ActorRef sender) {
buffer.append(endpointIdPool.get());
secondaryEndpointId = new EndpointIdentifier(buffer.toString(), domain);
}
connEndpointMap.put(secondayConnId.toString(), secondaryEndpointId.getLocalEndpointName());
if (logger.isInfoEnabled()) {
String msg = String.format("About to add connId %s for secondary endpoint %s associated with endpont %s", secondayConnId.toString(), secondaryEndpointId.getLocalEndpointName(), endpointId.getLocalEndpointName());
logger.info(msg);
}
monitoringService.tell(new MgcpEndpointAdded(connId.toString(), secondaryEndpointId.getLocalEndpointName()), self());
monitoringService.tell(new MgcpConnectionAdded(secondayConnId.toString(), secondaryEndpointId.getLocalEndpointName()));
response.setSecondEndpointIdentifier(secondaryEndpointId);
}
final ConnectionDescriptor descriptor = new ConnectionDescriptor(sdp);
Expand All @@ -366,7 +399,11 @@ private void createConnection(final Object message, final ActorRef sender) {
private void modifyConnection(final Object message, final ActorRef sender) {
final ActorRef self = self();
final ModifyConnection mdcx = (ModifyConnection) message;
System.out.println(mdcx.toString());
System.out.println("MDCX: \n" +mdcx.toString());
if (logger.isInfoEnabled()) {
String msg = String.format("Got MDCX for endpoint %s connId %s, mdcx: \n%s", mdcx.getEndpointIdentifier().getLocalEndpointName(), mdcx.getConnectionIdentifier(), mdcx);
logger.info(msg);
}
ReturnCode code;
SessionDescription sessionDescription = null;
boolean isNonValidSdp = false;
Expand Down Expand Up @@ -400,6 +437,28 @@ private void modifyConnection(final Object message, final ActorRef sender) {
private void deleteConnection(final Object message, final ActorRef sender) {
final ActorRef self = self();
final DeleteConnection dlcx = (DeleteConnection) message;
if (dlcx.getConnectionIdentifier() == null) {
connEndpointMap.values().removeAll(Collections.singleton(dlcx.getEndpointIdentifier().getLocalEndpointName()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log the result of this operation, i think it could be worth to print it, whether we deleted something or not...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaimecasero you mean the operation to remove from the Map? Yes I can add this at the log statement later

monitoringService.tell(new MgcpConnectionDeleted(null, dlcx.getEndpointIdentifier().getLocalEndpointName()), self());
monitoringService.tell(new MgcpEndpointDeleted(dlcx.getEndpointIdentifier().getLocalEndpointName()), self());
if (logger.isInfoEnabled()) {
String msg = String.format("Endpoint deleted %s", dlcx.getEndpointIdentifier().getLocalEndpointName());
logger.info(msg);
}
} else {
connEndpointMap.remove(dlcx.getConnectionIdentifier().toString());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, can we save the result of this operation for logging later?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure as above

monitoringService.tell(new MgcpConnectionDeleted(dlcx.getConnectionIdentifier().toString(), null), self());
//If all connections have been removed for a given Endpoint, we can consider that the endpoint is stopped (this is the RMS behavior)
//Here check if we have Endpoint ID on the map, and if not, tell MonitoringService that the endpoint stopped.
if (!connEndpointMap.values().contains(dlcx.getEndpointIdentifier().getLocalEndpointName())) {
monitoringService.tell(new MgcpEndpointDeleted(dlcx.getEndpointIdentifier().getLocalEndpointName()), self());
if (logger.isInfoEnabled()) {
String msg = String.format("Endpoint deleted %s since because there are no more connections related to this endpoint", dlcx.getEndpointIdentifier().getLocalEndpointName());
logger.info(msg);
}
}
}

System.out.println(dlcx.toString());
final ReturnCode code = ReturnCode.Transaction_Executed_Normally;
final DeleteConnectionResponse response = new DeleteConnectionResponse(self, code);
Expand Down
Loading