Skip to content

Commit

Permalink
Revert "Distinguish guarantee service by job name in singleton Abstra…
Browse files Browse the repository at this point in the history
…ctDistributeOnceElasticJobListener (#1508)" (#1533)

This reverts commit f38acf2
  • Loading branch information
TeslaCN committed Oct 1, 2020
1 parent fc4880b commit e7af3f7
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 184 deletions.
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.elasticjob.lite.api.listener;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils;
import org.apache.shardingsphere.elasticjob.infra.env.TimeService;
import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
Expand All @@ -26,34 +26,32 @@
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Distributed once elasticjob listener.
*/
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

private final ConcurrentMap<String, DistributeOnceListenerContext> listenerContexts = new ConcurrentHashMap<>();
private final long startedTimeoutMilliseconds;

private final Object startedWait = new Object();

private final long completedTimeoutMilliseconds;

private final Object completedWait = new Object();

@Setter
private GuaranteeService guaranteeService;

private final TimeService timeService = new TimeService();

/**
* Add guarantee service for specific job.
*
* @param guaranteeService guarantee service
* @param jobName job name
* @param startedTimeoutMilliseconds started timeout milliseconds
* @param completedTimeoutMilliseconds completed timeout milliseconds
*/
public void addGuaranteeService(final GuaranteeService guaranteeService, final String jobName, final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
listenerContexts.computeIfAbsent(jobName, unused -> new DistributeOnceListenerContext(startedTimeoutMilliseconds, completedTimeoutMilliseconds, guaranteeService));
public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds;
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds;
}

@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName());
GuaranteeService guaranteeService = context.getGuaranteeService();
Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
guaranteeService.registerStart(shardingItems);
while (!guaranteeService.isRegisterStartSuccess(shardingItems)) {
Expand All @@ -65,9 +63,7 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
return;
}
long before = timeService.getCurrentMillis();
long startedTimeoutMilliseconds = context.getStartedTimeoutMilliseconds();
try {
Object startedWait = context.getStartedWait();
synchronized (startedWait) {
startedWait.wait(startedTimeoutMilliseconds);
}
Expand All @@ -82,8 +78,6 @@ public final void beforeJobExecuted(final ShardingContexts shardingContexts) {

@Override
public final void afterJobExecuted(final ShardingContexts shardingContexts) {
DistributeOnceListenerContext context = listenerContexts.get(shardingContexts.getJobName());
GuaranteeService guaranteeService = context.getGuaranteeService();
Set<Integer> shardingItems = shardingContexts.getShardingItemParameters().keySet();
guaranteeService.registerComplete(shardingItems);
while (!guaranteeService.isRegisterCompleteSuccess(shardingItems)) {
Expand All @@ -95,9 +89,7 @@ public final void afterJobExecuted(final ShardingContexts shardingContexts) {
return;
}
long before = timeService.getCurrentMillis();
long completedTimeoutMilliseconds = context.getCompletedTimeoutMilliseconds();
try {
Object completedWait = context.getCompletedWait();
synchronized (completedWait) {
completedWait.wait(completedTimeoutMilliseconds);
}
Expand Down Expand Up @@ -130,47 +122,19 @@ private void handleTimeout(final long timeoutMilliseconds) {

/**
* Notify waiting task start.
*
* @param jobName job name
*/
public void notifyWaitingTaskStart(final String jobName) {
DistributeOnceListenerContext context = listenerContexts.get(jobName);
Object startedWait = context.getStartedWait();
public void notifyWaitingTaskStart() {
synchronized (startedWait) {
startedWait.notifyAll();
}
}

/**
* Notify waiting task complete.
*
* @param jobName job name
*/
public void notifyWaitingTaskComplete(final String jobName) {
DistributeOnceListenerContext context = listenerContexts.get(jobName);
Object completedWait = context.getCompletedWait();
public void notifyWaitingTaskComplete() {
synchronized (completedWait) {
completedWait.notifyAll();
}
}

@Getter
private static class DistributeOnceListenerContext {

private final long startedTimeoutMilliseconds;

private final Object startedWait = new Object();

private final long completedTimeoutMilliseconds;

private final Object completedWait = new Object();

private final GuaranteeService guaranteeService;

DistributeOnceListenerContext(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds, final GuaranteeService guaranteeService) {
this.startedTimeoutMilliseconds = startedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : startedTimeoutMilliseconds;
this.completedTimeoutMilliseconds = completedTimeoutMilliseconds <= 0L ? Long.MAX_VALUE : completedTimeoutMilliseconds;
this.guaranteeService = guaranteeService;
}
}
}
Expand Up @@ -34,12 +34,9 @@ public final class GuaranteeListenerManager extends AbstractListenerManager {

private final Collection<ElasticJobListener> elasticJobListeners;

private final String jobName;

public GuaranteeListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection<ElasticJobListener> elasticJobListeners) {
super(regCenter, jobName);
this.guaranteeNode = new GuaranteeNode(jobName);
this.jobName = jobName;
this.elasticJobListeners = elasticJobListeners;
}

Expand All @@ -56,7 +53,7 @@ protected void dataChanged(final String path, final Type eventType, final String
if (Type.NODE_DELETED == eventType && guaranteeNode.isStartedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart(jobName);
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
}
}
}
Expand All @@ -70,7 +67,7 @@ protected void dataChanged(final String path, final Type eventType, final String
if (Type.NODE_DELETED == eventType && guaranteeNode.isCompletedRootNode(path)) {
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete(jobName);
((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskComplete();
}
}
}
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
import org.apache.shardingsphere.elasticjob.lite.internal.util.ParameterUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import org.quartz.JobBuilder;
Expand All @@ -40,8 +39,6 @@
import org.quartz.simpl.SimpleThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -79,13 +76,14 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob
this.regCenter = regCenter;
elasticJobType = null;
final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
.map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList());
.map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);
jobExecutor = null == elasticJob ? new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade) : new ElasticJobExecutor(elasticJob, jobConfig, jobFacade);
String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners);
jobScheduleController = createJobScheduleController();
}

Expand All @@ -97,32 +95,23 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas
this.regCenter = regCenter;
this.elasticJobType = elasticJobType;
final Collection<ElasticJobListener> elasticJobListeners = jobConfig.getJobListenerTypes().stream()
.map(elasticJobTypeWithParameter -> lookupElasticJobListener(jobConfig.getJobName(), elasticJobTypeWithParameter)).collect(Collectors.toList());
.map(ElasticJobListenerFactory::getListener).collect(Collectors.toList());
setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), elasticJobListeners);
schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), elasticJobListeners, tracingConfig);
jobExecutor = new ElasticJobExecutor(elasticJobType, jobConfig, jobFacade);
this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType, jobConfig);
setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListeners);
jobScheduleController = createJobScheduleController();
}

