Skip to content

Commit

Permalink
Distinguish guarantee service by job name in singleton AbstractDistri…
Browse files Browse the repository at this point in the history
…buteOnceElasticJobListener (#1508)

* Distinguish guarantee service by job name in singleton AbstractDistributeOnceElasticJobListener

* Fix checkstyle
  • Loading branch information
TeslaCN committed Sep 29, 2020
1 parent 9c003c6 commit f38acf2
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 51 deletions.
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.elasticjob.lite.api.listener;

import lombok.Setter;
import lombok.Getter;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.env.TimeService;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
Expand All @@ -26,32 +26,34 @@
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Distributed once elasticjob listener.
*/
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

private final long startedTimeoutMilliseconds;

private final Object startedWait = new Object();

private final long completedTimeoutMilliseconds;

private final Object completedWait = new Object();

@Setter
private GuaranteeService guaranteeService;
private final ConcurrentMap<String, DistributeOnceListenerContext> listenerContexts = new ConcurrentHashMap<>();

private final TimeService timeService = new TimeService();

public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds;
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds;
/**
* Add guarantee service for specific job.
*
* @param guaranteeService guarantee service
* @param jobName job name
* @param startedTimeoutMilliseconds started timeout milliseconds
* @param completedTimeoutMilliseconds completed timeout milliseconds
*/
public void addGuaranteeService(final GuaranteeService guaranteeService, final String jobName, final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
listenerContexts.computeIfAbsent(jobName, unused -> new DistributeOnceListenerContext(startedTimeoutMilliseconds, completedTimeoutMilliseconds, guaranteeService));
}

@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName());
GuaranteeService guaranteeService = context.getGuaranteeService();
Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
guaranteeService.registerStart(shardingItems);
while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
Expand All @@ -63,7 +65,9 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
return;
}
long before = timeService.getCurrentMillis();
long startedTimeoutMilliseconds = context.getStartedTimeoutMilliseconds();
try {
Object startedWait = context.getStartedWait();
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
Expand All @@ -78,6 +82,8 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) {

@Override
public final void afterJobExecuted(final ShardingContexts shardingContexts) {
DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName());
GuaranteeService guaranteeService = context.getGuaranteeService();
Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
guaranteeService.registerComplete(shardingItems);
while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) {
Expand All @@ -89,7 +95,9 @@ public final void afterJobExecuted(final ShardingContexts shardingContexts) {
return;
}
long before = timeService.getCurrentMillis();
long completedTimeoutMilliseconds = context.getCompletedTimeoutMilliseconds();
try {
Object completedWait = context.getCompletedWait();
synchronized (completedWait) {
completedWait.wait(completedTimeoutMilliseconds);
}
Expand Down Expand Up @@ -122,19 +130,47 @@ private void handleTimeout(final long timeoutMilliseconds) {

/**
* Notify waiting task start.
*
* @param jobName job name
*/
public void notifyWaitingTaskStart() {
public void notifyWaitingTaskStart(final String jobName) {
DistributeOnceListenerContext context = listenerContexts.get(jobName);
Object startedWait = context.getStartedWait();
synchronized (startedWait) {
startedWait.notifyAll();
}
}

/**
* Notify waiting task complete.
*
* @param jobName job name
*/
public void notifyWaitingTaskComplete() {
public void notifyWaitingTaskComplete(final String jobName) {
DistributeOnceListenerContext context = listenerContexts.get(jobName);
Object completedWait = context.getCompletedWait();
synchronized (completedWait) {
completedWait.notifyAll();
}
}

@Getter
private static class DistributeOnceListenerContext {

private final long startedTimeoutMilliseconds;

private final Object startedWait = new Object();

private final long completedTimeoutMilliseconds;

private final Object completedWait = new Object();

private final GuaranteeService guaranteeService;

DistributeOnceListenerContext(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds, final GuaranteeService guaranteeService) {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds;
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds;
this.guaranteeService = guaranteeService;
}
}
}
Expand Up @@ -34,9 +34,12 @@ public final class GuaranteeListenerManager extends AbstractListenerManager {

private final Collection<ElasticJobListener> elasticJobListeners;

private final String jobName;

public GuaranteeListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
super(regCenter, jobName);
this.guaranteeNode = new GuaranteeNode(jobName);
this.jobName = jobName;
this.elasticJobListeners = elasticJobListeners;
}

Expand All @@ -53,7 +56,7 @@ protected void dataChanged(final String path, final Type eventType, final String
if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(jobName);
}
}
}
Expand All @@ -67,7 +70,7 @@ protected void dataChanged(final String path, final Type eventType, final String
if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(jobName);
}
}
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
import org.apache.shardingsphere.elasticjob.lite.internal.util.ParameterUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import org.quartz.JobBuilder;
Expand All @@ -39,6 +40,8 @@
import org.quartz.simpl.SimpleThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -76,14 +79,13 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob
this.regCenter = regCenter;
elasticJobType = null;
final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
.map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
.map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList());
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);
jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners);
jobScheduleController = createJobScheduleController();
}

