Skip to content

Commit

Permalink
[Component-DSL] Concurrency fixes
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/aries/trunk@1814205 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Carlos Sierra Andrés committed Nov 3, 2017
1 parent bb8baea commit f1f6ca6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -36,29 +37,36 @@ public ConfigurationOSGiImpl(String pid) {
new AtomicReference<>(null);

AtomicReference<Runnable>
tupleAtomicReference = new AtomicReference<>(() -> {});
terminatorAtomicReference = new AtomicReference<>(() -> {});

AtomicReference<ServiceRegistration<ManagedService>>
serviceRegistrationReferece = new AtomicReference<>(null);

AtomicBoolean closed = new AtomicBoolean();

Runnable start = () ->
serviceRegistrationReferece.set(
bundleContext.registerService(
ManagedService.class,
properties -> {
atomicReference.set(properties);

Runnable old = tupleAtomicReference.get();

if (old != null) {
old.run();
}
signalLeave(terminatorAtomicReference);

if (properties != null) {
tupleAtomicReference.set(op.apply(properties));
}
else {
tupleAtomicReference.set(null);
terminatorAtomicReference.set(
op.apply(properties));

if (closed.get()) {
/*
if we have closed while executing the
effects we have to execute the terminator
directly instead of storing it
*/
signalLeave(terminatorAtomicReference);

return;
}
}
},
new Hashtable<String, Object>() {{
Expand All @@ -68,16 +76,23 @@ public ConfigurationOSGiImpl(String pid) {
return new OSGiResultImpl(
start,
() -> {
serviceRegistrationReferece.get().unregister();
closed.set(true);

Runnable runnable =
tupleAtomicReference.get();
serviceRegistrationReferece.get().unregister();

if (runnable != null) {
runnable.run();
}
signalLeave(terminatorAtomicReference);
});
});
}

private static void signalLeave(
AtomicReference<Runnable> terminatorAtomicReference) {

Runnable old = terminatorAtomicReference.getAndSet(null);

if (old != null) {
old.run();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand All @@ -42,11 +42,14 @@ public ConfigurationsOSGiImpl(String factoryPid) {
AtomicReference<ServiceRegistration<ManagedServiceFactory>>
serviceRegistrationReference = new AtomicReference<>(null);

AtomicBoolean closed = new AtomicBoolean();

Runnable start = () ->
serviceRegistrationReference.set(
bundleContext.registerService(
ManagedServiceFactory.class,
new ConfigurationsManagedServiceFactory(results, op),
new ConfigurationsManagedServiceFactory(
results, op, closed),
new Hashtable<String, Object>() {{
put("service.pid", factoryPid);
}}));
Expand All @@ -55,13 +58,15 @@ public ConfigurationsOSGiImpl(String factoryPid) {
return new OSGiResultImpl(
start,
() -> {
closed.set(true);

serviceRegistrationReference.get().unregister();

for (Runnable runnable :
results.values()) {
results.replaceAll((key, terminator) -> {
terminator.run();

runnable.run();
}
return null;
});
});
});
}
Expand All @@ -71,14 +76,17 @@ private static class ConfigurationsManagedServiceFactory

private final Map<String, Runnable> _results;

private final Function<Dictionary<String, ?>, Runnable> _addedSource;
private final Function<Dictionary<String, ?>, Runnable> _op;
private AtomicBoolean _closed;

public ConfigurationsManagedServiceFactory(
Map<String, Runnable> results,
Function<Dictionary<String, ?>, Runnable> addedSource) {
Function<Dictionary<String, ?>, Runnable> op,
AtomicBoolean closed) {

_results = results;
_addedSource = addedSource;
_op = op;
_closed = closed;
}

@Override
Expand All @@ -97,13 +105,26 @@ public String getName() {
public void updated(String s, Dictionary<String, ?> dictionary)
throws ConfigurationException {

Runnable terminator = _addedSource.apply(dictionary);
Runnable terminator = _op.apply(dictionary);

Runnable old = _results.put(s, terminator);

if (old != null) {
old.run();
}

if (_closed.get()) {
/* if we have been closed while executing the effects we have
to check if this terminator has been left unexecuted.
*/
_results.computeIfPresent(
s,
(key, runnable) -> {
runnable.run();

return null;
});
}
}

}
Expand Down

0 comments on commit f1f6ca6

Please sign in to comment.