From f38acf2652db7eafc68b0843efa91b1116e78867 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E4=BC=9F=E6=9D=B0?= Date: Tue, 29 Sep 2020 12:45:22 +0800 Subject: [PATCH] Distinguish guarantee service by job name in singleton AbstractDistributeOnceElasticJobListener (#1508) * Distinguish guarantee service by job name in singleton AbstractDistributeOnceElasticJobListener * Fix checkstyle --- ...tractDistributeOnceElasticJobListener.java | 68 ++++++++++++++----- .../guarantee/GuaranteeListenerManager.java | 7 +- .../lite/internal/schedule/JobScheduler.java | 31 ++++++--- .../lite/internal/util/ParameterUtils.java | 50 ++++++++++++++ .../DistributeOnceElasticJobListenerTest.java | 5 +- .../TestDistributeOnceElasticJobListener.java | 13 +--- .../TestDistributeOnceElasticJobListener.java | 4 -- .../GuaranteeListenerManagerTest.java | 12 ++-- .../internal/util/ParameterUtilsTest.java | 44 ++++++++++++ .../fixture/listener/SimpleOnceListener.java | 1 - 10 files changed, 184 insertions(+), 51 deletions(-) create mode 100644 elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java create mode 100644 elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java index 7ef17f87f9..ab78fe8218 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.elasticjob.lite.api.listener; -import lombok.Setter; +import lombok.Getter; import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils; import org.apache.shardingsphere.elasticjob.infra.env.TimeService; import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException; @@ -26,32 +26,34 @@ import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Distributed once elasticjob listener. */ public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener { - private final long startedTimeoutMilliseconds; - - private final Object startedWait = new Object(); - - private final long completedTimeoutMilliseconds; - - private final Object completedWait = new Object(); - - @Setter - private GuaranteeService guaranteeService; + private final ConcurrentMap listenerContexts = new ConcurrentHashMap<>(); private final TimeService timeService = new TimeService(); - public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { - this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds; - this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds; + /** + * Add guarantee service for specific job. + * + * @param guaranteeService guarantee service + * @param jobName job name + * @param startedTimeoutMilliseconds started timeout milliseconds + * @param completedTimeoutMilliseconds completed timeout milliseconds + */ + public void addGuaranteeService(final GuaranteeService guaranteeService, final String jobName, final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { + listenerContexts.computeIfAbsent(jobName, unused -> new DistributeOnceListenerContext(startedTimeoutMilliseconds, completedTimeoutMilliseconds, guaranteeService)); } @Override public final void beforeJobExecuted(final ShardingContexts shardingContexts) { + DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName()); + GuaranteeService guaranteeService = context.getGuaranteeService(); Set shardingItems = shardingContexts.getShardingItemParameters().keySet(); guaranteeService.registerStart(shardingItems); while (!guaranteeService.isRegisterStartSuccess(shardingItems)) { @@ -63,7 +65,9 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) { return; } long before = timeService.getCurrentMillis(); + long startedTimeoutMilliseconds = context.getStartedTimeoutMilliseconds(); try { + Object startedWait = context.getStartedWait(); synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } @@ -78,6 +82,8 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) { @Override public final void afterJobExecuted(final ShardingContexts shardingContexts) { + DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName()); + GuaranteeService guaranteeService = context.getGuaranteeService(); Set shardingItems = shardingContexts.getShardingItemParameters().keySet(); guaranteeService.registerComplete(shardingItems); while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) { @@ -89,7 +95,9 @@ public final void afterJobExecuted(final ShardingContexts shardingContexts) { return; } long before = timeService.getCurrentMillis(); + long completedTimeoutMilliseconds = context.getCompletedTimeoutMilliseconds(); try { + Object completedWait = context.getCompletedWait(); synchronized (completedWait) { completedWait.wait(completedTimeoutMilliseconds); } @@ -122,8 +130,12 @@ private void handleTimeout(final long timeoutMilliseconds) { /** * Notify waiting task start. + * + * @param jobName job name */ - public void notifyWaitingTaskStart() { + public void notifyWaitingTaskStart(final String jobName) { + DistributeOnceListenerContext context = listenerContexts.get(jobName); + Object startedWait = context.getStartedWait(); synchronized (startedWait) { startedWait.notifyAll(); } @@ -131,10 +143,34 @@ public void notifyWaitingTaskStart() { /** * Notify waiting task complete. + * + * @param jobName job name */ - public void notifyWaitingTaskComplete() { + public void notifyWaitingTaskComplete(final String jobName) { + DistributeOnceListenerContext context = listenerContexts.get(jobName); + Object completedWait = context.getCompletedWait(); synchronized (completedWait) { completedWait.notifyAll(); } } + + @Getter + private static class DistributeOnceListenerContext { + + private final long startedTimeoutMilliseconds; + + private final Object startedWait = new Object(); + + private final long completedTimeoutMilliseconds; + + private final Object completedWait = new Object(); + + private final GuaranteeService guaranteeService; + + DistributeOnceListenerContext(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds, final GuaranteeService guaranteeService) { + this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds; + this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds; + this.guaranteeService = guaranteeService; + } + } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java index 74975924fb..738dc26c6b 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java @@ -34,9 +34,12 @@ public final class GuaranteeListenerManager extends AbstractListenerManager { private final Collection elasticJobListeners; + private final String jobName; + public GuaranteeListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners) { super(regCenter, jobName); this.guaranteeNode = new GuaranteeNode(jobName); + this.jobName = jobName; this.elasticJobListeners = elasticJobListeners; } @@ -53,7 +56,7 @@ protected void dataChanged(final String path, final Type eventType, final String if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { - ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(jobName); } } } @@ -67,7 +70,7 @@ protected void dataChanged(final String path, final Type eventType, final String if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { - ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(); + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(jobName); } } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java index ab52c6c778..c1bf0c3721 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService; import org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory; import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade; +import org.apache.shardingsphere.elasticjob.lite.internal.util.ParameterUtils; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import org.quartz.JobBuilder; @@ -39,6 +40,8 @@ import org.quartz.simpl.SimpleThreadPool; import java.util.Collection; +import java.util.Collections; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -76,14 +79,13 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob this.regCenter = regCenter; elasticJobType = null; final Collection elasticJobListeners = jobConfig.getJobListenerTypes().stream() - .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList()); + .map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList()); setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners); schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName()); jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig); jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade); String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob); this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig); - setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners); jobScheduleController = createJobScheduleController(); } @@ -95,23 +97,32 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas this.regCenter = regCenter; this.elasticJobType = elasticJobType; final Collection elasticJobListeners = jobConfig.getJobListenerTypes().stream() - .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList()); + .map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList()); setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners); schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName()); jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig); jobExecutor = new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade); this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig); - setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners); jobScheduleController = createJobScheduleController(); } - private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final Collection elasticJobListeners) { - GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName()); - for (ElasticJobListener each : elasticJobListeners) { - if (each instanceof AbstractDistributeOnceElasticJobListener) { - ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService); - } + private ElasticJobListener lookupElasticJobListener(final String jobName, final String jobListenerTypeWithParameter) { + String[] split = jobListenerTypeWithParameter.split("\\?"); + String jobListenerType = split[0]; + ElasticJobListener listener = ElasticJobListenerFactory.getListener(jobListenerType); + if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) { + return listener; } + Map parameters = 1 < split.length ? ParameterUtils.parseQuery(split[1]) : Collections.emptyMap(); + return configureGuaranteeService(jobName, parameters, listener); + } + + private ElasticJobListener configureGuaranteeService(final String jobName, final Map parameters, final ElasticJobListener listener) { + GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobName); + long startedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("startedTimeoutMilliseconds", "0")); + long completedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("completedTimeoutMilliseconds", "0")); + ((AbstractDistributeOnceElasticJobListener) listener).addGuaranteeService(guaranteeService, jobName, startedTimeoutMilliseconds, completedTimeoutMilliseconds); + return listener; } private JobScheduleController createJobScheduleController() { diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java new file mode 100644 index 0000000000..cbcb861b98 --- /dev/null +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.lite.internal.util; + +import com.google.common.base.Strings; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Parameter utils. + */ +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public final class ParameterUtils { + + /** + * Parse string like key1=value1&key2=value2 to {@link Map}. + * + * @param query parameter string + * @return map + */ + public static Map parseQuery(final String query) { + if (Strings.isNullOrEmpty(query)) { + return Collections.emptyMap(); + } + return Arrays.stream(query.split("&")).map(String::trim) + .filter(pair -> !pair.isEmpty()) + .map(parameter -> parameter.split("=")) + .collect(Collectors.toMap(pair -> pair[0], pair -> 1 < pair.length ? pair[1] : "")); + } +} diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java index 549daef62b..bfeef4c7b9 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java @@ -57,8 +57,9 @@ public final class DistributeOnceElasticJobListenerTest { @Before public void setUp() { - distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(elasticJobListenerCaller); - distributeOnceElasticJobListener.setGuaranteeService(guaranteeService); + distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(); + distributeOnceElasticJobListener.setCaller(elasticJobListenerCaller); + distributeOnceElasticJobListener.addGuaranteeService(guaranteeService, "test_job", 1L, 1L); ReflectionUtils.setSuperclassFieldValue(distributeOnceElasticJobListener, "timeService", timeService); Map map = new HashMap<>(2, 1); map.put(0, ""); diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java index 6150d6531d..be05179ea2 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java @@ -17,21 +17,14 @@ package org.apache.shardingsphere.elasticjob.lite.api.listener.fixture; +import lombok.Setter; import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener; public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { - private final ElasticJobListenerCaller caller; - - public TestDistributeOnceElasticJobListener() { - this(null); - } - - public TestDistributeOnceElasticJobListener(final ElasticJobListenerCaller caller) { - super(1L, 1L); - this.caller = caller; - } + @Setter + private ElasticJobListenerCaller caller; @Override public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) { diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java index 5bcb7f8b9d..c2c1ef7e64 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java @@ -22,10 +22,6 @@ public class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { - public TestDistributeOnceElasticJobListener() { - super(100L, 100L); - } - @Override public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) { } diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java index ca2be424c6..279379a586 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java @@ -64,36 +64,36 @@ public void assertStart() { @Test public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_CHANGED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); } @Test public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); } @Test public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener).notifyWaitingTaskStart(); + verify(distributeOnceElasticJobListener).notifyWaitingTaskStart("test_job"); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_CHANGED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete(); + verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete("test_job"); } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java new file mode 100644 index 0000000000..63d737ef94 --- /dev/null +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.lite.internal.util; + +import org.junit.Test; + +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public final class ParameterUtilsTest { + + @Test + public void assertParseQueryString() { + String queryString = "key1=foo&key2&key3=bar"; + Map result = ParameterUtils.parseQuery(queryString); + assertThat(result.get("key1"), is("foo")); + assertThat(result.get("key2"), is("")); + assertThat(result.get("key3"), is("bar")); + } + + @Test + public void assertParseEmptyString() { + Map result = ParameterUtils.parseQuery(""); + assertTrue(result.isEmpty()); + } +} diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java index b7dc2130e6..67e785911d 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java @@ -40,7 +40,6 @@ public SimpleOnceListener() { } public SimpleOnceListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { - super(startedTimeoutMilliseconds, completedTimeoutMilliseconds); this.startedTimeoutMilliseconds = startedTimeoutMilliseconds; this.completedTimeoutMilliseconds = completedTimeoutMilliseconds; }