Skip to content
Permalink
Browse files
JUDDI-241 adding a kick starter function to start the replication pro…
…cess on server start up. updating documentation for the new config settings
  • Loading branch information
spyhunter99 committed Jan 31, 2015
1 parent ac44bfd commit a07e1ccdba46d3e2cf20ed45cafd4daf3507a689
Showing 5 changed files with 111 additions and 28 deletions.
@@ -118,9 +118,9 @@ RMI Proxy properties that can be referenced in the _juddiv3.xml_ file and is onl
|Property Name |Description |Required |Default Value or [Example Value]
|_juddi/subscription/expiration/days_|Days before a subscription expires|N|[_30_]
|_juddi/subscription/chunkexpiration/minutes_| Minutes before a "chunked" subscription call expires|N|[_5_]
|_juddi/notification/interval_|Specifies the interval at which the notification timer triggers. This is the upper boundary set by the registry. Between the user defined endDate of a Subscription and this value, the registry will pick the earliest date.|N|_3000000_
|_juddi/notification/start/buffer_|Specifies the amount of time to wait before the notification timer initially fires|N|20000
|_juddi/notification/acceptableLagtime_|Specifies the amount of time (in ms) from which to determine if the server is overload and to skip notifications. Notifications during this cycle will not be repeated (i.e. never be delivered)|N|10000
|_juddi/notification/interval_|Specifies the interval at which the notification timer triggers. This is the upper boundary set by the registry. Between the user defined endDate of a Subscription and this value, the registry will pick the earliest date. (in ms)|N|_3000000_
|_juddi/notification/start/buffer_|Specifies the amount of time to wait before the notification timer initially fires. (in ms)|N|20000
|_juddi/notification/acceptableLagtime_|Specifies the amount of time (in ms) from which to determine if the server is overload and to skip notifications. Notifications during this cycle will not be repeated (i.e. never be delivered). (in ms)|N|10000
|_juddi/notification/maxTries_|Specifies the number of times to attempt the delivery of messages to subscribers.|N|3
|_juddi/notification/maxTriesResetInterval_|Once the maximum delivery attempts have been made, the server will add that endpoint to an ignore list, which is reset every N ms.|N|600000
|_juddi/notification/sendAuthTokenWithResultList_|Sends a valid authentication token for the owning user of the subscription in the subscription notification result message. Unless it is specifically needed, this is recommended to be set to false.|N|false
@@ -177,6 +177,19 @@ Perofrmance properties are referenced in the _juddiv3.xm_ file.
|_juddi/performance/enableFindBusinessTModelBagFiltering| UDDI defines a mechansim to filter findBusiness relates based on tModelInstanceInfo within their service's binding templates. This is an expensive operation and will cause significant performance degredation on larger registries. For spec complliance, it should be set to true. We suspect it's not a commonly used feature and recommend setting this to false. |N| true
|===========================================================================================

=== Replication

.These properties are used to tweak the replication service capabilities.

Perofrmance properties are referenced in the _juddiv3.xm_ file.
[options="header"]
|===========================================================================================
|Property Name |Description |Required |Default Value or [Example Value]
|_juddi/replication/getChangeRecordsMax| The maximum number of records to return from a getChangeRecord request |N| 100
|_juddi.replication.start.buffer | Specifies the amount of time to wait before the replication timer initially fires. (in ms) | N | 5000
|_juddi.replication.interval | Specifies the interval at which the replication timer triggers (in ms). | N | 5000
|===========================================================================================


=== Deploying two or more jUDDI server on the same application server

