Skip to content

Commit

Permalink
Migrate RunConsoleCommandMessageTask and RunScriptMessageTask from Bl…
Browse files Browse the repository at this point in the history
…ockingMessageTask to hz:mc executor
  • Loading branch information
puzpuzpuz committed Nov 25, 2019
1 parent e3e562e commit 9445ff9
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 30 deletions.
Expand Up @@ -19,40 +19,30 @@
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.codec.MCRunConsoleCommandCodec;
import com.hazelcast.client.impl.protocol.codec.MCRunConsoleCommandCodec.RequestParameters;
import com.hazelcast.client.impl.protocol.task.AbstractCallableMessageTask;
import com.hazelcast.client.impl.protocol.task.BlockingMessageTask;
import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.management.ConsoleCommandHandler;
import com.hazelcast.internal.management.ManagementCenterService;
import com.hazelcast.internal.management.operation.RunConsoleCommandOperation;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;

import java.security.Permission;

import static com.hazelcast.internal.util.StringUtil.isNullOrEmpty;

public class RunConsoleCommandMessageTask
extends AbstractCallableMessageTask<RequestParameters> implements BlockingMessageTask {
public class RunConsoleCommandMessageTask extends AbstractInvocationMessageTask<RequestParameters> {

public RunConsoleCommandMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection);
}