private ElasticJobListener lookupElasticJobListener(final String jobName, final String jobListenerTypeWithParameter) {
String[] split = jobListenerTypeWithParameter.split("\\?");
String jobListenerType = split[0];
ElasticJobListener listener = ElasticJobListenerFactory.getListener(jobListenerType);
if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) {
return listener;
private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final Collection<ElasticJobListener> elasticJobListeners) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig.getJobName());
for (ElasticJobListener each : elasticJobListeners) {
if (each instanceof AbstractDistributeOnceElasticJobListener) {
((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
}
}
Map<String, String> parameters = 1 < split.length ? ParameterUtils.parseQuery(split[1]) : Collections.emptyMap();
return configureGuaranteeService(jobName, parameters, listener);
}

private ElasticJobListener configureGuaranteeService(final String jobName, final Map<String, String> parameters, final ElasticJobListener listener) {
GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobName);
long startedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("startedTimeoutMilliseconds", "0"));
long completedTimeoutMilliseconds = Long.parseLong(parameters.getOrDefault("completedTimeoutMilliseconds", "0"));
((AbstractDistributeOnceElasticJobListener) listener).addGuaranteeService(guaranteeService, jobName, startedTimeoutMilliseconds, completedTimeoutMilliseconds);
return listener;
}

private JobScheduleController createJobScheduleController() {
Expand Down

This file was deleted.

Expand Up @@ -57,9 +57,8 @@ public final class DistributeOnceElasticJobListenerTest {

@Before
public void setUp() {
distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener();
distributeOnceElasticJobListener.setCaller(elasticJobListenerCaller);
distributeOnceElasticJobListener.addGuaranteeService(guaranteeService, "test_job", 1L, 1L);
distributeOnceElasticJobListener = new TestDistributeOnceElasticJobListener(elasticJobListenerCaller);
distributeOnceElasticJobListener.setGuaranteeService(guaranteeService);
ReflectionUtils.setSuperclassFieldValue(distributeOnceElasticJobListener, "timeService", timeService);
Map<Integer, String> map = new HashMap<>(2, 1);
map.put(0, "");
Expand Down
Expand Up @@ -17,14 +17,21 @@

package org.apache.shardingsphere.elasticjob.lite.api.listener.fixture;

import lombok.Setter;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;

public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

@Setter
private ElasticJobListenerCaller caller;
private final ElasticJobListenerCaller caller;

public TestDistributeOnceElasticJobListener() {
this(null);
}

public TestDistributeOnceElasticJobListener(final ElasticJobListenerCaller caller) {
super(1L, 1L);
this.caller = caller;
}

@Override
public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) {
Expand Down
Expand Up @@ -22,6 +22,10 @@

public class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

public TestDistributeOnceElasticJobListener() {
super(100L, 100L);
}

@Override
public void doBeforeJobExecutedAtLastStarted(final ShardingContexts shardingContexts) {
}
Expand Down
Expand Up @@ -64,36 +64,36 @@ public void assertStart() {
@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotRemoved() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}

@Test
public void assertStartedNodeRemovedJobListenerWhenIsNotStartedNode() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/other_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}

@Test
public void assertStartedNodeRemovedJobListenerWhenIsRemovedAndStartedNode() {
guaranteeListenerManager.new StartedNodeRemovedJobListener().dataChanged("/test_job/guarantee/started", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskStart("test_job");
verify(distributeOnceElasticJobListener).notifyWaitingTaskStart();
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotRemoved() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_CHANGED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsNotCompletedNode() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/other_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart("test_job");
verify(distributeOnceElasticJobListener, times(0)).notifyWaitingTaskStart();
}

@Test
public void assertCompletedNodeRemovedJobListenerWhenIsRemovedAndCompletedNode() {
guaranteeListenerManager.new CompletedNodeRemovedJobListener().dataChanged("/test_job/guarantee/completed", Type.NODE_DELETED, "");
verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete("test_job");
verify(distributeOnceElasticJobListener).notifyWaitingTaskComplete();
}
}

0 comments on commit e7af3f7

Please sign in to comment.