diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java index dde7e486ef..ee257226ab 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java @@ -160,7 +160,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table if (useTriggerTemplateForColumnTemplatesDuringInitialLoad(column)) { ColumnString columnString = fillOutColumnTemplate(tableAlias, tableAlias, "", table, column, DataEventType.INSERT, false, channel, - triggerRouter.getTrigger()); + triggerRouter.getTrigger(), true); columnExpression = columnString.columnString; if (isNotBlank(textColumnExpression) && TypeMap.isTextType(column.getMappedTypeCode())) { @@ -777,7 +777,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli Column column = columns[i]; if (column != null) { ColumnString columnString = fillOutColumnTemplate(origTableAlias, tableAlias, - columnPrefix, table, column, dml, isOld, channel, trigger); + columnPrefix, table, column, dml, isOld, channel, trigger, false); columnsText = columnsText + "\n " + columnString.columnString + lastCommandToken; containsLob |= columnString.isBlobClob; @@ -792,7 +792,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli protected ColumnString fillOutColumnTemplate(String origTableAlias, String tableAlias, String columnPrefix, Table table, Column column, DataEventType dml, boolean isOld, Channel channel, - Trigger trigger) { + Trigger trigger, boolean ignoreStreamLobs) { boolean isLob = symmetricDialect.getPlatform().isLob(column.getMappedTypeCode()); String templateToUse = null; if (column.getJdbcTypeName() != null @@ -925,7 +925,7 @@ protected ColumnString fillOutColumnTemplate(String origTableAlias, String table } if (dml == DataEventType.DELETE && isLob && requiresEmptyLobTemplateForDeletes()) { templateToUse = emptyColumnTemplate; - } else if (isLob && trigger.isUseStreamLobs()) { + } else if (isLob && trigger.isUseStreamLobs() && !ignoreStreamLobs) { templateToUse = emptyColumnTemplate; } if (templateToUse != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java index f7e7ae5e8f..db1b20decc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java @@ -35,7 +35,7 @@ public class AbstractBatch implements Serializable { public enum Status { OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), RS( - "Resend"), XX("Unknown"); + "Resend"), XX("Unknown"), LS("LoadSetup"); private String description; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java index b4700b9be9..7a2fb42c76 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java @@ -27,7 +27,7 @@ public class ExtractRequest implements Serializable { private static final long serialVersionUID = 1L; public enum ExtractStatus { - NE, OK + NE, OK, LS }; private long requestId; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationTypeEmail.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationTypeEmail.java index adafbfd72b..324c0b2f74 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationTypeEmail.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationTypeEmail.java @@ -149,7 +149,7 @@ protected static String getLogDetails(MonitorEvent event) throws IOException { protected static List deserializeOfflineNodes(MonitorEvent event) throws IOException { List nodes = null; if (event.getDetails() != null) { - new Gson().fromJson(event.getDetails(), new TypeToken>() { + nodes = new Gson().fromJson(event.getDetails(), new TypeToken>() { }.getType()); } if (nodes == null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index c06fdf515a..64da4e245b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -86,4 +86,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String public int cancelExtractRequests(long loadId); public void releaseMissedExtractRequests(); + + public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId, + String fromStatus, String toStatus); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 922a21d65c..ab26454903 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -75,6 +75,15 @@ public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long sta public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId); + public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long maxBatchId, String fromStatus, String toStatus); + + public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus); + + public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long minBatchId, String fromStatus, String toStatus); + public void updateCommonBatchExtractStatistics(OutgoingBatch batch); public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 3689e722e7..d8881aa0ad 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1725,7 +1725,7 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ); } transaction.prepareAndExecute(getSql("insertExtractRequestSql"), - new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.NE.name(), startBatchId, endBatchId, + new Object[] { requestId, engine.getNodeId(), nodeId, queue, ExtractStatus.LS.name(), startBatchId, endBatchId, triggerRouter.getTrigger().getTriggerId(), triggerRouter.getRouter().getRouterId(), loadId, table, rows, parentRequestId, new Date(), new Date() }, new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, @@ -1735,7 +1735,7 @@ table, rows, parentRequestId, new Date(), new Date() }, request.setRequestId(requestId); request.setNodeId(nodeId); request.setQueue(queue); - request.setStatus(ExtractStatus.NE); + request.setStatus(ExtractStatus.LS); request.setStartBatchId(startBatchId); request.setEndBatchId(endBatchId); request.setRouterId(triggerRouter.getRouterId()); @@ -2063,6 +2063,13 @@ public void removeBatchFromStaging(OutgoingBatch batch) { } } + @Override + public void updateExtractRequestStatuses(ISqlTransaction transaction, long loadId, String sourceNodeId, + String fromStatus, String toStatus) { + transaction.prepareAndExecute(getSql("updateExtractRequestStatuses"), + toStatus, new Date(), loadId, sourceNodeId, fromStatus); + } + static class FutureExtractStatus { boolean shouldExtractSkip; int batchExtractCount; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java index ade0e579f9..f2a2faa5e3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java @@ -80,6 +80,9 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform, putSql("selectIncompleteTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is null and node_id = ? order by request_id"); putSql("selectCompletedTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and node_id = ? order by request_id"); + + putSql("updateExtractRequestStatuses", "update $(extract_request) set status=?, last_update_time=? " + + "where load_id=? and source_node_id=? and status=?"); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 6792138231..f5e832f902 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -75,6 +75,7 @@ import org.jumpmind.symmetric.load.IReloadListener; import org.jumpmind.symmetric.load.IReloadVariableFilter; import org.jumpmind.symmetric.model.AbstractBatch.Status; +import org.jumpmind.symmetric.model.ExtractRequest.ExtractStatus; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.DataEvent; @@ -601,9 +602,9 @@ public void createTableReloadStatus(ISqlTransaction transaction, long loadId, bo Object[] argsDelete = new Object[] { loadId }; String sqlDelete = getSql("deleteTableReloadStatus"); int[] typesDelete = new int[] { Types.NUMERIC }; - Object[] args = new Object[] { loadId, targetNodeId, sourceNodeId, isFullLoad ? 1 : 0, now, now }; + Object[] args = new Object[] { loadId, targetNodeId, sourceNodeId, isFullLoad ? 1 : 0, now, now, -1, -1, -1 }; String sql = getSql("insertTableReloadStatus"); - int[] types = new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP }; + int[] types = new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC }; if (transaction == null) { try { transaction = sqlTemplate.startSqlTransaction(); @@ -1068,6 +1069,7 @@ public Map insertReloadEvents(Node targetNode, boolean } log.info("Table reload request(s) for load id " + loadId + " have been processed."); } + update_outgoing_batch_and_extract_request_for_processing(transaction, targetNode.getNodeId(), loadId); checkInterrupted(); transaction.commit(); } catch (Error ex) { @@ -1112,6 +1114,50 @@ public Map insertReloadEvents(Node targetNode, boolean return extractRequests; } + private void update_outgoing_batch_and_extract_request_for_processing( + ISqlTransaction transaction, String targetNodeId, long loadId) { + TableReloadStatus status = getTableReloadStatusByLoadId(loadId); + if (status != null) { + long startDataBatchId = status.getStartDataBatchId(); + long endDataBatchId = status.getEndDataBatchId(); + String reloadUpdateStatus = Status.NE.name(); + if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) { + reloadUpdateStatus = Status.RQ.name(); + } + update_setup_batches(transaction, targetNodeId, loadId, Status.LS.name(), Status.NE.name(), startDataBatchId); + update_load_batches(transaction, targetNodeId, loadId, Status.LS.name(), reloadUpdateStatus, + startDataBatchId, endDataBatchId); + update_finalize_batches(transaction, targetNodeId, loadId, Status.LS.name(), Status.NE.name(), + endDataBatchId); + update_extract_requests(transaction, loadId, engine.getNodeId(), ExtractStatus.LS.name(), + ExtractStatus.NE.name()); + } + } + + private void update_setup_batches(ISqlTransaction transaction, String targetNodeId, long loadId, + String fromStatus, String toStatus, long maxBatchId) { + engine.getOutgoingBatchService().updateOutgoingSetupBatchStatusByStatus(transaction, targetNodeId, loadId, + maxBatchId, fromStatus, toStatus); + } + + private void update_load_batches(ISqlTransaction transaction, String targetNodeId, long loadId, + String fromStatus, String toStatus, long startDataBatchId, long endDataBatchId) { + engine.getOutgoingBatchService().updateOutgoingLoadBatchStatusByStatus(transaction, targetNodeId, loadId, + startDataBatchId, endDataBatchId, fromStatus, toStatus); + } + + private void update_finalize_batches(ISqlTransaction transaction, String targetNodeId, long loadId, + String fromStatus, String toStatus, long minBatchId) { + engine.getOutgoingBatchService().updateOutgoingFinalizeBatchStatusByStatus(transaction, targetNodeId, + loadId, minBatchId, fromStatus, toStatus); + } + + private void update_extract_requests(ISqlTransaction transaction, long loadId, String sourceNodeId, + String fromStatus, String toStatus) { + engine.getDataExtractorService().updateExtractRequestStatuses(transaction, loadId, sourceNodeId, + fromStatus, toStatus); + } + private long getBatchCountFor(Map extractRequests) { long batchCount = 0; for (ExtractRequest request : extractRequests.values()) { @@ -1226,19 +1272,22 @@ private int insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, l insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true, loadId, createBy, channelId); batchCount++; - long curBatchId = engine.getSequenceService().currVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH); - /* - * Mark incoming batches as OK at the target node because we marked outgoing batches as OK at the source - */ - insertSqlEvent( - transaction, - targetNode, - String.format( - "update %s_incoming_batch set status='OK', error_flag=0 where node_id='%s' and status != 'OK' " - + "and batch_id < " + curBatchId, - tablePrefix, engine.getNodeService().findIdentityNodeId()), true, - loadId, createBy); - batchCount++; + TableReloadRequest t = reloadRequests.get(ParameterConstants.ALL + ParameterConstants.ALL); + if (t != null && StringUtils.isBlank(t.getReloadSelect())) { + long curBatchId = engine.getSequenceService().currVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH); + /* + * Mark incoming batches as OK at the target node because we marked outgoing batches as OK at the source + */ + insertSqlEvent( + transaction, + targetNode, + String.format( + "update %s_incoming_batch set status='OK', error_flag=0 where node_id='%s' and status != 'OK' " + + "and batch_id < " + curBatchId, + tablePrefix, engine.getNodeService().findIdentityNodeId()), true, + loadId, createBy, Status.LS); + batchCount++; + } } if (isFullLoad) { String beforeSql = parameterService.getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_BEFORE_SQL @@ -1248,7 +1297,7 @@ private int insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, l transaction, targetNode, beforeSql, true, - loadId, createBy); + loadId, createBy, Status.LS); batchCount++; } } @@ -1294,7 +1343,7 @@ private int insertCreateBatchesForReload(Node targetNode, long loadId, String cr && engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getTrigger().getReloadChannelId(), true, - loadId, createBy, false, false, false); + loadId, createBy, false, false, false, Status.LS); createEventsSent++; if (!transactional) { transaction.commit(); @@ -1317,7 +1366,7 @@ private int insertCreateBatchesForReload(Node targetNode, long loadId, String cr && engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true, - loadId, createBy, false, false, false); + loadId, createBy, false, false, false, Status.LS); createEventsSent++; if (!transactional) { transaction.commit(); @@ -1356,7 +1405,8 @@ private int insertDeleteBatchesForReload(Node targetNode, long loadId, String cr && engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, - true, currentRequest.getBeforeCustomSql(), loadId, createBy); + true, currentRequest.getBeforeCustomSql(), loadId, createBy, + Status.LS); deleteEventsSent++; if (!transactional) { transaction.commit(); @@ -1386,7 +1436,7 @@ private int insertDeleteBatchesForReload(Node targetNode, long loadId, String cr .getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL)) || !StringUtils .isEmpty(triggerRouter.getInitialLoadDeleteStmt()))) { insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, - true, null, loadId, createBy); + true, null, loadId, createBy, Status.LS); deleteEventsSent++; if (!transactional) { transaction.commit(); @@ -1429,7 +1479,7 @@ private int insertSQLBatchesForReload(Node targetNode, long loadId, String creat for (String sql : sqlStatements) { insertSqlEvent(transaction, triggerHistory, triggerRouter.getTrigger().getChannelId(), targetNode, sql, - true, loadId, createBy); + true, loadId, createBy, Status.LS); sqlEventsSent++; } if (!transactional) { @@ -1515,7 +1565,7 @@ private Map insertLoadBatchesForReload(Node targetNode, loadId, createBy, reloadChannel, rowCount, channel.getMaxBatchSize(), numberOfBatches); } else { startBatchId = insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, - selectSql, true, loadId, createBy, Status.NE, null, -1); + selectSql, true, loadId, createBy, Status.LS, null, -1); } long endBatchId = startBatchId + numberOfBatches - 1; firstBatchId = firstBatchId == 0 ? startBatchId : firstBatchId; @@ -1684,6 +1734,13 @@ private TriggerHistory lookupTriggerHistory(Trigger trigger) { protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad, String overrideDeleteStatement, long loadId, String createBy) { + insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, isLoad, + overrideDeleteStatement, loadId, createBy, Status.NE); + } + + protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, + TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad, + String overrideDeleteStatement, long loadId, String createBy, Status outgoingBatchStatus) { Node sourceNode = engine.getNodeService().findIdentity(); List transforms = this.engine.getTransformService().findTransformsFor( sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerRouter.getTargetTable(triggerHistory)); @@ -1691,7 +1748,7 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, List sqlStatements = resolveTargetTables(overrideDeleteStatement, triggerRouter, triggerHistory, targetNode); for (String sql : sqlStatements) { createPurgeEvent(transaction, sql, targetNode, sourceNode, - triggerRouter, triggerHistory, isLoad, loadId, createBy); + triggerRouter, triggerHistory, isLoad, loadId, createBy, outgoingBatchStatus); } } else if (transforms != null && transforms.size() > 0) { List sqlStatements = symmetricDialect.createPurgeSqlForMultipleTables(targetNode, triggerRouter, @@ -1700,13 +1757,13 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, createPurgeEvent(transaction, sql, targetNode, sourceNode, - triggerRouter, triggerHistory, isLoad, loadId, createBy); + triggerRouter, triggerHistory, isLoad, loadId, createBy, outgoingBatchStatus); } } else { createPurgeEvent(transaction, symmetricDialect.createPurgeSqlFor(targetNode, triggerRouter, triggerHistory, transforms), targetNode, sourceNode, - triggerRouter, triggerHistory, isLoad, loadId, createBy); + triggerRouter, triggerHistory, isLoad, loadId, createBy, outgoingBatchStatus); } } @@ -1745,6 +1802,13 @@ public List resolveTargetTables(String sql, TriggerRouter triggerRouter, protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node targetNode, Node sourceNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad, long loadId, String createBy) { + createPurgeEvent(transaction, sql, targetNode, sourceNode, triggerRouter, triggerHistory, + isLoad, loadId, createBy, Status.NE); + } + + protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node targetNode, Node sourceNode, + TriggerRouter triggerRouter, TriggerHistory triggerHistory, boolean isLoad, + long loadId, String createBy, Status outgoingBatchStatus) { sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql); sql = FormatUtils.replace("externalId", targetNode.getExternalId(), sql); sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql); @@ -1766,7 +1830,7 @@ protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node ta data.setNodeList(targetNode.getNodeId()); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - isLoad, loadId, createBy, Status.NE, null, -1); + isLoad, loadId, createBy, outgoingBatchStatus, null, -1); } else { insertData(transaction, data); } @@ -1794,15 +1858,27 @@ public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loa public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { + insertSqlEvent(transaction, targetNode, sql, isLoad, loadId, createBy, Status.NE); + } + + public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String sql, + boolean isLoad, long loadId, String createBy, Status outgoingBatchStatus) { TriggerHistory history = engine.getTriggerRouterService() .findTriggerHistoryForGenericSync(); insertSqlEvent(transaction, history, Constants.CHANNEL_CONFIG, targetNode, sql, isLoad, - loadId, createBy); + loadId, createBy, outgoingBatchStatus); } public void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history, String channelId, Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { + insertSqlEvent(transaction, history, channelId, targetNode, sql, isLoad, loadId, + createBy, Status.NE); + } + + public void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history, + String channelId, Node targetNode, String sql, boolean isLoad, long loadId, + String createBy, Status outgoingBatchStatus) { Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), false); String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine @@ -1813,7 +1889,7 @@ public void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history, data.setNodeList(targetNode.getNodeId()); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - isLoad, loadId, createBy, Status.NE, null, -1); + isLoad, loadId, createBy, outgoingBatchStatus, null, -1); } else { insertData(transaction, data); } @@ -1938,6 +2014,13 @@ protected void insertCreateEvent(ISqlTransaction transaction, Node targetNode, public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String channelId, boolean isLoad, long loadId, String createBy, boolean excludeIndices, boolean excludeForeignKeys, boolean excludeDefaults) { + insertCreateEvent(transaction, targetNode, triggerHistory, channelId, isLoad, loadId, + createBy, excludeIndices, excludeForeignKeys, excludeDefaults, Status.NE); + } + + public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, + TriggerHistory triggerHistory, String channelId, boolean isLoad, long loadId, String createBy, + boolean excludeIndices, boolean excludeForeignKeys, boolean excludeDefaults, Status outgoingBatchStatus) { Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.CREATE, null, null, triggerHistory, channelId, null, null); data.setNodeList(targetNode.getNodeId()); @@ -1945,7 +2028,7 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, try { if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - isLoad, loadId, createBy, Status.NE, null, -1); + isLoad, loadId, createBy, outgoingBatchStatus, null, -1); } else { insertData(transaction, data); } @@ -2180,7 +2263,7 @@ private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeId " t.node_id = '" + nodeIdRecord + "'"); if (data != null) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNodeId, - isLoad, loadId, createBy, Status.NE, channelId, -1); + isLoad, loadId, createBy, Status.LS, channelId, -1); } else { throw new SymmetricException(String.format("Unable to issue an update for %s_node_security. " + " Check the %s_trigger_hist for %s_node_security.", tablePrefix, tablePrefix, tablePrefix)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 22e7091a5e..2221dd2a73 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -120,32 +120,33 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace putSql("updateTableReloadStatusTableCount", "update $(table_reload_status) set table_count = ?, last_update_time = ? where load_id = ?"); putSql("updateTableReloadStatusDataCounts", "update $(table_reload_status) set " + " start_data_batch_id = ?, end_data_batch_id = ?, " - + " data_batch_count = data_batch_count + ?, " + + " data_batch_count = case when data_batch_count = -1 then 0 else data_batch_count end + ?, " + " rows_count = rows_count + ?, " + " last_update_time = ? " + " where load_id = ?"); putSql("updateTableReloadStatusDataCountsNoParamsInSelect", "update $(table_reload_status) set " + " start_data_batch_id = ?, end_data_batch_id = ?, " - + " data_batch_count = data_batch_count + $(batchCount), " + + " data_batch_count = case when data_batch_count = -1 then 0 else data_batch_count end + $(batchCount), " + " rows_count = rows_count + $(rowCount), " + " last_update_time = ? " + " where load_id = ?"); putSql("insertTableReloadStatus", - "insert into $(table_reload_status) (load_id, target_node_id, source_node_id, full_load, start_time, last_update_time) values (?, ?, ?, ?, ?, ?)"); + "insert into $(table_reload_status) (load_id, target_node_id, source_node_id, full_load, start_time, last_update_time, data_batch_count, setup_batch_count, finalize_batch_count) " + + "values (?, ?, ?, ?, ?, ?, ?, ?, ?)"); putSql("deleteTableReloadStatus", "delete from $(table_reload_status) where load_id = ?"); putSql("updateTableReloadStatusSetupCount", "update $(table_reload_status) set " + " setup_batch_count = ?, last_update_time = ? " + " where load_id = ?"); putSql("updateTableReloadStatusDataLoaded", "update $(table_reload_status) " + " set completed = case when (" - + " data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and " - + " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and " - + " finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) " + + " (data_batch_count > -1 and data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end)) and " + + " (setup_batch_count > -1 and setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end)) and " + + " (finalize_batch_count > -1 and finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end))) " + " then 1 else 0 end, " + " end_time = case when (" - + " data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and " - + " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and " - + " finalize_batch_loaded <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) " + + " (data_batch_count > -1 and data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end)) and " + + " (setup_batch_count > -1 and setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end)) and " + + " (finalize_batch_count > -1 and finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end))) " + " then ? else end_time end, " + " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, " + " setup_batch_loaded = case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end, " @@ -158,14 +159,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " where load_id = ? and completed = 0"); putSql("updateTableReloadStatusDataLoadedNoParams", "update $(table_reload_status) " + " set completed = case when (" - + " data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end) and " - + " setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end) and " - + " finalize_batch_count <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end)) " + + " (data_batch_count > -1 and data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end)) and " + + " (setup_batch_count > -1 and setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end)) and " + + " (finalize_batch_count > -1 and finalize_batch_count <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end))) " + " then 1 else 0 end, " + " end_time = case when (" - + " data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end) and " - + " setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end) and " - + " finalize_batch_loaded <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end)) " + + " (data_batch_count > -1 and data_batch_count <= (case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end)) and " + + " (setup_batch_count > -1 and setup_batch_count <= (case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end)) and " + + " (finalize_batch_count > -1 and finalize_batch_loaded <= (case when $(batchId) > end_data_batch_id then finalize_batch_loaded + $(batchCount) else finalize_batch_loaded end))) " + " then current_timestamp else end_time end, " + " data_batch_loaded = case when $(batchId) between start_data_batch_id and end_data_batch_id then data_batch_loaded + $(batchCount) else data_batch_loaded end, " + " setup_batch_loaded = case when $(batchId) < start_data_batch_id then setup_batch_loaded + $(batchCount) else setup_batch_loaded end, " diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 022e479af1..6c223f10ec 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -191,7 +191,10 @@ protected void trackChanges(ProcessInfo processInfo, boolean useCrc) { if (fileTriggerRouter.isEnabled()) { try { FileTrigger fileTrigger = fileTriggerRouter.getFileTrigger(); - checkSourceDir(fileTriggerRouter); + boolean sourceDirReachable = checkSourceDir(fileTriggerRouter); + if (!sourceDirReachable) { + continue; + } boolean ignoreFiles = shouldIgnoreInitialFiles(fileTriggerRouter, fileTrigger, ctxDate); FileTriggerTracker tracker = new FileTriggerTracker(fileTriggerRouter, getDirectorySnapshot(fileTriggerRouter), processInfo, useCrc, engine); @@ -224,7 +227,10 @@ protected void trackChangesFastScan(ProcessInfo processInfo, boolean useCrc) { for (final FileTriggerRouter fileTriggerRouter : fileTriggerRouters) { if (fileTriggerRouter.isEnabled()) { FileTrigger fileTrigger = fileTriggerRouter.getFileTrigger(); - checkSourceDir(fileTriggerRouter); + boolean sourceDirReachable = checkSourceDir(fileTriggerRouter); + if (!sourceDirReachable) { + continue; + } boolean ignoreFiles = shouldIgnoreInitialFiles(fileTriggerRouter, fileTrigger, ctxDate); FileAlterationObserver observer = new FileAlterationObserver(fileTriggerRouter.getFileTrigger().getBaseDir(), fileTriggerRouter.getFileTrigger().createIOFileFilter()); @@ -248,12 +254,16 @@ public DirectorySnapshot getLastDirectorySnapshot(String relativeDir) { } } - protected void checkSourceDir(FileTriggerRouter fileTriggerRouter) { + protected boolean checkSourceDir(FileTriggerRouter fileTriggerRouter) { File sourceDir = new File(fileTriggerRouter.getFileTrigger().getBaseDir()); if (!sourceDir.exists()) { log.warn("Source directory does not exist: {}", sourceDir.getAbsolutePath()); + return false; } else if (!sourceDir.canRead()) { log.warn("Source directory is not readable by user {}: {}", System.getProperty("user.name"), sourceDir.getAbsolutePath()); + return false; + } else { + return true; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 0dbcf21e45..9bb09c1f19 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -36,7 +36,6 @@ import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; import org.jumpmind.db.sql.mapper.LongMapper; -import org.jumpmind.db.sql.mapper.RowMapper; import org.jumpmind.db.sql.mapper.StringMapper; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; @@ -277,6 +276,42 @@ public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() }); } + public void updateOutgoingSetupBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long maxBatchId, String fromStatus, String toStatus) { + // update $(outgoing_batch) + // set status=?, last_update_time=?, last_update_hostname=? + // where node_id=? and load_id=? and status=? and batch_id < ? + transaction.prepareAndExecute(getSql("updateOutgoingSetupBatchStatusByStatus"), + new Object[] { toStatus, new Date(), clusterService.getServerId(), + targetNodeId, loadId, fromStatus, maxBatchId }, + new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR, + Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC }); + } + + public void updateOutgoingLoadBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long startDataBatchId, long endDataBatchId, String fromStatus, String toStatus) { + // update $(outgoing_batch) + // set status=?, last_update_time=?, last_update_hostname=? + // where node_id=? and load_id=? and status=? and batch_id between ? and ? + transaction.prepareAndExecute(getSql("updateOutgoingLoadBatchStatusByStatus"), + new Object[] { toStatus, new Date(), clusterService.getServerId(), + targetNodeId, loadId, fromStatus, startDataBatchId, endDataBatchId }, + new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR, + Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC, Types.NUMERIC }); + } + + public void updateOutgoingFinalizeBatchStatusByStatus(ISqlTransaction transaction, String targetNodeId, long loadId, + long minBatchId, String fromStatus, String toStatus) { + // update $(outgoing_batch) + // set status=?, last_update_time=?, last_update_hostname=? + // where node_id=? and load_id=? and status=? and batch_id > ? + transaction.prepareAndExecute(getSql("updateOutgoingFinalizeBatchStatusByStatus"), + new Object[] { toStatus, new Date(), clusterService.getServerId(), + targetNodeId, loadId, fromStatus, minBatchId }, + new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR, + Types.VARCHAR, Types.NUMERIC, Types.CHAR, Types.NUMERIC }); + } + public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { ISqlTransaction transaction = null; try { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 9cdd095dd0..6601dbb05c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -169,5 +169,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " last_update_hostname, ?, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) "); putSql("getAllBatchesSql", "select batch_id from $(outgoing_batch)"); putSql("whereInProgressStatusSql", "where status in (?, ?, ?, ?, ?) "); + putSql("updateOutgoingSetupBatchStatusByStatus", + "update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id < ?"); + putSql("updateOutgoingLoadBatchStatusByStatus", + "update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id between ? and ?"); + putSql("updateOutgoingFinalizeBatchStatusByStatus", + "update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? where node_id=? and load_id=? and status=? and batch_id > ?"); } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/SelfSignedX509TrustManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/SelfSignedX509TrustManager.java index d156f8925d..fb48dfddb8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/SelfSignedX509TrustManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/SelfSignedX509TrustManager.java @@ -80,11 +80,22 @@ public void checkServerTrusted(X509Certificate[] certificates, String authType) log.debug("X509Certificate[" + i + "]=" + certificates[i]); } } - if ((certificates != null) && (certificates.length == 1)) { - certificates[0].checkValidity(); - } else { - standardTrustManager.checkServerTrusted(certificates, authType); + if (certificates != null) { + if (certificates.length == 1 && certificates[0] != null) { + certificates[0].checkValidity(); + return; + } else if (certificates.length > 1 && certificates[0] != null) { + boolean certificatesAreEqual = true; + for (int i = 1; i < certificates.length; i++) { + certificatesAreEqual &= certificates[0].equals(certificates[i]); + } + if (certificatesAreEqual) { + certificates[0].checkValidity(); + return; + } + } } + standardTrustManager.checkServerTrusted(certificates, authType); } /**