Expand All @@ -95,23 +97,32 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas
this.regCenter = regCenter;
this.elasticJobType = elasticJobType;
final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
.map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
.map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList());
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);
jobExecutor = new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade);
this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners);
jobScheduleController = createJobScheduleController();
}

private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final Collection<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName());
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
private ElasticJobListener lookupElasticJobListener(final String jobName, final String jobListenerTypeWithParameter) {
String[] split = jobListenerTypeWithParameter.split("\\?");
String jobListenerType = split[0];
ElasticJobListener listener = ElasticJobListenerFactory.getListener(jobListenerType);
if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) {
return listener;
}
Map<String, String> parameters = 1 < split.length ? ParameterUtils.parseQuery(split[1]) : Collections.emptyMap();
return configureGuaranteeService(jobName, parameters, listener);
}

private ElasticJobListener configureGuaranteeService(final String jobName, final Map<String, String> parameters, final ElasticJobListener listener) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobName);
long startedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("startedTimeoutMilliseconds", "0"));
long completedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("completedTimeoutMilliseconds", "0"));
((AbstractDistributeOnceElasticJobListener) listener).addGuaranteeService(guaranteeService, jobName, startedTimeoutMilliseconds, completedTimeoutMilliseconds);
return listener;
}

private JobScheduleController createJobScheduleController() {
Expand Down
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.lite.internal.util;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Parameter utils.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class ParameterUtils {

/**
* Parse string like <code>key1=value1&amp;key2=value2</code> to {@link Map}.
*
* @param query parameter string
* @return map
*/
public static Map<String, String> parseQuery(final String query) {
if (Strings.isNullOrEmpty(query)) {
return Collections.emptyMap();
}
return Arrays.stream(query.split("&")).map(String::trim)
.filter(pair -> !pair.isEmpty())
.map(parameter -> parameter.split("="))
.collect(Collectors.toMap(pair -> pair[0], pair -> 1 < pair.length ? pair[1] : ""));
}
}
Expand Up @@ -57,8 +57,9 @@ public final class DistributeOnceElasticJobListenerTest {

@Before
public void setUp() {
distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(elasticJobListenerCaller);
distributeOnceElasticJobListener.setGuaranteeService(guaranteeService);
distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener();
distributeOnceElasticJobListener.setCaller(elasticJobListenerCaller);
distributeOnceElasticJobListener.addGuaranteeService(guaranteeService, "test_job", 1L, 1L);
ReflectionUtils.setSuperclassFieldValue(distributeOnceElasticJobListener, "timeService", timeService);
Map<Integer, String> map = new HashMap<>(2, 1);
map.put(0, "");
Expand Down
Expand Up @@ -17,21 +17,14 @@

package org.apache.shardingsphere.elasticjob.lite.api.listener.fixture;

import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;

public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

private final ElasticJobListenerCaller caller;

public TestDistributeOnceElasticJobListener() {
this(null);
}

public TestDistributeOnceElasticJobListener(final ElasticJobListenerCaller caller) {
super(1L, 1L);
this.caller = caller;
}
@Setter
private ElasticJobListenerCaller caller;

@Override
public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) {
Expand Down
Expand Up @@ -22,10 +22,6 @@

public class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

public TestDistributeOnceElasticJobListener() {
super(100L, 100L);
}

@Override
public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) {
}
Expand Down
Expand Up @@ -64,36 +64,36 @@ public void assertStart() {
@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
}

@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
}

@Test
public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskStart();
verify(distributeOnceElasticJobListener).notifyWaitingTaskStart("test_job");
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete();
verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete("test_job");
}
}

0 comments on commit f38acf2

Please sign in to comment.