From e7af3f79c0264da518e90530b09f487e5fecd1a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E4=BC=9F=E6=9D=B0?= Date: Thu, 1 Oct 2020 20:56:09 +0800 Subject: [PATCH] Revert "Distinguish guarantee service by job name in singleton AbstractDistributeOnceElasticJobListener (#1508)" (#1533) This reverts commit f38acf26 --- ...tractDistributeOnceElasticJobListener.java | 68 +++++-------------- .../guarantee/GuaranteeListenerManager.java | 7 +- .../lite/internal/schedule/JobScheduler.java | 31 +++------ .../lite/internal/util/ParameterUtils.java | 50 -------------- .../DistributeOnceElasticJobListenerTest.java | 5 +- .../TestDistributeOnceElasticJobListener.java | 13 +++- .../TestDistributeOnceElasticJobListener.java | 4 ++ .../GuaranteeListenerManagerTest.java | 12 ++-- .../internal/util/ParameterUtilsTest.java | 44 ------------ .../fixture/listener/SimpleOnceListener.java | 1 + 10 files changed, 51 insertions(+), 184 deletions(-) delete mode 100644 elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java delete mode 100644 elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java index ab78fe8218..7ef17f87f9 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/listener/AbstractDistributeOnceElasticJobListener.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.elasticjob.lite.api.listener; -import lombok.Getter; +import lombok.Setter; import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils; import org.apache.shardingsphere.elasticjob.infra.env.TimeService; import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException; @@ -26,34 +26,32 @@ import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * Distributed once elasticjob listener. */ public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener { - private final ConcurrentMap listenerContexts = new ConcurrentHashMap<>(); + private final long startedTimeoutMilliseconds; + + private final Object startedWait = new Object(); + + private final long completedTimeoutMilliseconds; + + private final Object completedWait = new Object(); + + @Setter + private GuaranteeService guaranteeService; private final TimeService timeService = new TimeService(); - /** - * Add guarantee service for specific job. - * - * @param guaranteeService guarantee service - * @param jobName job name - * @param startedTimeoutMilliseconds started timeout milliseconds - * @param completedTimeoutMilliseconds completed timeout milliseconds - */ - public void addGuaranteeService(final GuaranteeService guaranteeService, final String jobName, final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { - listenerContexts.computeIfAbsent(jobName, unused -> new DistributeOnceListenerContext(startedTimeoutMilliseconds, completedTimeoutMilliseconds, guaranteeService)); + public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { + this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds; + this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds; } @Override public final void beforeJobExecuted(final ShardingContexts shardingContexts) { - DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName()); - GuaranteeService guaranteeService = context.getGuaranteeService(); Set shardingItems = shardingContexts.getShardingItemParameters().keySet(); guaranteeService.registerStart(shardingItems); while (!guaranteeService.isRegisterStartSuccess(shardingItems)) { @@ -65,9 +63,7 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) { return; } long before = timeService.getCurrentMillis(); - long startedTimeoutMilliseconds = context.getStartedTimeoutMilliseconds(); try { - Object startedWait = context.getStartedWait(); synchronized (startedWait) { startedWait.wait(startedTimeoutMilliseconds); } @@ -82,8 +78,6 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) { @Override public final void afterJobExecuted(final ShardingContexts shardingContexts) { - DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName()); - GuaranteeService guaranteeService = context.getGuaranteeService(); Set shardingItems = shardingContexts.getShardingItemParameters().keySet(); guaranteeService.registerComplete(shardingItems); while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) { @@ -95,9 +89,7 @@ public final void afterJobExecuted(final ShardingContexts shardingContexts) { return; } long before = timeService.getCurrentMillis(); - long completedTimeoutMilliseconds = context.getCompletedTimeoutMilliseconds(); try { - Object completedWait = context.getCompletedWait(); synchronized (completedWait) { completedWait.wait(completedTimeoutMilliseconds); } @@ -130,12 +122,8 @@ private void handleTimeout(final long timeoutMilliseconds) { /** * Notify waiting task start. - * - * @param jobName job name */ - public void notifyWaitingTaskStart(final String jobName) { - DistributeOnceListenerContext context = listenerContexts.get(jobName); - Object startedWait = context.getStartedWait(); + public void notifyWaitingTaskStart() { synchronized (startedWait) { startedWait.notifyAll(); } @@ -143,34 +131,10 @@ public void notifyWaitingTaskStart(final String jobName) { /** * Notify waiting task complete. - * - * @param jobName job name */ - public void notifyWaitingTaskComplete(final String jobName) { - DistributeOnceListenerContext context = listenerContexts.get(jobName); - Object completedWait = context.getCompletedWait(); + public void notifyWaitingTaskComplete() { synchronized (completedWait) { completedWait.notifyAll(); } } - - @Getter - private static class DistributeOnceListenerContext { - - private final long startedTimeoutMilliseconds; - - private final Object startedWait = new Object(); - - private final long completedTimeoutMilliseconds; - - private final Object completedWait = new Object(); - - private final GuaranteeService guaranteeService; - - DistributeOnceListenerContext(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds, final GuaranteeService guaranteeService) { - this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds; - this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds; - this.guaranteeService = guaranteeService; - } - } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java index 738dc26c6b..74975924fb 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManager.java @@ -34,12 +34,9 @@ public final class GuaranteeListenerManager extends AbstractListenerManager { private final Collection elasticJobListeners; - private final String jobName; - public GuaranteeListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners) { super(regCenter, jobName); this.guaranteeNode = new GuaranteeNode(jobName); - this.jobName = jobName; this.elasticJobListeners = elasticJobListeners; } @@ -56,7 +53,7 @@ protected void dataChanged(final String path, final Type eventType, final String if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { - ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(jobName); + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(); } } } @@ -70,7 +67,7 @@ protected void dataChanged(final String path, final Type eventType, final String if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) { for (ElasticJobListener each : elasticJobListeners) { if (each instanceof AbstractDistributeOnceElasticJobListener) { - ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(jobName); + ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(); } } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java index c1bf0c3721..ab52c6c778 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java @@ -29,7 +29,6 @@ import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService; import org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory; import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade; -import org.apache.shardingsphere.elasticjob.lite.internal.util.ParameterUtils; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import org.quartz.JobBuilder; @@ -40,8 +39,6 @@ import org.quartz.simpl.SimpleThreadPool; import java.util.Collection; -import java.util.Collections; -import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -79,13 +76,14 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob this.regCenter = regCenter; elasticJobType = null; final Collection elasticJobListeners = jobConfig.getJobListenerTypes().stream() - .map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList()); + .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList()); setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners); schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName()); jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig); jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade); String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob); this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig); + setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners); jobScheduleController = createJobScheduleController(); } @@ -97,32 +95,23 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas this.regCenter = regCenter; this.elasticJobType = elasticJobType; final Collection elasticJobListeners = jobConfig.getJobListenerTypes().stream() - .map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList()); + .map(ElasticJobListenerFactory::getListener).collect(Collectors.toList()); setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners); schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName()); jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig); jobExecutor = new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade); this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig); + setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners); jobScheduleController = createJobScheduleController(); } - private ElasticJobListener lookupElasticJobListener(final String jobName, final String jobListenerTypeWithParameter) { - String[] split = jobListenerTypeWithParameter.split("\\?"); - String jobListenerType = split[0]; - ElasticJobListener listener = ElasticJobListenerFactory.getListener(jobListenerType); - if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) { - return listener; + private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final Collection elasticJobListeners) { + GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName()); + for (ElasticJobListener each : elasticJobListeners) { + if (each instanceof AbstractDistributeOnceElasticJobListener) { + ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService); + } } - Map parameters = 1 < split.length ? ParameterUtils.parseQuery(split[1]) : Collections.emptyMap(); - return configureGuaranteeService(jobName, parameters, listener); - } - - private ElasticJobListener configureGuaranteeService(final String jobName, final Map parameters, final ElasticJobListener listener) { - GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobName); - long startedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("startedTimeoutMilliseconds", "0")); - long completedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("completedTimeoutMilliseconds", "0")); - ((AbstractDistributeOnceElasticJobListener) listener).addGuaranteeService(guaranteeService, jobName, startedTimeoutMilliseconds, completedTimeoutMilliseconds); - return listener; } private JobScheduleController createJobScheduleController() { diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java deleted file mode 100644 index cbcb861b98..0000000000 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtils.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.elasticjob.lite.internal.util; - -import com.google.common.base.Strings; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Parameter utils. - */ -@RequiredArgsConstructor(access = AccessLevel.PRIVATE) -public final class ParameterUtils { - - /** - * Parse string like key1=value1&key2=value2 to {@link Map}. - * - * @param query parameter string - * @return map - */ - public static Map parseQuery(final String query) { - if (Strings.isNullOrEmpty(query)) { - return Collections.emptyMap(); - } - return Arrays.stream(query.split("&")).map(String::trim) - .filter(pair -> !pair.isEmpty()) - .map(parameter -> parameter.split("=")) - .collect(Collectors.toMap(pair -> pair[0], pair -> 1 < pair.length ? pair[1] : "")); - } -} diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java index bfeef4c7b9..549daef62b 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/DistributeOnceElasticJobListenerTest.java @@ -57,9 +57,8 @@ public final class DistributeOnceElasticJobListenerTest { @Before public void setUp() { - distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(); - distributeOnceElasticJobListener.setCaller(elasticJobListenerCaller); - distributeOnceElasticJobListener.addGuaranteeService(guaranteeService, "test_job", 1L, 1L); + distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(elasticJobListenerCaller); + distributeOnceElasticJobListener.setGuaranteeService(guaranteeService); ReflectionUtils.setSuperclassFieldValue(distributeOnceElasticJobListener, "timeService", timeService); Map map = new HashMap<>(2, 1); map.put(0, ""); diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java index be05179ea2..6150d6531d 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/listener/fixture/TestDistributeOnceElasticJobListener.java @@ -17,14 +17,21 @@ package org.apache.shardingsphere.elasticjob.lite.api.listener.fixture; -import lombok.Setter; import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener; public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { - @Setter - private ElasticJobListenerCaller caller; + private final ElasticJobListenerCaller caller; + + public TestDistributeOnceElasticJobListener() { + this(null); + } + + public TestDistributeOnceElasticJobListener(final ElasticJobListenerCaller caller) { + super(1L, 1L); + this.caller = caller; + } @Override public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) { diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java index c2c1ef7e64..5bcb7f8b9d 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/integrate/listener/TestDistributeOnceElasticJobListener.java @@ -22,6 +22,10 @@ public class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { + public TestDistributeOnceElasticJobListener() { + super(100L, 100L); + } + @Override public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) { } diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java index 279379a586..ca2be424c6 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java +++ b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/guarantee/GuaranteeListenerManagerTest.java @@ -64,36 +64,36 @@ public void assertStart() { @Test public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_CHANGED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); } @Test public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); } @Test public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() { guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener).notifyWaitingTaskStart("test_job"); + verify(distributeOnceElasticJobListener).notifyWaitingTaskStart(); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_CHANGED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job"); + verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart(); } @Test public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() { guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_DELETED, ""); - verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete("test_job"); + verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete(); } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java deleted file mode 100644 index 63d737ef94..0000000000 --- a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/util/ParameterUtilsTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.elasticjob.lite.internal.util; - -import org.junit.Test; - -import java.util.Map; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -public final class ParameterUtilsTest { - - @Test - public void assertParseQueryString() { - String queryString = "key1=foo&key2&key3=bar"; - Map result = ParameterUtils.parseQuery(queryString); - assertThat(result.get("key1"), is("foo")); - assertThat(result.get("key2"), is("")); - assertThat(result.get("key3"), is("bar")); - } - - @Test - public void assertParseEmptyString() { - Map result = ParameterUtils.parseQuery(""); - assertTrue(result.isEmpty()); - } -} diff --git a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java index 67e785911d..b7dc2130e6 100644 --- a/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java +++ b/elasticjob-lite/elasticjob-lite-spring/elasticjob-lite-spring-namespace/src/test/java/org/apache/shardingsphere/elasticjob/lite/spring/namespace/fixture/listener/SimpleOnceListener.java @@ -40,6 +40,7 @@ public SimpleOnceListener() { } public SimpleOnceListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) { + super(startedTimeoutMilliseconds, completedTimeoutMilliseconds); this.startedTimeoutMilliseconds = startedTimeoutMilliseconds; this.completedTimeoutMilliseconds = completedTimeoutMilliseconds; }