Skip to content

Commit

Permalink
0003546: Add an IClientReloadListener to add a hook on reloads on the…
Browse files Browse the repository at this point in the history
… client (#81)
  • Loading branch information
woehrl01 authored and erilong committed Oct 28, 2019
1 parent 70c947c commit bb07f58
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 12 deletions.
Expand Up @@ -90,10 +90,13 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt

final String CTX_KEY_REINITIALIZED = "Reinitialized."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_FILE_SYNC_ENABLED = "FileSyncEnabled."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_INITIAL_LOAD_COMPLETED = "InitialLoadCompleted."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

private ISymmetricEngine engine;

public ConfigurationChangedDatabaseWriterFilter(ISymmetricEngine engine) {
Expand All @@ -113,8 +116,10 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
engine.start();
context.put(CTX_KEY_REINITIALIZED, Boolean.TRUE);
}
}

}

checkReloadStarted(table, data);

return true;
}

Expand All @@ -132,7 +137,32 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordNodeFlushNeeded(context, table, data);
recordFileSyncEnabled(context, table, data);
}


private void checkReloadStarted(Table table, CsvData data) {
if (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE) {
if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) {

Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED");
String nodeId = newData.get("NODE_ID");

INodeService nodeService = engine.getNodeService();
if(nodeId.equals(nodeService.findIdentityNodeId()) || nodeService.findIdentityNodeId() == null){
boolean duringInitialLoad = nodeService.findIdentityNodeId() != null && nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled();
if (!duringInitialLoad && "1".equals(initialLoadEnabled)) {

log.info("Reload started");

List<IClientReloadListener> listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class);
for(IClientReloadListener listener : listeners){
listener.reloadStarted();
}
}
}
}
}
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
if (isGroupletFlushNeeded(table)) {
context.put(CTX_KEY_FLUSH_GROUPLETS_NEEDED, true);
Expand Down Expand Up @@ -160,20 +190,25 @@ private void recordSyncNeeded(DataContext context, Table table, CsvData data) {
tables.add(table);
}

if (data.getDataEventType() == DataEventType.UPDATE
&& !engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)) {
if (data.getDataEventType() == DataEventType.UPDATE) {
if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) {
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
String initialLoadEnabled = newData.get("INITIAL_LOAD_ENABLED");
String initialLoadTime = newData.get("INITIAL_LOAD_TIME");

INodeService nodeService = engine.getNodeService();
boolean duringInitialLoad = nodeService.findNodeSecurity(nodeService.findIdentityNodeId(), true).isInitialLoadEnabled();
String nodeId = newData.get("NODE_ID");
if (nodeId != null && nodeId.equals(context.getBatch().getTargetNodeId()) &&
StringUtils.isNotBlank(initialLoadTime) && "0".equals(initialLoadEnabled)) {
log.info(
"Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed",
ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD);
context.put(CTX_KEY_RESYNC_NEEDED, true);
engine.getRegistrationService().setAllowClientRegistration(false);
duringInitialLoad && StringUtils.isNotBlank(initialLoadTime) && "0".equals(initialLoadEnabled)) {
if(!engine.getParameterService().is(ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD)){
log.info(
"Requesting syncTriggers because {} is false and sym_node_security changed to indicate that an initial load has completed",
ParameterConstants.TRIGGER_CREATE_BEFORE_INITIAL_LOAD);
context.put(CTX_KEY_RESYNC_NEEDED, true);
engine.getRegistrationService().setAllowClientRegistration(false);
}
context.put(CTX_KEY_INITIAL_LOAD_COMPLETED, true);
}
}
}
Expand Down Expand Up @@ -359,6 +394,18 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,

context.remove(CTX_KEY_FILE_SYNC_ENABLED);
}

if(context.get(CTX_KEY_INITIAL_LOAD_COMPLETED) != null){

log.info("Reload completed");

List<IClientReloadListener> listeners = engine.getExtensionService().getExtensionPointList(IClientReloadListener.class);
for(IClientReloadListener listener : listeners){
listener.reloadCompleted();
}

context.remove(CTX_KEY_INITIAL_LOAD_COMPLETED);
}
}

@Override
Expand Down
@@ -0,0 +1,11 @@
package org.jumpmind.symmetric.load;

import org.jumpmind.extension.IExtensionPoint;

public interface IClientReloadListener extends IExtensionPoint {

void reloadStarted();

void reloadCompleted();

}

0 comments on commit bb07f58

Please sign in to comment.