Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.11' into 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 22, 2020
2 parents 52d93e6 + 21ce112 commit adef033
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 42 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
Expand Down Expand Up @@ -103,8 +104,11 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch

public String sendSQL(String nodeId, String sql);

public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests);
public Map<Integer, ExtractRequest> insertReloadEvents(
Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests,
ProcessInfo processInfo, List<TriggerRouter> triggerRouters,
Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator);

public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

Expand Down
Expand Up @@ -1028,13 +1028,13 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe

if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
} else if (!isPreviouslyExtracted(currentBatch, false)) {
} else if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, false)) {
BatchLock lock = null;
try {
log.debug("{} attempting to acquire lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
lock = acquireLock(currentBatch, useStagingDataWriter);
log.debug("{} acquired lock for batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
if (!isPreviouslyExtracted(currentBatch, true)) {
if (currentBatch.getStatus() == Status.RQ || !isPreviouslyExtracted(currentBatch, true)) {
log.debug("{} extracting batch {}", targetNode.getNodeId(), currentBatch.getBatchId());
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);

Expand Down
Expand Up @@ -71,6 +71,7 @@
import org.jumpmind.symmetric.io.data.transform.TransformPoint;
import org.jumpmind.symmetric.job.OracleNoOrderHeartbeat;
import org.jumpmind.symmetric.job.PushHeartbeatListener;
import org.jumpmind.symmetric.load.IReloadGenerator;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
Expand Down Expand Up @@ -813,13 +814,22 @@ private String getReloadChannelIdForTrigger(Trigger trigger, Map<String, Channel

@Override
public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests) {
List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests,
IReloadGenerator reloadGenerator)
{
if (engine.getClusterService().lock(ClusterConstants.SYNC_TRIGGERS)) {
try {
INodeService nodeService = engine.getNodeService();
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();

synchronized (triggerRouterService) {

List<TriggerHistory> activeHistories = null;
if (reloadGenerator == null) {
activeHistories = triggerRouterService.getActiveTriggerHistories();
} else {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

boolean isFullLoad = reloadRequests == null
|| (reloadRequests.size() == 1 && reloadRequests.get(0).isFullLoadRequest());
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
Expand Down Expand Up @@ -236,12 +235,8 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup = new HashMap<String, List<TriggerRouter>>();
Map<Integer, ExtractRequest> extractRequests = null;
List<TriggerHistory> activeHistories = null;

IReloadGenerator reloadGenerator = extensionService.getExtensionPoint(IReloadGenerator.class);
if (reloadGenerator == null) {
activeHistories = engine.getTriggerRouterService().getActiveTriggerHistories();
}

for (TableReloadRequest load : loadsToProcess) {
Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId(), true);
Expand All @@ -251,12 +246,9 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
fullLoad.add(load);
List<TriggerRouter> triggerRouters = getTriggerRoutersForNodeGroup(triggerRoutersByNodeGroup, targetNode.getNodeGroupId());

if (reloadGenerator != null) {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo, activeHistories,
triggerRouters, extractRequests);
extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo,
triggerRouters, extractRequests, reloadGenerator);

loadCountToProcess--;
if (++activeLoadCount >= maxLoadCount) {
Expand Down Expand Up @@ -302,12 +294,9 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
targetNode.getNodeGroupId());
triggerRoutersByTargetNodeGroupId.put(targetNode.getNodeGroupId(), triggerRouters);
}
if (reloadGenerator != null) {
activeHistories = reloadGenerator.getActiveTriggerHistories(targetNode);
}

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo, activeHistories,
triggerRouters, extractRequests);
extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo,
triggerRouters, extractRequests, reloadGenerator);

loadCountToProcess--;
if (++activeLoadCount >= maxLoadCount) {
Expand Down Expand Up @@ -353,24 +342,6 @@ protected List<NodeSecurity> findNodesThatAreReadyForInitialLoad() {
return toReturn;
}

protected void sendReverseInitialLoad(ProcessInfo processInfo, List<TriggerHistory> activeHistories,
Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup) {
INodeService nodeService = engine.getNodeService();
boolean queuedLoad = false;
List<Node> nodes = new ArrayList<Node>();
nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.P));
nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.W));
for (Node node : nodes) {
List<TriggerRouter> triggerRouters = getTriggerRoutersForNodeGroup(triggerRoutersByNodeGroup, node.getNodeGroupId());
engine.getDataService().insertReloadEvents(node, true, null, processInfo, activeHistories, triggerRouters, null);
queuedLoad = true;
}

if (!queuedLoad) {
log.info("{} was enabled but no nodes were linked to load", ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED);
}
}

protected boolean isValidLoadTarget(String targetNodeId) {
boolean result = false;
NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(targetNodeId);
Expand Down
Expand Up @@ -45,7 +45,6 @@
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.SqlTemplateSettings;

/*
* Reads a database model from a PostgreSql database.
Expand Down Expand Up @@ -123,7 +122,10 @@ protected Integer mapUnknownJdbcTypeForColumn(Map<String, Object> values) {
return Types.BLOB;
} else if (type != null && (type == Types.STRUCT || type == Types.OTHER)) {
return Types.LONGVARCHAR;
} else {
} else if (typeName != null && typeName.equalsIgnoreCase("BIT")) {
return Types.VARCHAR;
}
else {
return super.mapUnknownJdbcTypeForColumn(values);
}
}
Expand Down
Expand Up @@ -75,9 +75,11 @@ public <T> T getObjectFromResultSet(ResultSet rs, Class<T> clazz) throws SQLExce
String s = rs.getString(1);
Date d = null;

d = FormatUtils.parseDate(s,FormatUtils.TIMESTAMP_PATTERNS);
if (s != null) {
d = FormatUtils.parseDate(s, FormatUtils.TIMESTAMP_PATTERNS);
}

if (Timestamp.class.isAssignableFrom(clazz)) {
if (d != null && Timestamp.class.isAssignableFrom(clazz)) {
return (T) new Timestamp(d.getTime());
} else {
return (T) d;
Expand Down

0 comments on commit adef033

Please sign in to comment.