Skip to content

Commit

Permalink
0002668: Use node security cache
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 3, 2016
1 parent 2a08830 commit 38bd32a
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 135 deletions.
Expand Up @@ -451,7 +451,7 @@ protected void autoConfigRegistrationServer() {
nodeService.insertNodeIdentity(nodeId);
node = nodeService.findIdentity();
nodeService.insertNodeGroup(node.getNodeGroupId(), null);
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeId, true);
NodeSecurity nodeSecurity = nodeService.findOrCreateNodeSecurity(nodeId);
nodeSecurity.setInitialLoadTime(new Date());
nodeSecurity.setRegistrationTime(new Date());
nodeSecurity.setInitialLoadEnabled(false);
Expand Down
Expand Up @@ -78,6 +78,9 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt
final String CTX_KEY_FLUSH_CONFLICTS_NEEDED = "FlushConflicts."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_FLUSH_NODE_SECURITY_NEEDED = "FlushNodeSecurity."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

final String CTX_KEY_RESTART_JOBMANAGER_NEEDED = "RestartJobManager."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

Expand Down Expand Up @@ -118,6 +121,7 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordParametersFlushNeeded(context, table);
recordJobManagerRestartNeeded(context, table, data);
recordConflictFlushNeeded(context, table);
recordNodeSecurityFlushNeeded(context, table);
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
Expand Down Expand Up @@ -192,6 +196,12 @@ private void recordTransformFlushNeeded(DataContext context, Table table) {
}
}

private void recordNodeSecurityFlushNeeded(DataContext context, Table table) {
if (matchesTable(table, TableConstants.SYM_NODE_SECURITY)) {
context.put(CTX_KEY_FLUSH_NODE_SECURITY_NEEDED, true);
}
}

private boolean isSyncTriggersNeeded(DataContext context, Table table) {
boolean autoSync = engine.getParameterService().is(ParameterConstants.AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED) ||
context.getBatch().getBatchId() == Constants.VIRTUAL_BATCH_FOR_REGISTRATION;
Expand Down Expand Up @@ -332,7 +342,13 @@ public void batchCommitted(DataContext context) {
parameterService.rereadParameters();
context.remove(CTX_KEY_FLUSH_PARAMETERS_NEEDED);
}


if (context.get(CTX_KEY_FLUSH_NODE_SECURITY_NEEDED) != null) {
log.info("About to refresh the cache of node security because new configuration came through the data loader");
nodeService.flushNodeAuthorizedCache();
context.remove(CTX_KEY_FLUSH_NODE_SECURITY_NEEDED);
}

if (context.get(CTX_KEY_RESYNC_TABLE_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -73,8 +73,10 @@ public interface INodeService {

public NodeSecurity findNodeSecurity(String nodeId);

public NodeSecurity findNodeSecurity(String nodeId, boolean createIfNotFound);

public NodeSecurity findNodeSecurity(String nodeId, boolean useCache);

public NodeSecurity findOrCreateNodeSecurity(String nodeId);

public void deleteNodeHost(String nodeId);

public void deleteNodeSecurity(String nodeId);
Expand Down
Expand Up @@ -254,8 +254,9 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
local = new Node(this.parameterService, symmetricDialect);
}
try {
NodeSecurity localSecurity = nodeService.findNodeSecurity(local.getNodeId());
NodeSecurity localSecurity = nodeService.findNodeSecurity(local.getNodeId(), true);
IIncomingTransport transport = null;
boolean isRegisterTransport = false;
if (remote != null && localSecurity != null) {
Map<String, String> requestProperties = new HashMap<String, String>();
ChannelMap suspendIgnoreChannels = configurationService
Expand All @@ -274,6 +275,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
log.info("Using registration URL of {}", transport.getUrl());
remote = new Node();
remote.setSyncUrl(parameterService.getRegistrationUrl());
isRegisterTransport = true;
}

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(remote
Expand All @@ -285,7 +287,7 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
status.updateIncomingStatus(list);
local = nodeService.findIdentity();
if (local != null) {
localSecurity = nodeService.findNodeSecurity(local.getNodeId());
localSecurity = nodeService.findNodeSecurity(local.getNodeId(), !isRegisterTransport);
if (StringUtils.isNotBlank(transport.getRedirectionUrl())) {
/*
* We were redirected for the pull, we need to
Expand Down
Expand Up @@ -629,7 +629,7 @@ public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
ProcessInfoKey.ProcessType.FILE_SYNC_PUSH_HANDLER));
try {
List<IncomingBatch> list = processZip(in, nodeId, processInfo);
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId(), true);
processInfo.setStatus(ProcessInfo.Status.ACKING);
engine.getTransportManager().writeAcknowledgement(out, sourceNode, list, local,
security != null ? security.getNodePassword() : null);
Expand All @@ -652,7 +652,7 @@ public void loadFilesFromPush(String nodeId, InputStream in, OutputStream out) {
public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status) {
Node identity = engine.getNodeService().findIdentity();
if (identity != null) {
NodeSecurity security = engine.getNodeService().findNodeSecurity(identity.getNodeId());
NodeSecurity security = engine.getNodeService().findNodeSecurity(identity.getNodeId(), true);
if (security != null) {
if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PULL) {
pullFilesFromNode(nodeCommunication, status, identity, security);
Expand Down

0 comments on commit 38bd32a

Please sign in to comment.