@@ -30,6 +30,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jws.WebParam;
import javax.jws.WebResult;
@@ -72,6 +73,7 @@
import org.uddi.repl_v3.ChangeRecordAcknowledgement;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.ChangeRecords;
import org.uddi.repl_v3.CommunicationGraph.Edge;
import org.uddi.repl_v3.DoPing;
import org.uddi.repl_v3.GetChangeRecords;
import org.uddi.repl_v3.HighWaterMarkVectorType;
@@ -180,24 +182,20 @@ private static Set<String> diffNodeList(Set<String> oldnodes, Set<String> newNod
private UDDIServiceCounter serviceCounter;

private static PullTimerTask timer = null;
private long startBuffer = 5000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
private long interval = 5000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
private long startBuffer;
private long interval;

private static UDDIPublicationImpl pub = null;

public UDDIReplicationImpl() {
public UDDIReplicationImpl() throws ConfigurationException {
super();
this.interval = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_INTERVAL, 5000L);
this.startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_REPLICATION_START_BUFFER, 5000L);
if (pub == null) {
pub = new UDDIPublicationImpl();
}
serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
Init();
try {
startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
} catch (ConfigurationException ex) {
logger.fatal(ex);
}

}

@@ -234,9 +232,14 @@ public PullTimerTask() {
timer = new Timer(true);
timer.scheduleAtFixedRate(this, startBuffer, interval);
}
boolean firstrun = true;

