From bb07f5862cb92fcd527d97fe1f0ae0a6e3e46585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20W=C3=B6hrl?= Date: Mon, 28 Oct 2019 16:02:08 +0100 Subject: [PATCH] 0003546: Add an IClientReloadListener to add a hook on reloads on the client (#81) --- ...figurationChangedDatabaseWriterFilter.java | 71 +++++++++++++++---- .../symmetric/load/IClientReloadListener.java | 11 +++ 2 files changed, 70 insertions(+), 12 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java index 4c7636e59d..638e1f3bd5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java @@ -90,10 +90,13 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt final String CTX_KEY_REINITIALIZED = "Reinitialized." + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); - + final String CTX_KEY_FILE_SYNC_ENABLED = "FileSyncEnabled." + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); + final String CTX_KEY_INITIAL_LOAD_COMPLETED = "InitialLoadCompleted." + + ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode(); + private ISymmetricEngine engine; public ConfigurationChangedDatabaseWriterFilter(ISymmetricEngine engine) { @@ -113,8 +116,10 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { engine.start(); context.put(CTX_KEY_REINITIALIZED, Boolean.TRUE); } - } - + } + + checkReloadStarted(table, data); + return true; } @@ -132,7 +137,32 @@ public void afterWrite(DataContext context, Table table, CsvData data) { recordNodeFlushNeeded(context, table, data); recordFileSyncEnabled(context, table, data); } - + + private void checkReloadStarted(Table table, CsvData data) { + if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) { + if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) { + + Map newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); + String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED"); + String nodeId = newData.get("NODE_ID"); + + INodeService nodeService = engine.getNodeService(); + if(nodeId.equals(nodeService.findIdentityNodeId()) || nodeService.findIdentityNodeId() == null){ + boolean duringInitialLoad = nodeService.findIdentityNodeId() != null && nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled(); + if (!duringInitialLoad && "1".equals(initialLoadEnabled)) { + + log.info("Reload started"); + + List listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class); + for(IClientReloadListener listener : listeners){ + listener.reloadStarted(); + } + } + } + } + } + } + private void recordGroupletFlushNeeded(DataContext context, Table table) { if (isGroupletFlushNeeded(table)) { context.put(CTX_KEY_FLUSH_GROUPLETS_NEEDED, true); @@ -160,20 +190,25 @@ private void recordSyncNeeded(DataContext context, Table table, CsvData data) { tables.add(table); } - if (data.getDataEventType() == DataEventType.UPDATE - && !engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)) { + if (data.getDataEventType() == DataEventType.UPDATE) { if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) { Map newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED"); String initialLoadTime = newData.get("INITIAL_LOAD_TIME"); + + INodeService nodeService = engine.getNodeService(); + boolean duringInitialLoad = nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled(); String nodeId = newData.get("NODE_ID"); if (nodeId != null && nodeId.equals(context.getBatch().getTargetNodeId()) && - StringUtils.isNotBlank(initialLoadTime) && "0".equals(initialLoadEnabled)) { - log.info( - "Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed", - ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); - context.put(CTX_KEY_RESYNC_NEEDED, true); - engine.getRegistrationService().setAllowClientRegistration(false); + duringInitialLoad && StringUtils.isNotBlank(initialLoadTime) && "0".equals(initialLoadEnabled)) { + if(!engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)){ + log.info( + "Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed", + ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD); + context.put(CTX_KEY_RESYNC_NEEDED, true); + engine.getRegistrationService().setAllowClientRegistration(false); + } + context.put(CTX_KEY_INITIAL_LOAD_COMPLETED, true); } } } @@ -359,6 +394,18 @@ public void syncEnded(DataContext context, List batchesProcessed, context.remove(CTX_KEY_FILE_SYNC_ENABLED); } + + if(context.get(CTX_KEY_INITIAL_LOAD_COMPLETED) != null){ + + log.info("Reload completed"); + + List listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class); + for(IClientReloadListener listener : listeners){ + listener.reloadCompleted(); + } + + context.remove(CTX_KEY_INITIAL_LOAD_COMPLETED); + } } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java new file mode 100644 index 0000000000..f80e1f9113 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/IClientReloadListener.java @@ -0,0 +1,11 @@ +package org.jumpmind.symmetric.load; + +import org.jumpmind.extension.IExtensionPoint; + +public interface IClientReloadListener extends IExtensionPoint { + + void reloadStarted(); + + void reloadCompleted(); + +}