From f9f554e68bfbffdfd8f87db76d546f7202f1541b Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Mon, 23 Jan 2017 17:18:00 -0800 Subject: [PATCH 1/7] Add protective check for ZooKeeper writing data that is bigger than 1MB ZooKeeper server drops connections for requests that are trying to write data bigger than 1 MB, without returning any error code. When a Helix user does so, the request times out without giving a reason. ZkClient in Helix is a wrapper for ZkClient in ZooKeeper. Add check in the Helix ZkClient wrapper to give user exact timeout reason when data is bigger than 1MB. Add unit test. --- .../org/apache/helix/manager/zk/ZkClient.java | 24 ++++++++++++++----- .../zk/TestZkClient.java} | 16 +++++++++---- 2 files changed, 29 insertions(+), 11 deletions(-) rename helix-core/src/test/java/org/apache/helix/{TestZkClientWrapper.java => manager/zk/TestZkClient.java} (88%) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java index 0a61e82121..8f11eb3823 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java @@ -30,6 +30,8 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler; import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler; import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler; @@ -287,7 +289,7 @@ public void writeData(final String path, Object datat, final int expectedVersion long startT = System.nanoTime(); try { final byte[] data = serialize(datat, path); - + checkDataSizeLimit(data); retryUntilConnected(new Callable() { @Override @@ -308,12 +310,13 @@ public Stat writeDataGetStat(final String path, Object datat, final int expected throws InterruptedException { long start = System.nanoTime(); try { - final byte[] bytes = _zkSerializer.serialize(datat, path); + final byte[] data = _zkSerializer.serialize(datat, path); + checkDataSizeLimit(data); return retryUntilConnected(new Callable() { @Override public Stat call() throws Exception { - return ((ZkConnection) _connection).getZookeeper().setData(path, bytes, expectedVersion); + return ((ZkConnection) _connection).getZookeeper().setData(path, data, expectedVersion); } }); } finally { @@ -325,7 +328,7 @@ public Stat call() throws Exception { } @Override - public String create(final String path, Object data, final CreateMode mode) + public String create(final String path, Object datat, final CreateMode mode) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { if (path == null) { throw new NullPointerException("path must not be null."); @@ -333,13 +336,14 @@ public String create(final String path, Object data, final CreateMode mode) long startT = System.nanoTime(); try { - final byte[] bytes = data == null ? null : serialize(data, path); + final byte[] data = datat == null ? null : serialize(datat, path); + checkDataSizeLimit(data); return retryUntilConnected(new Callable() { @Override public String call() throws Exception { - return _connection.create(path, bytes, mode); + return _connection.create(path, data, mode); } }); } finally { @@ -451,4 +455,12 @@ public T call() throws Exception { } }); } + + private void checkDataSizeLimit(byte[] data) { + if (data != null && data.length > ZNRecord.SIZE_LIMIT) { + LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): " + + new String(data).substring(0, 1024)); + throw new HelixException("Data size larger than 1M"); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java similarity index 88% rename from helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java rename to helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java index bc3d266ed9..0019d40c0d 100644 --- a/helix-core/src/test/java/org/apache/helix/TestZkClientWrapper.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java @@ -1,4 +1,4 @@ -package org.apache.helix; +package org.apache.helix.manager.zk; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -21,9 +21,9 @@ import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkConnection; +import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZNRecordSerializer; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.ZkUnitTestBase; import org.apache.log4j.Logger; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -35,8 +35,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class TestZkClientWrapper extends ZkUnitTestBase { - private static Logger LOG = Logger.getLogger(TestZkClientWrapper.class); +public class TestZkClient extends ZkUnitTestBase { + private static Logger LOG = Logger.getLogger(TestZkClient.class); ZkClient _zkClient; @@ -113,4 +113,10 @@ public void process(WatchedEvent event) { zookeeper = connection.getZookeeper(); System.out.println("After session expiry sessionId= " + zookeeper.getSessionId()); } + + @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*") + void testDataSizeLimit() { + ZNRecord data = new ZNRecord(new String(new char[1024*1024])); + _zkClient.writeData("/test", data, -1); + } } From d5a2395d207da618e767d1f07e20fa116d716cb3 Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Wed, 8 Feb 2017 17:12:18 -0800 Subject: [PATCH 2/7] Fix TestBatchMessage test fail Test fail because new NO_OP message send as new MessageHandlerFactory registered. --- .../org/apache/helix/integration/TestBatchMessage.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java index e4a29909e5..9e62c0d0a2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java @@ -105,7 +105,9 @@ public void testBasic() throws Exception { ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName)); Assert.assertTrue(result); - Assert.assertTrue(listener._maxNbOfChilds <= 2, + // Change to three is because there is an extra factory registered + // So one extra NO_OP message send + Assert.assertTrue(listener._maxNbOfChilds <= 3, "Should get no more than 2 messages (O->S and S->M)"); // clean up @@ -185,7 +187,9 @@ public void testChangeBatchMessageMode() throws Exception { ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName)); Assert.assertTrue(result); - Assert.assertTrue(listener._maxNbOfChilds <= 2, + // Change to three is because there is an extra factory registered + // So one extra NO_OP message send + Assert.assertTrue(listener._maxNbOfChilds <= 3, "Should get no more than 2 messages (O->S and S->M)"); // clean up From 94f3961842263d04eef89019a1955e4c49e3305c Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Wed, 8 Feb 2017 23:38:49 -0800 Subject: [PATCH 3/7] Prevent ClusterControllerManager from starting multiple times ClusterControllerManager is a runnable wrapper for a Helix Controller that could run on a separate thread for testing purpose. Since HelixManager.connect() should not be called more than once, this Controller should not be started more than once, either. --- .../integration/manager/ClusterControllerManager.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java index 9e10771b7e..92ed52bde2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java @@ -36,6 +36,8 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable private final CountDownLatch _stopCountDown = new CountDownLatch(1); private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1); + private boolean _started = false; + public ClusterControllerManager(String zkAddr, String clusterName) { this(zkAddr, clusterName, "controller"); } @@ -48,13 +50,20 @@ public void syncStop() { _stopCountDown.countDown(); try { _waitStopFinishCountDown.await(); + _started = false; } catch (InterruptedException e) { LOG.error("Interrupted waiting for finish", e); } } + // This should not be called more than once because HelixManager.connect() should not be called more than once. public void syncStart() { - // TODO: prevent start multiple times + if (_started) { + throw new RuntimeException("Helix Controller already started. Do not call syncStart() more than once."); + } else { + _started = true; + } + new Thread(this).start(); try { _startCountDown.await(); From 408082a33d91f84556c3da31232fb6d4097b4371 Mon Sep 17 00:00:00 2001 From: Weihan Kong Date: Mon, 13 Feb 2017 13:52:16 -0800 Subject: [PATCH 4/7] Be able to stop workflow when no job is running. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, to stop a workflow, the target state of the workflow is set to STOP, then when each job(as a resource in ideal state) was processed in job rebalancer, it will check whether all the jobs in the workflow is done(not in IN_PROGRESS or STOPPING) and set the workflow state to be STOP. However, if all the jobs are already done, there’s no job in ideal state to process, so the workflow state never gets a chance to be set to STOP. This commit adds a check in workflow rebalancer to set the state when all jobs are already done. A test is added to test specifically this case. --- .../apache/helix/task/WorkflowRebalancer.java | 4 ++ .../integration/task/TestStopWorkflow.java | 45 +++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index 830f93a1b9..8e72f7a530 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -76,6 +76,10 @@ public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clu if (targetState == TargetState.STOP) { LOG.info("Workflow " + workflow + "is marked as stopped."); + if (isWorkflowStopped(workflowCtx, workflowCfg)) { + workflowCtx.setWorkflowState(TaskState.STOPPED); + TaskUtil.setWorkflowContext(_manager, workflow, workflowCtx); + } return buildEmptyAssignment(workflow, currStateOutput); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java new file mode 100644 index 0000000000..b6416989df --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestStopWorkflow.java @@ -0,0 +1,45 @@ +package org.apache.helix.integration.task; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.TestHelper; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestStopWorkflow extends TaskTestBase { + @BeforeClass + public void beforeClass() throws Exception { + _numParitions = 1; + super.beforeClass(); + } + + @Test + public void testStopWorkflow() throws InterruptedException { + String jobQueueName = TestHelper.getTestMethodName(); + JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG) + .setMaxAttemptsPerTask(1) + .setWorkflow(jobQueueName) + .setJobCommandConfigMap(ImmutableMap.of(MockTask.SUCCESS_COUNT_BEFORE_FAIL, "1")); + + JobQueue.Builder jobQueue = TaskTestUtil.buildJobQueue(jobQueueName); + jobQueue.enqueueJob("job1_will_succeed", jobBuilder); + jobQueue.enqueueJob("job2_will_fail", jobBuilder); + _driver.start(jobQueue.build()); + + // job1 should succeed and job2 should fail, wait until that happens + _driver.pollForJobState(jobQueueName, + TaskUtil.getNamespacedJobName(jobQueueName, "job2_will_fail"), TaskState.FAILED); + + Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.IN_PROGRESS)); + + // Now stop the workflow, and it should be stopped because all jobs have completed or failed. + _driver.waitToStop(jobQueueName, 4000); + + Assert.assertTrue(_driver.getWorkflowContext(jobQueueName).getWorkflowState().equals(TaskState.STOPPED)); + } +} From 177d5bdc29fc2011e12ca82d7bdf5456ef31a956 Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 3 Oct 2017 15:18:32 -0700 Subject: [PATCH 5/7] Add timeout in JobConfig To support job-level timeout for the task framework, add the configuration field. Associated changed is made in builder and JobBean. --- .../java/org/apache/helix/task/JobConfig.java | 30 +++++++++++++++++-- .../java/org/apache/helix/task/TaskState.java | 7 ++++- .../org/apache/helix/task/beans/JobBean.java | 1 + 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java index 12aa058be2..6c3aed4b89 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java @@ -79,6 +79,10 @@ protected enum JobConfigProperty { * The command configuration to be used by the tasks. */ JobCommandConfig, + /** + * The allowed execution time of the job. + */ + Timeout, /** * The timeout for a task. */ @@ -151,6 +155,7 @@ protected enum JobConfigProperty { } //Default property values + public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE; public static final long DEFAULT_TIMEOUT_PER_TASK = 60 * 60 * 1000; // 1 hr. public static final long DEFAULT_TASK_RETRY_DELAY = -1; // no delay public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 10; @@ -171,7 +176,7 @@ public JobConfig(HelixProperty property) { public JobConfig(String jobId, JobConfig jobConfig) { this(jobConfig.getWorkflow(), jobConfig.getTargetResource(), jobConfig.getTargetPartitions(), jobConfig.getTargetPartitionStates(), jobConfig.getCommand(), - jobConfig.getJobCommandConfigMap(), jobConfig.getTimeoutPerTask(), + jobConfig.getJobCommandConfigMap(), jobConfig.getTimeout(), jobConfig.getTimeoutPerTask(), jobConfig.getNumConcurrentTasksPerInstance(), jobConfig.getMaxAttemptsPerTask(), jobConfig.getMaxAttemptsPerTask(), jobConfig.getFailureThreshold(), jobConfig.getTaskRetryDelay(), jobConfig.isDisableExternalView(), @@ -183,7 +188,7 @@ public JobConfig(String jobId, JobConfig jobConfig) { private JobConfig(String workflow, String targetResource, List targetPartitions, Set targetPartitionStates, String command, Map jobCommandConfigMap, - long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, + long timeout, long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask, int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay, boolean disableExternalView, boolean ignoreDependentJobFailure, Map taskConfigMap, String jobType, String instanceGroupTag, @@ -221,6 +226,7 @@ private JobConfig(String workflow, String targetResource, List targetPar if (executionStart > 0) { getRecord().setLongField(JobConfigProperty.StartTime.name(), executionStart); } + getRecord().setLongField(JobConfigProperty.Timeout.name(), timeout); getRecord().setLongField(JobConfigProperty.TimeoutPerPartition.name(), timeoutPerTask); getRecord().setIntField(JobConfigProperty.MaxAttemptsPerTask.name(), maxAttemptsPerTask); getRecord().setIntField(JobConfigProperty.MaxForcedReassignmentsPerTask.name(), @@ -289,6 +295,10 @@ public Map getJobCommandConfigMap() { : null; } + public long getTimeout() { + return getRecord().getLongField(JobConfigProperty.Timeout.name(), DEFAULT_TIMEOUT); + } + public long getTimeoutPerTask() { return getRecord() .getLongField(JobConfigProperty.TimeoutPerPartition.name(), DEFAULT_TIMEOUT_PER_TASK); @@ -389,6 +399,7 @@ public static class Builder { private String _command; private Map _commandConfig; private Map _taskConfigMap = Maps.newHashMap(); + private long _timeout = DEFAULT_TIMEOUT; private long _timeoutPerTask = DEFAULT_TIMEOUT_PER_TASK; private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; private int _maxAttemptsPerTask = DEFAULT_MAX_ATTEMPTS_PER_TASK; @@ -417,7 +428,7 @@ public JobConfig build() { validate(); return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates, - _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance, + _command, _commandConfig, _timeout, _timeoutPerTask, _numConcurrentTasksPerInstance, _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay, _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType, _instanceGroupTag, _executionDelay, _executionStart, _jobId, _expiry, @@ -456,6 +467,9 @@ public static Builder fromMap(Map cfg) { cfg.get(JobConfigProperty.JobCommandConfig.name())); b.setJobCommandConfigMap(commandConfigMap); } + if (cfg.containsKey(JobConfigProperty.Timeout.name())) { + b.setTimeout(Long.parseLong(cfg.get(JobConfigProperty.Timeout.name()))); + } if (cfg.containsKey(JobConfigProperty.TimeoutPerPartition.name())) { b.setTimeoutPerTask(Long.parseLong(cfg.get(JobConfigProperty.TimeoutPerPartition.name()))); } @@ -544,6 +558,11 @@ public Builder setJobCommandConfigMap(Map v) { return this; } + public Builder setTimeout(long v) { + _timeout = v; + return this; + } + public Builder setTimeoutPerTask(long v) { _timeoutPerTask = v; return this; @@ -660,6 +679,10 @@ private void validate() { } } } + if (_timeout < 0) { + throw new IllegalArgumentException(String + .format("%s has invalid value %s", JobConfigProperty.Timeout, _timeout)); + } if (_timeoutPerTask < 0) { throw new IllegalArgumentException(String .format("%s has invalid value %s", JobConfigProperty.TimeoutPerPartition, @@ -696,6 +719,7 @@ public static Builder from(JobBean jobBean) { b.setMaxAttemptsPerTask(jobBean.maxAttemptsPerTask) .setNumConcurrentTasksPerInstance(jobBean.numConcurrentTasksPerInstance) + .setTimeout(jobBean.timeout) .setTimeoutPerTask(jobBean.timeoutPerPartition) .setFailureThreshold(jobBean.failureThreshold).setTaskRetryDelay(jobBean.taskRetryDelay) .setDisableExternalView(jobBean.disableExternalView) diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskState.java b/helix-core/src/main/java/org/apache/helix/task/TaskState.java index 4e12f2dda9..3713c40515 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskState.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskState.java @@ -50,5 +50,10 @@ public enum TaskState { /** * The task are aborted due to workflow fail */ - ABORTED + ABORTED, + /** + * The allowed execution time for the job. + * TODO: also use this for the task + */ + TIMED_OUT } diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java index 7b42ad2f59..8d2f259e20 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java @@ -38,6 +38,7 @@ public class JobBean { public String command; public Map jobCommandConfigMap; public List tasks; + public long timeout = JobConfig.DEFAULT_TIMEOUT; public long timeoutPerPartition = JobConfig.DEFAULT_TIMEOUT_PER_TASK; public int numConcurrentTasksPerInstance = JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE; public int maxAttemptsPerTask = JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK; From bd113a181a25611b7e2581f0bcbbce24d4aab1eb Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 3 Oct 2017 15:22:47 -0700 Subject: [PATCH 6/7] Config to change the interval of healthReport collection To better support the health report, the configurable collection time interval is necessary. Add this configuration in system environment since there is no zookeeper connection happens before create healthreport tasks. --- .../helix/healthcheck/ParticipantHealthReportTask.java | 9 ++++++++- .../java/org/apache/helix/manager/zk/ZKHelixManager.java | 9 +++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java index f1c4c24bb2..e0f38ba739 100644 --- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java +++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportTask.java @@ -29,6 +29,7 @@ public class ParticipantHealthReportTask extends HelixTimerTask { private static final Logger LOG = Logger.getLogger(ParticipantHealthReportTask.class); public final static int DEFAULT_REPORT_LATENCY = 60 * 1000; + private final int _reportLatency; Timer _timer; final ParticipantHealthReportCollectorImpl _healthReportCollector; @@ -42,7 +43,13 @@ public void run() { } public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector) { + this(healthReportCollector, DEFAULT_REPORT_LATENCY); + } + + public ParticipantHealthReportTask(ParticipantHealthReportCollectorImpl healthReportCollector, + int reportLatency) { _healthReportCollector = healthReportCollector; + _reportLatency = reportLatency; } @Override @@ -51,7 +58,7 @@ public void start() { LOG.info("Start HealthCheckInfoReportingTask"); _timer = new Timer("ParticipantHealthReportTimerTask", true); _timer.scheduleAtFixedRate(new ParticipantHealthReportTimerTask(), - new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY); + new Random().nextInt(_reportLatency), _reportLatency); } else { LOG.warn("ParticipantHealthReportTimerTask already started"); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 670a65e4f0..fdc6a55b4e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -100,6 +100,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { * helix version# */ private final String _version; + private int _reportLatency; protected ZkClient _zkclient = null; private final DefaultMessagingService _messagingService; @@ -231,6 +232,8 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst _connectionRetryTimeout = getSystemPropertyAsInt("zk.connectionReEstablishment.timeout", DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT); + _reportLatency = getSystemPropertyAsInt("helixmanager.participantHealthReport.reportLatency", + ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY); /** * instance type specific init @@ -240,7 +243,8 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst _stateMachineEngine = new HelixStateMachineEngine(this); _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); - _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); + _timerTasks + .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency)); break; case CONTROLLER: _stateMachineEngine = null; @@ -253,7 +257,8 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); - _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); + _timerTasks + .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency)); _controllerTimerTasks.add(new StatusDumpTask(this)); break; case ADMINISTRATOR: From d57882b9b613a2203886e1ef0da74ccc077d64c3 Mon Sep 17 00:00:00 2001 From: Junkai Xue Date: Tue, 3 Oct 2017 15:28:50 -0700 Subject: [PATCH 7/7] [HELIX-668] Fix remove context with namespaced job name --- .../src/main/java/org/apache/helix/task/TaskDriver.java | 2 +- .../helix/integration/TestPartitionMovementThrottle.java | 4 ++-- .../integration/task/TestGenericTaskAssignmentCalculator.java | 2 +- .../test/java/org/apache/helix/manager/zk/TestZkClient.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 97703f7de8..df5cdf6384 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -360,7 +360,7 @@ private void removeJob(String queueName, String jobName) { removeJobStateFromQueue(queueName, jobName); // Delete the job from property store - TaskUtil.removeJobContext(_propertyStore, jobName); + TaskUtil.removeJobContext(_propertyStore, namespacedJobName); } /** Remove the job name from the DAG from the queue configuration */ diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java index 7a87a0f63f..a2596b3aa8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java @@ -268,13 +268,13 @@ public static void enableThrottleRecord() { */ if (!resourcePatitionTransitionTimes.containsKey(message.getResourceName())) { resourcePatitionTransitionTimes - .put(message.getResourceName(), new ArrayList()); + .put(message.getResourceName(), Collections.synchronizedList(new ArrayList())); } resourcePatitionTransitionTimes.get(message.getResourceName()).add(partitionTransitionTime); if (!instancePatitionTransitionTimes.containsKey(message.getTgtName())) { instancePatitionTransitionTimes - .put(message.getTgtName(), new ArrayList()); + .put(message.getTgtName(), Collections.synchronizedList(new ArrayList())); } instancePatitionTransitionTimes.get(message.getTgtName()).add(partitionTransitionTime); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java index cd6822a989..030b7b9fe3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java @@ -180,7 +180,7 @@ public void testAbortTaskForWorkflowFail() } } - Assert.assertEquals(abortedTask, 4); + Assert.assertTrue(abortedTask > 0); } private class TaskOne extends MockTask { diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java index 0019d40c0d..1c86877b5b 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java @@ -116,7 +116,7 @@ public void process(WatchedEvent event) { @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*") void testDataSizeLimit() { - ZNRecord data = new ZNRecord(new String(new char[1024*1024])); + ZNRecord data = new ZNRecord(new String(new char[1024*1024*128])); _zkClient.writeData("/test", data, -1); } }