Skip to content

Commit

Permalink
0002108: Add processinfo to track when symmetricds is "inserting load…
Browse files Browse the repository at this point in the history
… events" on the route job thread
  • Loading branch information
chenson42 committed Dec 19, 2014
1 parent caf836e commit dec7247
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;
PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, INITIAL_LOAD_EXTRACT_JOB;

public String toString() {
switch (this) {
Expand Down Expand Up @@ -57,6 +57,8 @@ public String toString() {
return "Service File Sync Push";
case REST_PULL_HANLDER:
return "REST Pull";
case INSERT_LOAD_EVENTS:
return "Inserting Load Events";
case INITIAL_LOAD_EXTRACT_JOB:
return "Initial Load Extractor";
default:
Expand Down
Expand Up @@ -156,11 +156,14 @@ public synchronized void stop() {
*/
synchronized public long routeData(boolean force) {
long dataCount = -1l;
if (engine.getNodeService().findIdentity() != null) {
Node identity = engine.getNodeService().findIdentity();
if (identity != null) {
if (force || engine.getClusterService().lock(ClusterConstants.ROUTE)) {
try {
engine.getOutgoingBatchService().updateAbandonedRoutingBatches();

insertInitialLoadEvents();

long ts = System.currentTimeMillis();
DataGapDetector gapDetector = new DataGapDetector(
engine.getDataService(), parameterService, symmetricDialect,
Expand Down Expand Up @@ -188,7 +191,13 @@ synchronized public long routeData(boolean force) {
* triggers is running.
*/
protected void insertInitialLoadEvents() {

ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), null, ProcessType.INSERT_LOAD_EVENTS));
processInfo.setStatus(ProcessInfo.Status.PROCESSING);

try {

INodeService nodeService = engine.getNodeService();
Node identity = nodeService.findIdentity();
if (identity != null) {
Expand Down Expand Up @@ -256,7 +265,9 @@ protected void insertInitialLoadEvents() {
}
}

processInfo.setStatus(ProcessInfo.Status.OK);
} catch (Exception ex) {
processInfo.setStatus(ProcessInfo.Status.ERROR);
log.error("", ex);
}

Expand Down

0 comments on commit dec7247

Please sign in to comment.