Skip to content

Commit

Permalink
CAMEL-10337: camel-asterix - Endpoint should be singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Mar 23, 2017
1 parent 64c57a8 commit d032095
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 177 deletions.
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.asterisk;

import java.util.function.Function;

import org.apache.camel.Exchange;
import org.asteriskjava.manager.action.ExtensionStateAction;
import org.asteriskjava.manager.action.ManagerAction;
import org.asteriskjava.manager.action.QueueStatusAction;
import org.asteriskjava.manager.action.SipPeersAction;

public enum AsteriskAction implements Function<Exchange, ManagerAction> {
QUEUE_STATUS {
@Override
public ManagerAction apply(Exchange exchange) {
return new QueueStatusAction();
}
},
SIP_PEERS {
@Override
public ManagerAction apply(Exchange exchange) {
return new SipPeersAction();
}
},
EXTENSION_STATE {
@Override
public ManagerAction apply(Exchange exchange) {
return new ExtensionStateAction(
exchange.getIn().getHeader(AsteriskConstants.EXTENSION, String.class),
exchange.getIn().getHeader(AsteriskConstants.CONTEXT, String.class)
);
}
}
}

This file was deleted.

Expand Up @@ -20,19 +20,18 @@


import org.apache.camel.CamelContext; import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint; import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.impl.DefaultComponent;


