Skip to content

Commit

Permalink
0003253: ProcessInfo which is used to gather information about processes
Browse files Browse the repository at this point in the history
can be corrupted on push and pull because of threading in 3.8
  • Loading branch information
chenson42 committed Sep 20, 2017
1 parent 15d560b commit 437ee69
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 87 deletions.
Expand Up @@ -38,7 +38,7 @@ public class ChannelMap {

public static final String CHANNELS_IGNORE = "Ignored-Channels";

private String threadChannel;
private String channelQueue;

private Map<String, Set<String>> map;

Expand Down Expand Up @@ -92,12 +92,12 @@ public Set<String> getIgnoreChannels() {
return map.get(CHANNELS_IGNORE);
}

public String getThreadChannel() {
return threadChannel;
    /**
     * Returns the channel/queue identifier this map is scoped to, or
     * {@code null} if none was set.
     * (Renamed from {@code getThreadChannel()} in this commit; the value is
     * carried in the {@code threadChannel} HTTP header — see
     * {@code WebConstants.CHANNEL_QUEUE}.)
     */
    public String getChannelQueue() {
        return channelQueue;
    }

public void setThreadChannel(String threadChannel) {
this.threadChannel = threadChannel;
public void setChannelQueue(String threadChannel) {
this.channelQueue = threadChannel;
}


Expand Down
Expand Up @@ -28,7 +28,6 @@
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.util.AppUtils;

public class ProcessInfo implements Serializable, Comparable<ProcessInfo>, Cloneable {
Expand Down Expand Up @@ -316,21 +315,18 @@ public String toString() {
public String showInError(String identityNodeId) {
if (status == ProcessStatus.ERROR) {
switch (key.getProcessType()) {
case MANUAL_LOAD:
return null;
case PUSH_JOB:
return key.getTargetNodeId();
case PULL_JOB:
return key.getSourceNodeId();
case PUSH_HANDLER:
return key.getSourceNodeId();
case PULL_HANDLER:
case PUSH_JOB_EXTRACT:
case PUSH_JOB_TRANSFER:
case PULL_HANDLER_EXTRACT:
case PULL_HANDLER_TRANSFER:
return key.getTargetNodeId();
case PULL_JOB_LOAD:
case PULL_JOB_TRANSFER:
case PUSH_HANDLER_LOAD:
case PUSH_HANDLER_TRANSFER:
case ROUTER_JOB:
return key.getSourceNodeId();
case ROUTER_READER:
return key.getSourceNodeId();
case GAP_DETECT:
case GAP_DETECT:
return key.getSourceNodeId();
default:
return null;
Expand All @@ -340,6 +336,7 @@ public String showInError(String identityNodeId) {
}
}

@Override
public int compareTo(ProcessInfo o) {
if (status == ProcessStatus.ERROR && o.status != ProcessStatus.ERROR) {
return -1;
Expand Down
Expand Up @@ -26,74 +26,21 @@ public class ProcessInfoKey implements Serializable {

private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB, PULL_CONFIG_JOB;

public String toString() {
switch (this) {
case ANY:
return "<Any>";
case MANUAL_LOAD:
return "Manual Load";
case PUSH_JOB:
return "Database Push";
case PULL_JOB:
return "Database Pull";
case PULL_CONFIG_JOB:
return "Config Pull";
case PUSH_HANDLER:
return "Service Database Push";
case PULL_HANDLER:
return "Service Database Pull";
case OFFLINE_PUSH:
return "Offline Push";
case OFFLINE_PULL:
return "Offline Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
return "Routing Reader";
case GAP_DETECT:
return "Gap Detection";
case FILE_SYNC_PULL_JOB:
return "File Sync Pull";
case FILE_SYNC_PUSH_JOB:
return "File Sync Push";
case FILE_SYNC_PULL_HANDLER:
return "Service File Sync Pull";
case FILE_SYNC_PUSH_HANDLER:
return "Service File Sync Push";
case FILE_SYNC_TRACKER:
return "File Sync Tracker";
case REST_PULL_HANLDER:
return "REST Pull";
case INSERT_LOAD_EVENTS:
return "Inserting Load Events";
case INITIAL_LOAD_EXTRACT_JOB:
return "Initial Load Extractor";
case FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB:
return "File Sync Initial Load Extractor";
default:
return name();
}
}
};

private String sourceNodeId;

private String targetNodeId;

private ProcessType processType;

private String channelId;

    /**
     * Creates a key identifying a process by its source node, target node and
     * process type, with no channel association.
     *
     * @param sourceNodeId the node the data originates from
     * @param targetNodeId the node the data is destined for
     * @param processType  the kind of process being tracked
     */
    public ProcessInfoKey(String sourceNodeId, String targetNodeId, ProcessType processType) {
        this.sourceNodeId = sourceNodeId;
        this.targetNodeId = targetNodeId;
        this.processType = processType;
        this.channelId = null; // no channel for this variant; use the 4-arg constructor to set one
    }

public ProcessInfoKey(String sourceNodeId, String channelId, String targetNodeId, ProcessType processType) {
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
Expand All @@ -114,9 +61,9 @@ public ProcessType getProcessType() {
}

public String getChannelId() {
return channelId;
return channelId;
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down Expand Up @@ -159,8 +106,8 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,channelId=%s",
processType.toString(), sourceNodeId, targetNodeId, channelId);
return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,channelId=%s", processType.toString(), sourceNodeId,
targetNodeId, channelId);
}

}
@@ -0,0 +1,64 @@
package org.jumpmind.symmetric.model;

/**
 * The kinds of processes tracked by {@code ProcessInfo}/{@code ProcessInfoKey}.
 * This commit splits the former monolithic PUSH/PULL job and handler types into
 * per-stage variants (EXTRACT / TRANSFER / LOAD).
 *
 * Idiom fix: the 25-branch {@code switch (this)} in {@code toString()} is
 * replaced with a per-constant display-name field — the standard enum idiom.
 * Every constant was covered by the old switch, so the {@code default:
 * return name()} branch was unreachable and behavior is unchanged.
 * Constant order is preserved (ordinal compatibility).
 */
public enum ProcessType {

    ANY("<Any>"),
    PUSH_JOB_EXTRACT("Database Push Extract"),
    PUSH_JOB_TRANSFER("Database Push Transfer"),
    PULL_JOB_TRANSFER("Database Pull Transfer"),
    PULL_JOB_LOAD("Database Pull Load"),
    PUSH_HANDLER_TRANSFER("Service Database Push Transfer"),
    PUSH_HANDLER_LOAD("Service Database Push Load"),
    PULL_HANDLER_TRANSFER("Service Database Pull Transfer"),
    PULL_HANDLER_EXTRACT("Service Database Pull Extract"),
    // NOTE(review): constant name is misspelled ("HANLDER"), but renaming it
    // would break callers and any persisted name() values, so it is kept.
    REST_PULL_HANLDER("REST Pull"),
    OFFLINE_PUSH("Offline Push"),
    OFFLINE_PULL("Offline Pull"),
    ROUTER_JOB("Routing"),
    INSERT_LOAD_EVENTS("Inserting Load Events"),
    GAP_DETECT("Gap Detection"),
    ROUTER_READER("Routing Reader"),
    MANUAL_LOAD("Manual Load"),
    FILE_SYNC_PULL_JOB("File Sync Pull"),
    FILE_SYNC_PUSH_JOB("File Sync Push"),
    FILE_SYNC_PULL_HANDLER("Service File Sync Pull"),
    FILE_SYNC_PUSH_HANDLER("Service File Sync Push"),
    FILE_SYNC_TRACKER("File Sync Tracker"),
    INITIAL_LOAD_EXTRACT_JOB("Initial Load Extractor"),
    FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB("File Sync Initial Load Extractor"),
    PULL_CONFIG_JOB("Config Pull");

    /** Human-readable name returned by {@link #toString()}. */
    private final String displayName;

    ProcessType(String displayName) {
        this.displayName = displayName;
    }

    @Override
    public String toString() {
        return displayName;
    }
}
Expand Up @@ -270,7 +270,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
suspendIgnoreChannels.getSuspendChannelsAsString());
requestProperties.put(WebConstants.IGNORED_CHANNELS,
suspendIgnoreChannels.getIgnoreChannelsAsString());
requestProperties.put(WebConstants.THREAD_CHANNEL, status.getChannelId());
requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getChannelId());
transport = transportManager.getPullTransport(remote, local,
localSecurity.getNodePassword(), requestProperties,
parameterService.getRegistrationUrl());
Expand Down
Expand Up @@ -201,7 +201,7 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity
.getNodeId(), status.getChannelId(), remote.getNodeId(), ProcessType.PUSH_JOB));
Map<String, String> requestProperties = new HashMap<String, String>();
requestProperties.put(WebConstants.THREAD_CHANNEL, status.getChannelId());
requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getChannelId());

try {
transport = transportManager.getPushTransport(remote, identity,
Expand Down
Expand Up @@ -198,7 +198,7 @@ private HttpURLConnection requestReservation(String queue) {
connection.setConnectTimeout(httpTimeout);
connection.setReadTimeout(httpTimeout);
connection.setRequestMethod("HEAD");
connection.setRequestProperty(WebConstants.THREAD_CHANNEL, queue);
connection.setRequestProperty(WebConstants.CHANNEL_QUEUE, queue);

analyzeResponseCode(connection.getResponseCode());
} catch (IOException ex) {
Expand Down
Expand Up @@ -133,7 +133,7 @@ public class WebConstants {

public static final String BATCH_TO_SEND_COUNT = "Batch-To-Send-Count";

public static final String THREAD_CHANNEL = "threadChannel";
public static final String CHANNEL_QUEUE = "threadChannel";

public static final String CONFIG_VERSION = "configVersion";

Expand Down
Expand Up @@ -63,7 +63,7 @@ public boolean before(HttpServletRequest req, HttpServletResponse resp) throws I
String nodeId = getNodeId(req);
String method = req.getMethod();

String threadChannel = req.getHeader(WebConstants.THREAD_CHANNEL);
String threadChannel = req.getHeader(WebConstants.CHANNEL_QUEUE);

if (method.equals(WebConstants.METHOD_HEAD) &&
ServletUtils.normalizeRequestUri(req).contains("push")) {
Expand Down Expand Up @@ -136,7 +136,7 @@ public void after(HttpServletRequest req, HttpServletResponse resp) throws IOExc
ServletException {
String poolId = req.getRequestURI();
String nodeId = getNodeId(req);
String threadChannel = req.getHeader(WebConstants.THREAD_CHANNEL);
String threadChannel = req.getHeader(WebConstants.CHANNEL_QUEUE);
concurrentConnectionManager.releaseConnection(nodeId, threadChannel, poolId);
}

Expand Down
Expand Up @@ -94,7 +94,7 @@ public void handleWithCompression(HttpServletRequest req, HttpServletResponse re
ChannelMap map = new ChannelMap();
map.addSuspendChannels(req.getHeader(WebConstants.SUSPENDED_CHANNELS));
map.addIgnoreChannels(req.getHeader(WebConstants.IGNORED_CHANNELS));
map.setThreadChannel(req.getHeader(WebConstants.THREAD_CHANNEL));
map.setChannelQueue(req.getHeader(WebConstants.CHANNEL_QUEUE));

// pull out headers and pass to pull() method
pull(nodeId, req.getRemoteHost(), req.getRemoteAddr(), res.getOutputStream(), req.getHeader(WebConstants.HEADER_ACCEPT_CHARSET), res, map);
Expand Down Expand Up @@ -123,12 +123,12 @@ public void pull(String nodeId, String remoteHost, String remoteAddress,
IOutgoingTransport outgoingTransport = createOutgoingTransport(outputStream, encoding,
map);
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(
nodeService.findIdentityNodeId(), map.getThreadChannel(), nodeId, ProcessType.PULL_HANDLER));
nodeService.findIdentityNodeId(), map.getChannelQueue(), nodeId, ProcessType.PULL_HANDLER));

try {
Node targetNode = nodeService.findNode(nodeId, true);
List<OutgoingBatch> batchList = dataExtractorService.extract(processInfo, targetNode,
map.getThreadChannel(), outgoingTransport);
map.getChannelQueue(), outgoingTransport);
logDataReceivedFromPull(targetNode, batchList, processInfo, remoteHost);

if (processInfo.getStatus() != ProcessStatus.ERROR) {
Expand Down
Expand Up @@ -63,7 +63,7 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc
InputStream inputStream = createInputStream(req);
OutputStream outputStream = res.getOutputStream();

String threadChannel = req.getHeader(WebConstants.THREAD_CHANNEL);
String threadChannel = req.getHeader(WebConstants.CHANNEL_QUEUE);

push(nodeId, threadChannel, inputStream, outputStream);

Expand Down

0 comments on commit 437ee69

Please sign in to comment.