Skip to content
Permalink
Browse files
JUDDI-906 more test case enhancements
JUDDI-910 basics of directed graph support added
JUDDI-904 additional output to hopefully back trace the issue
  • Loading branch information
spyhunter99 committed Jan 11, 2015
1 parent 24031fc commit 31c1461028b1052c38563f8369246fbc6d3d4be9
Showing 8 changed files with 181 additions and 90 deletions.
@@ -275,7 +275,7 @@ public void run() {
recordsreturned = records.size();
}
} catch (Exception ex) {
logger.error("Error caught fetching replication changes from " + poll, ex);
logger.error("Error caught fetching replication changes from " + poll + " @" + ((BindingProvider)replicationClient).getRequestContext().get(BindingProvider.ENDPOINT_ADDRESS_PROPERTY), ex);
}
}
} else {
@@ -15,20 +15,15 @@
* limitations under the License.
*/

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.AttributeOverride;
import javax.persistence.AttributeOverrides;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;
import javax.persistence.OrderBy;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
@@ -17,8 +17,11 @@
package org.apache.juddi.replication;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -39,6 +42,7 @@
import org.apache.juddi.model.ReplicationConfiguration;
import org.apache.juddi.v3.client.UDDIService;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.HighWaterMarkVectorType;
import org.uddi.repl_v3.NotifyChangeRecordsAvailable;
import org.uddi.repl_v3.Operator;
@@ -60,6 +64,7 @@ public class ReplicationNotifier extends TimerTask {
private long interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
private long acceptableLagTime = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_ACCEPTABLE_LAGTIME, 10000l); //1000 milliseconds
private static String node = null;
private static UDDIService uddiService = new UDDIService();

/**
* default constructor
@@ -105,13 +110,12 @@ protected void ProcessChangeRecord(ChangeRecord j) {
tx = em.getTransaction();
tx.begin();


em.persist(j);
log.debug("CR saved locally, it was from " + j.getNodeID() +
" USN:" + j.getOriginatingUSN() +
" Type:" + j.getRecordType().name() +
" Key:"+j.getEntityKey() +
" Local id:"+j.getId());
log.debug("CR saved locally, it was from " + j.getNodeID()
+ " USN:" + j.getOriginatingUSN()
+ " Type:" + j.getRecordType().name()
+ " Key:" + j.getEntityKey()
+ " Local id:" + j.getId());
tx.commit();
} catch (Exception ex) {
log.error("error", ex);
@@ -129,70 +133,124 @@ protected void ProcessChangeRecord(ChangeRecord j) {
//TODO figure out what this statement means 7.5.3
/**
* In the absence of a communicationGraph element from the
* Replication Configuration Structure (although it's mandatory in the xsd), all nodes listed in the
* node element MAY send any and all messages to any other node
* of the registry.
* Replication Configuration Structure (although it's mandatory
* in the xsd), all nodes listed in the node element MAY send
* any and all messages to any other node of the registry.
*/
if (repcfg == null) {
log.debug("No replication configuration is defined!");
return;

}
List<String> destinationUrls = new ArrayList<String>();
Set<Object> destinationUrls = new HashSet<Object>();

for (Operator o:repcfg.getOperator())
{
//no need to tell myself about a change at myself
if (!o.getOperatorNodeID().equalsIgnoreCase(node))
destinationUrls.add(o.getSoapReplicationURL());
}
/*
Iterator<String> iterator = repcfg.getCommunicationGraph().getNode().iterator();
while (iterator.hasNext()) {
String next = iterator.next();
Node destinationNode = getNode(next);
if (destinationNode == null) {
log.warn(next + " node was not found, cannot deliver replication messages");
} else {
destinationUrls.add(destinationNode.getReplicationUrl());
if (repcfg.getCommunicationGraph() == null
|| repcfg.getCommunicationGraph().getEdge().isEmpty()) {
//no edges or graph defined, default to the operator list
for (Operator o : repcfg.getOperator()) {
//no need to tell myself about a change at myself
if (!o.getOperatorNodeID().equalsIgnoreCase(node)) {
destinationUrls.add(o.getSoapReplicationURL());
}
}
}
Iterator<CommunicationGraph.Edge> it = repcfg.getCommunicationGraph().getEdge().iterator();
} else {
//repcfg.getCommunicationGraph()
Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
while (iterator.hasNext()) {
Edge next = iterator.next();

while (it.hasNext()) {
//send each change set to the replication node in the graph
if (next.getMessageSender().equalsIgnoreCase(node)) {

CommunicationGraph.Edge next = it.next();
//next.getMessageReceiver(); //Node ID
Node destinationNode = getNode(next.getMessageReceiver());
if (destinationNode == null) {
log.warn(next.getMessageReceiver() + " node was not found, cannot deliver replication messages");
} else {
destinationUrls.add(destinationNode.getReplicationUrl());
//this is my server and i need to transmit the notification to
String messageReceiver = next.getMessageReceiver();
PrimaryAlternate container = new PrimaryAlternate();

for (int x = 0; x < repcfg.getOperator().size(); x++) {
if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(messageReceiver)) {
container.primaryUrl = repcfg.getOperator().get(x).getSoapReplicationURL();
}
}
for (int y = 0; y < next.getMessageReceiverAlternate().size(); y++) {
for (int x = 0; x < repcfg.getOperator().size(); x++) {
if (repcfg.getOperator().get(x).getOperatorNodeID().equalsIgnoreCase(next.getMessageReceiverAlternate().get(y))) {
container.alternateUrls.add(repcfg.getOperator().get(x).getSoapReplicationURL());
}
}
}
if (container.primaryUrl!=null)
destinationUrls.add(container);

}

}
}*/
for (String s : destinationUrls) {
//TODO the spec talks about control messages, should we even support it? seems pointless
UDDIReplicationPortType x = new UDDIService().getUDDIReplicationPort();
((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);

}

UDDIReplicationPortType x = uddiService.getUDDIReplicationPort();
if (destinationUrls.isEmpty())
log.fatal("Something is bizarre with the replication config. I should have had at least one node to notify, but I have none!");
for (Object s : destinationUrls) {

NotifyChangeRecordsAvailable req = new NotifyChangeRecordsAvailable();

req.setNotifyingNode(node);
HighWaterMarkVectorType highWaterMarkVectorType = new HighWaterMarkVectorType();

highWaterMarkVectorType.getHighWaterMark().add(new ChangeRecordIDType(node, j.getId()));
req.setChangesAvailable(highWaterMarkVectorType);


if (s instanceof String)
SendNotification(x,(String)s, req);
else if (s instanceof PrimaryAlternate)
{
//more complex directed graph stuff
PrimaryAlternate pa = (PrimaryAlternate)s;
if (!SendNotification(x, pa.primaryUrl, req))
{
for (String url : pa.alternateUrls){
if (SendNotification(x, url, req))
break;
//no need to continue to additional alternates
}
}
else
{
//primary url succeeded, no further action required
}

}

//TODO the spec talks about control messages, should we even support it? seems pointless


}
}

/**
* return true if successful
* @param x
* @param s
* @param req
* @return
*/
private boolean SendNotification(UDDIReplicationPortType x, String s, NotifyChangeRecordsAvailable req) {
((BindingProvider) x).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, s);
try {
x.notifyChangeRecordsAvailable(req);
log.debug("Successfully sent change record available message to " + s);
return true;
} catch (Exception ex) {
log.warn("Unable to send change notification to " + s );
log.warn("Unable to send change notification to " + s);
log.debug("Unable to send change notification to " + s, ex);
}
}
return false;
}

