Skip to content

Commit

Permalink
remove notification retry task (apache#6401)
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj committed Nov 25, 2020
1 parent 3f67d2e commit c6e0d2a
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 139 deletions.
Expand Up @@ -51,10 +51,6 @@ public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) {
failbackRegistry.removeFailedUnsubscribedTask(url.getOriginalURL(), new NotifyListener.ReverseCompatibleNotifyListener(listener));
}

public void removeFailedNotifiedTask(URL url, NotifyListener listener) {
failbackRegistry.removeFailedNotifiedTask(url.getOriginalURL(), new NotifyListener.ReverseCompatibleNotifyListener(listener));
}

@Override
public void register(URL url) {
failbackRegistry.register(url.getOriginalURL());
Expand Down

This file was deleted.

Expand Up @@ -21,7 +21,6 @@
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.retry.FailedNotifiedTask;
import org.apache.dubbo.registry.retry.FailedRegisteredTask;
import org.apache.dubbo.registry.retry.FailedSubscribedTask;
import org.apache.dubbo.registry.retry.FailedUnregisteredTask;
Expand Down Expand Up @@ -57,8 +56,6 @@ public abstract class FailbackRegistry extends AbstractRegistry {

private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

/**
* The time in milliseconds the retryExecutor will wait
*/
Expand Down Expand Up @@ -93,11 +90,6 @@ public void removeFailedUnsubscribedTask(URL url, NotifyListener listener) {
failedUnsubscribed.remove(h);
}

public void removeFailedNotifiedTask(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
failedNotified.remove(h);
}

private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) {
Expand Down Expand Up @@ -159,7 +151,6 @@ private void removeFailedSubscribed(URL url, NotifyListener listener) {
f.cancel();
}
removeFailedUnsubscribed(url, listener);
removeFailedNotified(url, listener);
}

private void addFailedUnsubscribed(URL url, NotifyListener listener) {
Expand All @@ -184,28 +175,6 @@ private void removeFailedUnsubscribed(URL url, NotifyListener listener) {
}
}

private void addFailedNotified(URL url, NotifyListener listener, List<URL> urls) {
Holder h = new Holder(url, listener);
FailedNotifiedTask newTask = new FailedNotifiedTask(url, listener);
FailedNotifiedTask f = failedNotified.putIfAbsent(h, newTask);
if (f == null) {
// never has a retry task. then start a new task for retry.
newTask.addUrlToRetry(urls);
retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
} else {
// just add urls which needs retry.
newTask.addUrlToRetry(urls);
}
}

private void removeFailedNotified(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener);
FailedNotifiedTask f = failedNotified.remove(h);
if (f != null) {
f.cancel();
}
}

ConcurrentMap<URL, FailedRegisteredTask> getFailedRegistered() {
return failedRegistered;
}
Expand All @@ -222,9 +191,6 @@ ConcurrentMap<Holder, FailedUnsubscribedTask> getFailedUnsubscribed() {
return failedUnsubscribed;
}

ConcurrentMap<Holder, FailedNotifiedTask> getFailedNotified() {
return failedNotified;
}

@Override
public void register(URL url) {
Expand Down Expand Up @@ -397,9 +363,8 @@ protected void notify(URL url, NotifyListener listener, List<URL> urls) {
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
// Record a failed registration request to a failed list
logger.error("Failed to notify addresses for subscribe " + url + ", cause: " + t.getMessage(), t);
}
}

Expand Down
Expand Up @@ -27,7 +27,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
Expand Down Expand Up @@ -153,36 +152,6 @@ public void notify(List<URL> urls) {
assertEquals(true, notified.get());
}

@Test
public void testDoRetry_nofify() throws Exception {

//Initial value 0
final AtomicInteger count = new AtomicInteger(0);

NotifyListener listner = new NotifyListener() {
@Override
public void notify(List<URL> urls) {
count.incrementAndGet();
//The exception is thrown for the first time to see if the back will be called again to incrementAndGet
if (count.get() == 1L) {
throw new RuntimeException("test exception please ignore");
}
}
};
registry = new MockRegistry(registryUrl, new CountDownLatch(0));
registry.subscribe(serviceUrl.setProtocol(CONSUMER_PROTOCOL).addParameters(CollectionUtils.toStringMap("check", "false")), listner);

assertEquals(1, count.get()); //Make sure that the subscribe call has just been called once count.incrementAndGet after the call is completed
//Wait for the timer.
for (int i = 0; i < trytimes; i++) {
System.out.println("failback notify retry ,times:" + i);
if (count.get() == 2)
break;
Thread.sleep(sleeptime);
}
assertEquals(2, count.get());
}

@Test
public void testRecover() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(4);
Expand Down

0 comments on commit c6e0d2a

Please sign in to comment.