/** /**
* Represents the component that manages {@link AsteriskEndpoint}. * Represents the component that manages {@link AsteriskEndpoint}.
*/ */
public class AsteriskComponent extends UriEndpointComponent { public class AsteriskComponent extends DefaultComponent {


public AsteriskComponent() { public AsteriskComponent() {
super(AsteriskEndpoint.class);
} }


public AsteriskComponent(CamelContext context) { public AsteriskComponent(CamelContext context) {
super(context, AsteriskEndpoint.class); super(context);
} }


@Override @Override
Expand Down
Expand Up @@ -29,35 +29,35 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public class AsteriskConnection { public final class AsteriskConnection {
private static final Logger LOG = LoggerFactory.getLogger(AsteriskConnection.class); private static final Logger LOG = LoggerFactory.getLogger(AsteriskConnection.class);


private String host; private final String host;
private String username; private final String username;
private String password; private final String password;

private ManagerConnection managerConnection; private ManagerConnection managerConnection;


public AsteriskConnection(String host, String username, String password) { public AsteriskConnection(String host, String username, String password) {
this.host = host; this.host = host;
this.username = username; this.username = username;
this.password = password; this.password = password;

this.connect();
} }


private void connect() { public void connect() {
if (managerConnection == null) { if (managerConnection == null) {
LOG.debug("asterisk connection attempt to {} username: {}", host, username); LOG.debug("asterisk connection attempt to {} username: {}", host, username);


ManagerConnectionFactory factory = new ManagerConnectionFactory(host, username, password); ManagerConnectionFactory factory = new ManagerConnectionFactory(host, username, password);
managerConnection = factory.createManagerConnection(); managerConnection = factory.createManagerConnection();

LOG.debug("asterisk connection established!"); LOG.debug("asterisk connection established!");
} }
} }

public void login() throws IllegalStateException, IOException, AuthenticationFailedException, TimeoutException, CamelAsteriskException { public void login() throws IllegalStateException, IOException, AuthenticationFailedException, TimeoutException, CamelAsteriskException {
// Lazy connect if not done before
connect();

if (managerConnection != null && (managerConnection.getState() == ManagerConnectionState.DISCONNECTED || managerConnection.getState() == ManagerConnectionState.INITIAL)) { if (managerConnection != null && (managerConnection.getState() == ManagerConnectionState.DISCONNECTED || managerConnection.getState() == ManagerConnectionState.INITIAL)) {
managerConnection.login("on"); managerConnection.login("on");


Expand Down Expand Up @@ -87,10 +87,19 @@ public void addListener(ManagerEventListener listener) throws CamelAsteriskExcep
} }
} }


public void removeListener(ManagerEventListener listener) throws CamelAsteriskException {
if (managerConnection != null) {
managerConnection.removeEventListener(listener);

LOG.debug("asterisk removed listener {}", listener);
} else {
throw new CamelAsteriskException("Add listener operation, managerConnection is empty!");
}
}

public ManagerResponse sendAction(ManagerAction action) throws IllegalArgumentException, IllegalStateException, IOException, TimeoutException { public ManagerResponse sendAction(ManagerAction action) throws IllegalArgumentException, IllegalStateException, IOException, TimeoutException {
ManagerResponse response = managerConnection.sendAction(action); ManagerResponse response = managerConnection.sendAction(action);


return response; return response;
} }

} }
Expand Up @@ -17,12 +17,11 @@
package org.apache.camel.component.asterisk; package org.apache.camel.component.asterisk;


public final class AsteriskConstants { public final class AsteriskConstants {

public static final String EVENT_NAME = "CamelAsteriskEventName"; public static final String EVENT_NAME = "CamelAsteriskEventName";
public static final String EXTENSION = "CamelAsteriskExtension"; public static final String EXTENSION = "CamelAsteriskExtension";
public static final String CONTEXT = "CamelAsteriskContext"; public static final String CONTEXT = "CamelAsteriskContext";
public static final String ACTION = "CamelAsteriskAction";


private AsteriskConstants() { private AsteriskConstants() {
} }

} }
Expand Up @@ -16,34 +16,61 @@
*/ */
package org.apache.camel.component.asterisk; package org.apache.camel.component.asterisk;


import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.impl.DefaultConsumer;
import org.asteriskjava.manager.ManagerEventListener;
import org.asteriskjava.manager.event.ManagerEvent;


/** /**
* The Asterisk consumer. * The Asterisk consumer.
*/ */
public class AsteriskConsumer extends DefaultConsumer { public class AsteriskConsumer extends DefaultConsumer {
private final AsteriskEndpoint endpoint; private final AsteriskEndpoint endpoint;
private final AsteriskListenerTask task; private final AsteriskConnection connection;
private final ManagerEventListener listener;


public AsteriskConsumer(AsteriskEndpoint endpoint, Processor processor) { public AsteriskConsumer(AsteriskEndpoint endpoint, Processor processor) {
super(endpoint, processor); super(endpoint, processor);

this.endpoint = endpoint; this.endpoint = endpoint;
this.task = new AsteriskListenerTask(endpoint, this); this.connection = new AsteriskConnection(endpoint.getHostname(), endpoint.getUsername(), endpoint.getPassword());
this.listener = new EventListener();
} }


@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
connection.connect();
connection.addListener(listener);
connection.login();

super.doStart(); super.doStart();
log.info("Starting Asterisk AMI Event Listener");
endpoint.addListener(task);
endpoint.login();
} }


@Override @Override
protected void doStop() throws Exception { protected void doStop() throws Exception {
super.doStop(); super.doStop();
log.info("Stopping Asterisk AMI Event Listener");
endpoint.logoff(); connection.removeListener(listener);
connection.logoff();
}

// *******************************
//
// *******************************

private final class EventListener implements ManagerEventListener {
@Override
public void onManagerEvent(ManagerEvent event) {
Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
exchange.getIn().setBody(event);

try {
getProcessor().process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing exchange.", exchange, e);
}
}
} }
} }
Expand Up @@ -16,45 +16,38 @@
*/ */
package org.apache.camel.component.asterisk; package org.apache.camel.component.asterisk;


import java.io.IOException;