class PrimaryAlternate {

String primaryUrl=null;
List<String> alternateUrls=new ArrayList<String>();
}

public synchronized void run() {
@@ -201,13 +259,14 @@ public synchronized void run() {
queue = new ConcurrentLinkedQueue();
}
//TODO revisie this
if (!queue.isEmpty())
if (!queue.isEmpty()) {
log.info("Replication, Notifying nodes of new change records. " + queue.size() + " queued");
}

//TODO check for replication config changes
while (!queue.isEmpty()) {
//for each change at this node

ChangeRecord j = queue.poll();
ProcessChangeRecord(j);

@@ -250,6 +309,7 @@ public static org.uddi.repl_v3.ReplicationConfiguration FetchEdges() {
return null;
}

@Deprecated
private Node getNode(String messageSender) {
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = null;
@@ -138,6 +138,22 @@ public void validateSetReplicationNodes(ReplicationConfiguration replicationConf
throw new InvalidValueException(new ErrorMessage("errors.replication.contactNull"));
}

if (replicationConfiguration.getOperator() == null || replicationConfiguration.getOperator().isEmpty()) {
throw new InvalidValueException(new ErrorMessage("errors.replication.contactNull", "Operator is null or empty"));
}
for (int i = 0; i < replicationConfiguration.getOperator().size(); i++) {
if (replicationConfiguration.getOperator().get(i).getSoapReplicationURL() == null
|| "".equals(replicationConfiguration.getOperator().get(i).getSoapReplicationURL())) {
throw new InvalidValueException(new ErrorMessage("errors.replication.contactNull", "Replication URL is null or empty"));
}
if (!replicationConfiguration.getOperator().get(i).getSoapReplicationURL().toLowerCase().startsWith("http")) {
throw new InvalidValueException(new ErrorMessage("errors.replication.contactNull", "Replication URL is invalid, only HTTP is supported"));
}
if (replicationConfiguration.getOperator().get(i).getOperatorNodeID() == null
|| replicationConfiguration.getOperator().get(i).getOperatorNodeID().equalsIgnoreCase("")) {
throw new InvalidValueException(new ErrorMessage("errors.replication.contactNull", "Node ID is not defined"));
}
}
if (replicationConfiguration.getCommunicationGraph() != null) {
for (String s : replicationConfiguration.getCommunicationGraph().getNode()) {
if (!Contains(replicationConfiguration.getOperator(), s)) {
@@ -181,32 +197,35 @@ private boolean Contains(List<Operator> operator, String s) {
}

public void validateTransfer(EntityManager em, TransferCustody body) throws DispositionReportFaultMessage {

if (body == null)
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));
if (body.getTransferToken()==null)

if (body == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));
}
if (body.getTransferToken() == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));
if (body.getKeyBag()==null)
}
if (body.getKeyBag() == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));
if (body.getTransferOperationalInfo()==null)
}
if (body.getTransferOperationalInfo() == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));

if (body.getTransferOperationalInfo().getNodeID()==null)
}

if (body.getTransferOperationalInfo().getNodeID() == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));
if (body.getTransferOperationalInfo().getAuthorizedName()==null)
}
if (body.getTransferOperationalInfo().getAuthorizedName() == null) {
throw new TransferNotAllowedException(new ErrorMessage("errors.NullInput"));

}

//confirm i own the records in question
//confirm i issued the transfer token

TransferEntities x = new TransferEntities();
x.setKeyBag(body.getKeyBag());
x.setTransferToken(body.getTransferToken());
String transferTokenId = new String(body.getTransferToken().getOpaqueToken());
new ValidateCustodyTransfer(null).validateTransferLocalEntities(em, transferTokenId, body.getKeyBag().getKey() );


new ValidateCustodyTransfer(null).validateTransferLocalEntities(em, transferTokenId, body.getKeyBag().getKey());

}

}

0 comments on commit 31c1461

Please sign in to comment.