@Override
public void run() {
if (firstrun) {
enqueueAllReceivingNodes();
firstrun = false;
}

if (!queue.isEmpty()) {
logger.info("Replication change puller thread started. Queue size: " + queue.size());
@@ -258,12 +261,11 @@ public void run() {
// logger.info("Node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()
// + " USN " + poll.getChangesAvailable().getHighWaterMark().get(xx).getOriginatingUSN());
//}
Set<String> nodesHitThisCycle=new HashSet<String>();
Set<String> nodesHitThisCycle = new HashSet<String>();
for (int xx = 0; xx < poll.getChangesAvailable().getHighWaterMark().size(); xx++) {
int recordsreturned = 21;
while (recordsreturned >= 20) {
if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID()))
{
if (nodesHitThisCycle.contains(poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID())) {
logger.info("i've already hit the node " + poll.getChangesAvailable().getHighWaterMark().get(xx).getNodeID() + " this cycle, skipping");
break;
}
@@ -763,11 +765,11 @@ private void PersistChangeRecord(ChangeRecord rec) {

}
if (rec.getChangeRecordCorrection() != null) {
//TODO
//TODO implement

}
if (rec.getChangeRecordConditionFailed() != null) {
//TODO
//TODO implement

}
tx.commit();
@@ -840,6 +842,55 @@ private HighWaterMarkVectorType getLastChangeRecordFrom(String sourcenode) {
return ret;
}

private void enqueueAllReceivingNodes() {
if (queue == null) {
queue = new ConcurrentLinkedQueue<NotifyChangeRecordsAvailable>();
}
//get the replication config
//get everyone we are expecting to receive data from, then enqueue them for pulling
ReplicationConfiguration repcfg = ReplicationNotifier.FetchEdges();
if (repcfg == null) {
return;
}
Set<String> allnodes = new HashSet<String>();
for (int i = 0; i < repcfg.getOperator().size(); i++) {
allnodes.add(repcfg.getOperator().get(i).getOperatorNodeID());
}
Set<String> receivers = new HashSet<String>();
if (repcfg.getCommunicationGraph() == null
|| repcfg.getCommunicationGraph().getEdge().isEmpty()) {
//no edges or graph defined, default to the operator list
for (org.uddi.repl_v3.Operator o : repcfg.getOperator()) {
//no need to tell myself about a change at myself
if (!o.getOperatorNodeID().equalsIgnoreCase(node)) {
receivers.add(o.getOperatorNodeID());
}
}
} else {
//repcfg.getCommunicationGraph()
Iterator<Edge> iterator = repcfg.getCommunicationGraph().getEdge().iterator();
while (iterator.hasNext()) {
Edge next = iterator.next();

if (next.getMessageReceiver().equalsIgnoreCase(node)) {
receivers.add(next.getMessageSender());
}

}

}
for (String s : receivers) {
//this is a list of nodes that this node is expecting updates from
//here are we ticking the notification engine to ping the remove service for updates
for (String nodeping : allnodes) {
queue.add(new NotifyChangeRecordsAvailable(s, getLastChangeRecordFrom(nodeping)));
//for each node we are expecting data from, go fetch it, along the way, we'll request all data for all nodes
//that we know about
}

}
}

}

/**
@@ -991,11 +1042,11 @@ public org.uddi.repl_v3.ChangeRecords getChangeRecords(
* recent change record that has been successfully processed by
* the invocating node
*/
int maxrecords = 100; //TODO config this
if (responseLimitCount != null) {
maxrecords = responseLimitCount.intValue();
}
try {
int maxrecords = AppConfig.getConfiguration().getInt(Property.JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX, 100);
if (responseLimitCount != null) {
maxrecords = responseLimitCount.intValue();
}
tx.begin();
Long firstrecord = 0L;
Long lastrecord = null;
@@ -227,7 +227,7 @@ public interface Property {
*
* @since 3.3
*/
public static String JUDDI_ENABLE_FIND_BUSINESS_TMODEL_BAG_FILTERING = "juddi.preformance.enableFindBusinessTModelBagFiltering";
public static final String JUDDI_ENABLE_FIND_BUSINESS_TMODEL_BAG_FILTERING = "juddi.preformance.enableFindBusinessTModelBagFiltering";
/**
* When set to true, juddi with reject publish requests when at least
* one digitally signed entity cannot be cryptographically validated
@@ -237,9 +237,24 @@ public interface Property {
*
* @since 3.3
*/
public static String JUDDI_REJECT_ENTITIES_WITH_INVALID_SIG_PREFIX = "juddi.validation.rejectInvalidSignatures.";
public static final String JUDDI_REJECT_ENTITIES_WITH_INVALID_SIG_PREFIX = "juddi.validation.rejectInvalidSignatures.";
/**
* @since 3.3.
*/
public static String JUDDI_REJECT_ENTITIES_WITH_INVALID_SIG_ENABLE = "juddi.validation.rejectInvalidSignatures.enable";

/**
* @since 3.3
*/
public static final String JUDDI_REPLICATION_GET_CHANGE_RECORDS_MAX="juddi.replication.getChangeRecordsMax";

/**
* @since 3.3
*/
public static final String JUDDI_REPLICATION_START_BUFFER = "juddi.replication.start.buffer";

/**
* @since 3.3
*/
public static final String JUDDI_REPLICATION_INTERVAL="juddi.replication.interval";
}
@@ -1397,9 +1397,9 @@ public static org.apache.juddi.model.ChangeRecord mapChangeRecord(ChangeRecord r
//r.setId(rec.getChangeID().getOriginatingUSN());
r.setOriginatingUSN(rec.getChangeID().getOriginatingUSN());
if (r.getOriginatingUSN()==null){
logger.warn("strange, the getOriginatingUSN is null!!");
JAXB.marshal(rec, System.out);
Thread.dumpStack();
// logger.warn("strange, the getOriginatingUSN is null!!");
// JAXB.marshal(rec, System.out);
// Thread.dumpStack();
}
r.setNodeID(rec.getChangeID().getNodeID());
if (rec.getChangeRecordNewData() != null) {
@@ -56,6 +56,11 @@
@XmlRootElement(name = "notify_changeRecordsAvailable")
public class NotifyChangeRecordsAvailable implements Serializable{
public NotifyChangeRecordsAvailable(){}
/**
*
* @param node notifying node
* @param changesAvailable latest record available
*/
public NotifyChangeRecordsAvailable(String node, HighWaterMarkVectorType changesAvailable){
this.notifyingNode = node;
this.changesAvailable = changesAvailable;
@@ -115,5 +120,4 @@ public void setChangesAvailable(HighWaterMarkVectorType value) {
this.changesAvailable = value;
}

}

}

0 comments on commit a07e1cc

Please sign in to comment.