@Override
protected Object call() throws Exception {
ManagementCenterService mcService = nodeEngine.getManagementCenterService();
ConsoleCommandHandler handler = mcService.getCommandHandler();
try {
final String ns = parameters.namespace;
String command = parameters.command;
if (!isNullOrEmpty(ns)) {
// set namespace as a part of the command
command = ns + "__" + command;
}
return handler.handleCommand(command);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
protected InvocationBuilder getInvocationBuilder(Operation op) {
return nodeEngine.getOperationService().createInvocationBuilder(getServiceName(), op, nodeEngine.getThisAddress());
}

@Override
protected Operation prepareOperation() {
return new RunConsoleCommandOperation(parameters.command, parameters.namespace);
}

@Override
Expand Down
Expand Up @@ -20,18 +20,16 @@
import com.hazelcast.client.impl.protocol.codec.MCRunScriptCodec;
import com.hazelcast.client.impl.protocol.codec.MCRunScriptCodec.RequestParameters;
import com.hazelcast.client.impl.protocol.task.AbstractInvocationMessageTask;
import com.hazelcast.client.impl.protocol.task.BlockingMessageTask;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.management.ManagementCenterService;
import com.hazelcast.internal.management.operation.ScriptExecutorOperation;
import com.hazelcast.internal.management.operation.RunScriptOperation;
import com.hazelcast.internal.nio.Connection;
import com.hazelcast.spi.impl.operationservice.InvocationBuilder;
import com.hazelcast.spi.impl.operationservice.Operation;

import java.security.Permission;

public class RunScriptMessageTask
extends AbstractInvocationMessageTask<RequestParameters> implements BlockingMessageTask {
public class RunScriptMessageTask extends AbstractInvocationMessageTask<RequestParameters> {

public RunScriptMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
super(clientMessage, node, connection);
Expand All @@ -44,7 +42,7 @@ protected InvocationBuilder getInvocationBuilder(Operation op) {

@Override
protected Operation prepareOperation() {
return new ScriptExecutorOperation(parameters.engine, parameters.script);
return new RunScriptOperation(parameters.engine, parameters.script);
}

@Override
Expand Down
Expand Up @@ -56,13 +56,15 @@
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.executor.ExecutorType;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.MapService;
import com.hazelcast.spi.exception.RetryableException;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationService;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import com.hazelcast.spi.properties.ClusterProperty;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -109,6 +111,7 @@ public class ManagementCenterService {
private static final long SLEEP_BETWEEN_POLL_MILLIS = 1000;
private static final long DEFAULT_UPDATE_INTERVAL = 3000;
private static final long EVENT_SEND_INTERVAL_MILLIS = 1000;
private static final int EXECUTOR_QUEUE_CAPACITY_PER_THREAD = 1000;

private final HazelcastInstanceImpl instance;
private final TaskPollThread taskPollThread;
Expand Down Expand Up @@ -147,6 +150,7 @@ public ManagementCenterService(HazelcastInstanceImpl instance) {
this.eventSendThread = new EventSendThread();
this.timedMemberStateFactory = instance.node.getNodeExtension().createTimedMemberStateFactory(instance);
this.connectionFactory = instance.node.getNodeExtension().getManagementCenterConnectionFactory();
registerExecutor();

if (this.managementCenterConfig.isEnabled()) {
this.instance.getCluster().addMembershipListener(new ManagementCenterService.MemberListenerImpl());
Expand Down Expand Up @@ -217,6 +221,15 @@ public void shutdown() {
}
}

private void registerExecutor() {
final ExecutionService executionService = instance.node.nodeEngine.getExecutionService();
int threadCount = instance.node.getProperties().getInteger(ClusterProperty.MC_EXECUTOR_THREAD_COUNT);
logger.finest("Creating new executor for Management Center service tasks with threadCount=" + threadCount);
executionService.register(ExecutionService.MC_EXECUTOR,
threadCount, threadCount * EXECUTOR_QUEUE_CAPACITY_PER_THREAD,
ExecutorType.CACHED);
}

public Optional<String> getTimedMemberStateJson() {
return Optional.ofNullable(timedMemberStateJson.get());
}
Expand Down
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.management.operation;

import com.hazelcast.internal.management.ConsoleCommandHandler;
import com.hazelcast.internal.management.ManagementCenterService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.AbstractLocalOperation;

import java.util.concurrent.Future;

import static com.hazelcast.internal.util.ExceptionUtil.peel;
import static com.hazelcast.internal.util.ExceptionUtil.withTryCatch;
import static com.hazelcast.internal.util.StringUtil.isNullOrEmpty;

/**
* Operation to run console command. The command is executed on {@link ExecutionService#MC_EXECUTOR} executor.
*/
public class RunConsoleCommandOperation extends AbstractLocalOperation {

private final String command;
private final String namespace;

public RunConsoleCommandOperation(String command, String namespace) {
this.command = command;
this.namespace = namespace;
}

@Override
public void run() throws Exception {
final ILogger logger = getNodeEngine().getLogger(getClass());
final ManagementCenterService mcService = ((NodeEngineImpl) getNodeEngine()).getManagementCenterService();
final ExecutionService executionService = getNodeEngine().getExecutionService();

Future<String> future = executionService.submit(
ExecutionService.MC_EXECUTOR,
() -> {
ConsoleCommandHandler handler = mcService.getCommandHandler();
try {
final String ns = namespace;
String cmd = command;
if (!isNullOrEmpty(ns)) {
// set namespace as a part of the command
cmd = ns + "__" + cmd;
}
return handler.handleCommand(cmd);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
}
});

executionService.asCompletableFuture(future).whenComplete(
withTryCatch(
logger,
(output, error) -> sendResponse(error != null ? peel(error) : output)
)
);
}

@Override
public final Object getResponse() {
throw new UnsupportedOperationException();
}

@Override
public String getServiceName() {
return ManagementCenterService.SERVICE_NAME;
}

@Override
public boolean returnsResponse() {
return false;
}
}
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2008-2019, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.management.operation;

import com.hazelcast.internal.management.ManagementCenterService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.impl.operationservice.AbstractLocalOperation;

import java.util.concurrent.Future;

import static com.hazelcast.internal.util.ExceptionUtil.peel;
import static com.hazelcast.internal.util.ExceptionUtil.withTryCatch;

/**
* Operation to execute script on the node. The script is executed on {@link ExecutionService#MC_EXECUTOR} executor.
* <p>
* Note: This operation reuses {@link ScriptExecutorOperation} to execute the script.
* Once MC migrates to client comms, these classes can be merged.
*/
public class RunScriptOperation extends AbstractLocalOperation {

private final String engine;
private final String script;

public RunScriptOperation(String engine, String script) {
this.engine = engine;
this.script = script;
}

@Override
public void run() {
final ILogger logger = getNodeEngine().getLogger(getClass());
final ExecutionService executionService = getNodeEngine().getExecutionService();
final ScriptExecutorOperation legacyOperation = new ScriptExecutorOperation(getNodeEngine(), engine, script);

Future<Object> future = executionService.submit(
ExecutionService.MC_EXECUTOR,
() -> {
legacyOperation.run();
return legacyOperation.getResponse();
});

executionService.asCompletableFuture(future).whenComplete(
withTryCatch(
logger,
(result, error) -> sendResponse(error != null ? peel(error) : result)
)
);
}

@Override
public final Object getResponse() {
throw new UnsupportedOperationException();
}

@Override
public String getServiceName() {
return ManagementCenterService.SERVICE_NAME;
}

@Override
public boolean returnsResponse() {
return false;
}
}
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.internal.management.ScriptEngineManagerContext;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.impl.NodeEngine;

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
Expand All @@ -31,25 +32,37 @@

/**
* Operation to execute script on the node.
* <p>
* Note: Once MC migrates to client comms, this class can be merged into {@link RunScriptOperation}.
*/
public class ScriptExecutorOperation extends AbstractManagementOperation {

private final NodeEngine nodeEngine;

private String engineName;
private String script;
private Object result;

@SuppressWarnings("unused")
public ScriptExecutorOperation() {
nodeEngine = getNodeEngine();
}

public ScriptExecutorOperation(String engineName, String script) {
this.nodeEngine = getNodeEngine();
this.engineName = engineName;
this.script = script;
}

ScriptExecutorOperation(NodeEngine nodeEngine, String engineName, String script) {
this.nodeEngine = nodeEngine;
this.engineName = engineName;
this.script = script;
}

@Override
public void run() {
ManagementCenterConfig managementCenterConfig = getNodeEngine().getConfig().getManagementCenterConfig();
ManagementCenterConfig managementCenterConfig = nodeEngine.getConfig().getManagementCenterConfig();
if (!managementCenterConfig.isScriptingEnabled()) {
throw new AccessControlException("Using ScriptEngine is not allowed on this Hazelcast member.");
}
Expand All @@ -58,7 +71,7 @@ public void run() {
if (engine == null) {
throw new IllegalArgumentException("Could not find ScriptEngine named '" + engineName + "'.");
}
engine.put("hazelcast", getNodeEngine().getHazelcastInstance());
engine.put("hazelcast", nodeEngine.getHazelcastInstance());
try {
this.result = engine.eval(script);
} catch (ScriptException e) {
Expand Down
Expand Up @@ -56,7 +56,7 @@ public interface ExecutionService {
String CLIENT_EXECUTOR = "hz:client";

/**
* Name of the client executor.
* Name of the client query executor.
*/
String CLIENT_QUERY_EXECUTOR = "hz:client-query";

Expand Down Expand Up @@ -104,6 +104,12 @@ public interface ExecutionService {
*/
String MAP_LOAD_ALL_KEYS_EXECUTOR = "hz:map-loadAllKeys";

/**
* Name of the Management Center executor. Used to execute blocking tasks
* related with operations run by Management Center.
*/
String MC_EXECUTOR = "hz:mc";

/**
* @param name for the executor service
* @param poolSize the maximum number of threads to allow in the pool
Expand Down
Expand Up @@ -578,6 +578,13 @@ private int getWhenNoSSLDetected() {
public static final HazelcastProperty MC_MAX_VISIBLE_SLOW_OPERATION_COUNT
= new HazelcastProperty("hazelcast.mc.max.visible.slow.operations.count", 10);

/**
* The number of threads that the Management Center service has available for processing operations
* run sent from connected Management Center instance.
*/
public static final HazelcastProperty MC_EXECUTOR_THREAD_COUNT
= new HazelcastProperty("hazelcast.mc.executor.thread.count", 2);

public static final HazelcastProperty CONNECTION_MONITOR_INTERVAL
= new HazelcastProperty("hazelcast.connection.monitor.interval", 100, MILLISECONDS);
public static final HazelcastProperty CONNECTION_MONITOR_MAX_FAULTS
Expand Down

0 comments on commit 9445ff9

Please sign in to comment.