import org.apache.camel.Consumer; import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.Producer; import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata; import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath; import org.apache.camel.spi.UriPath;
import org.asteriskjava.manager.AuthenticationFailedException; import org.apache.camel.util.ObjectHelper;
import org.asteriskjava.manager.ManagerEventListener;
import org.asteriskjava.manager.TimeoutException;
import org.asteriskjava.manager.action.ManagerAction;
import org.asteriskjava.manager.event.ManagerEvent;
import org.asteriskjava.manager.response.ManagerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/** /**
* The asterisk component is used to interact with Asterisk PBX Server <a href="http://www.asterisk.org">Asterisk PBX Server</a>. * The asterisk component is used to interact with Asterisk PBX Server <a href="http://www.asterisk.org">Asterisk PBX Server</a>.
*/ */
@UriEndpoint(firstVersion = "2.18.0", scheme = "asterisk", title = "Asterisk", syntax = "asterisk:name", consumerClass = AsteriskConsumer.class, label = "voip") @UriEndpoint(firstVersion = "2.18.0", scheme = "asterisk", title = "Asterisk", syntax = "asterisk:name", consumerClass = AsteriskConsumer.class, label = "voip")
public class AsteriskEndpoint extends DefaultEndpoint { public class AsteriskEndpoint extends DefaultEndpoint {
@SuppressWarnings("unused") @UriPath(description = "Name of component")
private static final Logger LOG = LoggerFactory.getLogger(AsteriskProducer.class); @Metadata(required = "true")

private AsteriskConnection asteriskConnection;

@UriPath(description = "Name of component") @Metadata(required = "true")
private String name; private String name;

@UriParam @UriParam
@Metadata(required = "true")
private String hostname; private String hostname;

@UriParam(label = "producer") @UriParam(label = "producer")
private AsteriskActionEnum action; private AsteriskAction action;

@UriParam(secret = true) @UriParam(secret = true)
@Metadata(required = "true")
private String username; private String username;

@UriParam(secret = true) @UriParam(secret = true)
@Metadata(required = "true")
private String password; private String password;


public AsteriskEndpoint(String uri, AsteriskComponent component) { public AsteriskEndpoint(String uri, AsteriskComponent component) {
Expand All @@ -63,57 +56,25 @@ public AsteriskEndpoint(String uri, AsteriskComponent component) {


@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
super.doStart(); // Validate mandatory option

ObjectHelper.notNull(hostname, "hostname");
asteriskConnection = new AsteriskConnection(hostname, username, password); ObjectHelper.notNull(username, "username");
ObjectHelper.notNull(password, "password");
} }


@Override @Override
protected void doStop() throws Exception {
super.doStop();
// do not exist disconnect operation!!!
asteriskConnection = null;
}

public Producer createProducer() throws Exception { public Producer createProducer() throws Exception {
if (action == null) {
throw new IllegalArgumentException("Missing required action parameter");
}

return new AsteriskProducer(this); return new AsteriskProducer(this);
} }


@Override
public Consumer createConsumer(Processor processor) throws Exception { public Consumer createConsumer(Processor processor) throws Exception {
return new AsteriskConsumer(this, processor); return new AsteriskConsumer(this, processor);
} }


@Override
public boolean isSingleton() { public boolean isSingleton() {
// TODO: prefer to be singleton and do not have state on the endpoint return true;
// the asteriskConnection should be createed on the consumer / producer instance and be private there
return false;
}

public void addListener(ManagerEventListener listener) throws CamelAsteriskException {
asteriskConnection.addListener(listener);
}

public void login() throws IllegalStateException, IOException, AuthenticationFailedException, TimeoutException, CamelAsteriskException {
asteriskConnection.login();
}

public void logoff() throws CamelAsteriskException {
asteriskConnection.logoff();
}

public Exchange createExchange(ManagerEvent event) {
Exchange exchange = super.createExchange();
exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
exchange.getIn().setBody(event);
return exchange;
}

public ManagerResponse sendAction(ManagerAction action) throws IllegalArgumentException, IllegalStateException, IOException, TimeoutException {
return asteriskConnection.sendAction(action);
} }


public String getUsername() { public String getUsername() {
Expand All @@ -138,14 +99,14 @@ public void setPassword(String password) {
this.password = password; this.password = password;
} }


public AsteriskActionEnum getAction() { public AsteriskAction getAction() {
return action; return action;
} }


/** /**
* What action to perform such as getting queue status, sip peers or extension state. * What action to perform such as getting queue status, sip peers or extension state.
*/ */
public void setAction(AsteriskActionEnum action) { public void setAction(AsteriskAction action) {
this.action = action; this.action = action;
} }


Expand All @@ -154,7 +115,7 @@ public String getHostname() {
} }


/** /**
* The hostname of the asterix server * The hostname of the asterisk server
*/ */
public void setHostname(String hostname) { public void setHostname(String hostname) {
this.hostname = hostname; this.hostname = hostname;
Expand Down

0 comments on commit d032095

Please sign in to comment.