diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java index 4c7636e59d..a206092d13 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java @@ -90,10 +90,13 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt final String CTX_KEY_REINITIALIZED = "Reinitialized." + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); - + final String CTX_KEY_FILE_SYNC_ENABLED = "FileSyncEnabled." + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); + final String CTX_KEY_INITIAL_LOAD_COMPLETED = "InitialLoadCompleted." + + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); + private ISymmetricEngine engine; public ConfigurationChangedDatabaseWriterFilter(ISymmetricEngine engine) { @@ -113,8 +116,10 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { engine.start(); context.put(CTX_KEY_REINITIALIZED, Boolean.TRUE); } - } - + } + + checkReloadStarted(table, data); + return true; } @@ -132,7 +137,33 @@ public void afterWrite(DataContext context, Table table, CsvData data) { recordNodeFlushNeeded(context, table, data); recordFileSyncEnabled(context, table, data); } - + + private void checkReloadStarted(Table table, CsvData data) { + if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) { + if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) { + + Map newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); + String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED"); + String nodeId = newData.get("NODE_ID"); + + INodeService nodeService = engine.getNodeService(); + if (nodeId.equals(nodeService.findIdentityNodeId()) || nodeService.findIdentityNodeId() == null) { + boolean duringInitialLoad = nodeService.findIdentityNodeId() != null + && nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled(); + if (!duringInitialLoad && "1".equals(initialLoadEnabled)) { + + log.info("Reload started"); + + List listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class); + for (IClientReloadListener listener : listeners) { + listener.reloadStarted(); + } + } + } + } + } + } + private void recordGroupletFlushNeeded(DataContext context, Table table) { if (isGroupletFlushNeeded(table)) { context.put(CTX_KEY_FLUSH_GROUPLETS_NEEDED, true); @@ -160,20 +191,25 @@ private void recordSyncNeeded(DataContext context, Table table, CsvData data) { tables.add(table); } - if (data.getDataEventType() == DataEventType.UPDATE - && !engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)) { + if (data.getDataEventType() == DataEventType.UPDATE) { if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) { Map newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED"); String initialLoadTime = newData.get("INITIAL_LOAD_TIME"); + + INodeService nodeService = engine.getNodeService(); + boolean duringInitialLoad = nodeService.findIdentityNodeId() != null + && nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled(); String nodeId = newData.get("NODE_ID"); - if (nodeId != null && nodeId.equals(context.getBatch().getTargetNodeId()) && - StringUtils.isNotBlank(initialLoadTime) && "0".equals(initialLoadEnabled)) { - log.info( - "Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed", - ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); - context.put(CTX_KEY_RESYNC_NEEDED, true); - engine.getRegistrationService().setAllowClientRegistration(false); + if (nodeId != null && nodeId.equals(context.getBatch().getTargetNodeId()) && duringInitialLoad && StringUtils.isNotBlank(initialLoadTime) + && "0".equals(initialLoadEnabled)) { + if (!engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)) { + log.info("Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed", + ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); + context.put(CTX_KEY_RESYNC_NEEDED, true); + engine.getRegistrationService().setAllowClientRegistration(false); + } + context.put(CTX_KEY_INITIAL_LOAD_COMPLETED, true); } } } @@ -359,6 +395,18 @@ public void syncEnded(DataContext context, List batchesProcessed, context.remove(CTX_KEY_FILE_SYNC_ENABLED); } + + if (context.get(CTX_KEY_INITIAL_LOAD_COMPLETED) != null) { + + log.info("Reload completed"); + + List listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class); + for (IClientReloadListener listener : listeners) { + listener.reloadCompleted(); + } + + context.remove(CTX_KEY_INITIAL_LOAD_COMPLETED); + } } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java new file mode 100644 index 0000000000..f80e1f9113 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java @@ -0,0 +1,11 @@ +package org.jumpmind.symmetric.load; + +import org.jumpmind.extension.IExtensionPoint; + +public interface IClientReloadListener extends IExtensionPoint { + + void reloadStarted(); + + void reloadCompleted(); + +}