Skip to content

Commit

Permalink
0001139: Expose information about the processes that are currently ru…
Browse files Browse the repository at this point in the history
…nning via an api to be used to inspect what is going on in an engine
  • Loading branch information
chenson42 committed Mar 26, 2013
1 parent d966793 commit b580c93
Show file tree
Hide file tree
Showing 13 changed files with 514 additions and 42 deletions.
Expand Up @@ -294,7 +294,7 @@ protected void init() {
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
this.pushService = new PushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, transportManager, nodeService,
clusterService, nodeCommunicationService);
clusterService, nodeCommunicationService, statisticManager);
this.pullService = new PullService(parameterService, symmetricDialect, nodeService,
dataLoaderService, registrationService, clusterService, nodeCommunicationService);
this.jobManager = createJobManager();
Expand Down
@@ -0,0 +1,189 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.util.Date;

import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;

public class ProcessInfo implements Serializable, Comparable<ProcessInfo> {

private static final long serialVersionUID = 1L;

public static enum Status {
NEW, EXTRACTING, LOADING, TRANSFERRING, DONE, ERROR
};

private ProcessInfoKey key;

private Status status = Status.NEW;

private long dataCount;

private long batchCount;

private long currentBatchId;

private String currentChannelId;

private String currentTableName;

private transient Thread thread;

private Date startTime = new Date();

private Date lastStatusChangeTime = new Date();

private Date endTime;

public ProcessInfo() {
this(new ProcessInfoKey("", "", ProcessInfoKey.ProcessType.TEST));
}

public ProcessInfo(ProcessInfoKey key) {
this.key = key;
thread = Thread.currentThread();
}

public String getSourceNodeId() {
return this.key.getSourceNodeId();
}

public String getTargetNodeId() {
return this.key.getTargetNodeId();
}

public ProcessType getProcessType() {
return this.key.getProcessType();
}

public ProcessInfoKey getKey() {
return key;
}

public void setKey(ProcessInfoKey key) {
this.key = key;
}

public Status getStatus() {
return status;
}

public void setStatus(Status status) {
this.status = status;
this.lastStatusChangeTime = new Date();
}

public long getDataCount() {
return dataCount;
}

public void setDataCount(long dataCount) {
this.dataCount = dataCount;
}

public long getBatchCount() {
return batchCount;
}

public void setBatchCount(long batchCount) {
this.batchCount = batchCount;
}

public void incrementDataCount() {
this.dataCount++;
}

public void incrementBatchCount() {
this.batchCount++;
}

public long getCurrentBatchId() {
return currentBatchId;
}

public void setCurrentBatchId(long currentBatchId) {
this.currentBatchId = currentBatchId;
}

public String getCurrentChannelId() {
return currentChannelId;
}

public void setCurrentChannelId(String currentChannelId) {
this.currentChannelId = currentChannelId;
}

public Thread getThread() {
return thread;
}

public void setThread(Thread thread) {
this.thread = thread;
}

public Date getStartTime() {
return startTime;
}

public Date getEndTime() {
return endTime;
}

public void setEndTime(Date endTime) {
this.endTime = endTime;
}

public void setCurrentTableName(String currentTableName) {
this.currentTableName = currentTableName;
}

public String getCurrentTableName() {
return currentTableName;
}

public Date getLastStatusChangeTime() {
return lastStatusChangeTime;
}

@Override
public String toString() {
return String.format("%s,status=%s,startTime=%s", key.toString(), status.toString(),
startTime.toString());
}

public int compareTo(ProcessInfo o) {
if (status == Status.ERROR && o.status == Status.DONE) {
return -1;
} else if (status == Status.DONE && o.status == Status.ERROR) {
return 1;
} else if ((status != Status.DONE && status != Status.ERROR)
&& (o.status == Status.DONE || o.status == Status.ERROR)) {
return -1;
} else if ((o.status != Status.DONE && o.status != Status.ERROR)
&& (status == Status.DONE || status == Status.ERROR)) {
return 1;
} else {
return startTime.compareTo(o.startTime);
}
}
}
@@ -0,0 +1,88 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.model;

import java.util.Map;

import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.util.Statistics;

public class ProcessInfoDataWriter implements IDataWriter {

private IDataWriter targetWriter;

private ProcessInfo processInfo;

public ProcessInfoDataWriter(IDataWriter targetWriter, ProcessInfo processInfo) {
this.targetWriter = targetWriter;
this.processInfo = processInfo;
}

public void open(DataContext context) {
targetWriter.open(context);
processInfo.setDataCount(0);
processInfo.setBatchCount(0);
}

public void close() {
targetWriter.close();
}

public Map<Batch, Statistics> getStatistics() {
return targetWriter.getStatistics();
}

public void start(Batch batch) {
if (batch != null) {
processInfo.setCurrentBatchId(batch.getBatchId());
processInfo.setCurrentChannelId(batch.getChannelId());
processInfo.incrementBatchCount();
}
targetWriter.start(batch);
}

public boolean start(Table table) {
if (table != null) {
processInfo.setCurrentTableName(table.getFullyQualifiedTableName());
}
return targetWriter.start(table);
}

public void write(CsvData data) {
if (data != null) {
processInfo.incrementDataCount();
}
targetWriter.write(data);
}

public void end(Table table) {
targetWriter.end(table);
}

public void end(Batch batch, boolean inError) {
targetWriter.end(batch, inError);
}

}
@@ -0,0 +1,96 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.model;

import java.io.Serializable;

public class ProcessInfoKey implements Serializable {

private static final long serialVersionUID = 1L;

public enum ProcessType {
MANUAL_LOAD, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, ROUTER_READER, OUTGOING_PURGE_JOB, INCOMING_PURGE_JOB, TEST
};

private String sourceNodeId;

private String targetNodeId;

private ProcessType processType;

public ProcessInfoKey(String sourceNodeId, String targetNodeId, ProcessType processType) {
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.processType = processType;
}

public String getSourceNodeId() {
return sourceNodeId;
}

public String getTargetNodeId() {
return targetNodeId;
}

public ProcessType getProcessType() {
return processType;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((processType == null) ? 0 : processType.hashCode());
result = prime * result + ((sourceNodeId == null) ? 0 : sourceNodeId.hashCode());
result = prime * result + ((targetNodeId == null) ? 0 : targetNodeId.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ProcessInfoKey other = (ProcessInfoKey) obj;
if (processType != other.processType)
return false;
if (sourceNodeId == null) {
if (other.sourceNodeId != null)
return false;
} else if (!sourceNodeId.equals(other.sourceNodeId))
return false;
if (targetNodeId == null) {
if (other.targetNodeId != null)
return false;
} else if (!targetNodeId.equals(other.targetNodeId))
return false;
return true;
}

@Override
public String toString() {
return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s",processType.toString(), sourceNodeId, targetNodeId);
}

}

0 comments on commit b580c93

Please sign in to comment.