Skip to content

Commit

Permalink
Fixed #317
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Apr 17, 2024
1 parent f0b3c5c commit b09c43a
Showing 1 changed file with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class EntityProvider<T> implements AutoCloseable {

protected final Logger logger = LogManager.getLogger();
private static final Duration DISCOVERY_TIMEOUT = Duration.ofSeconds(10);

private final String kind;
private final Map<String, T> address2entity = new LinkedHashMap<>();
private final Sinks.Many<Map.Entry<String, T>> sink;
Expand Down Expand Up @@ -75,15 +79,41 @@ public T getById(String consumer, String id) {

// VT: FIXME: Swap getById() and getMonoById() logic so both are more reactive

var found = getFlux()
Map.Entry<String, T> found;

try {

found = getFlux()
.filter(kv -> kv.getKey().equals(id))
.blockFirst();
.blockFirst(DISCOVERY_TIMEOUT);

} catch (IllegalStateException ex) {

var cause = ex.getCause();

if (cause instanceof TimeoutException) {
logger.error("{}: \"{}\" not found among configured {} IDs within {}, check if it is configured and available; existing mappings follow", consumer, id, kind, DISCOVERY_TIMEOUT);
} else {
logger.error("{}: \"{}\" not found among configured {} IDs; exception trace and existing mappings follow:", consumer, id, kind, ex);
}

dumpMappings(consumer, id);
return null;

}

if (found != null) {
return found.getValue();
}

logger.error("{}: \"{}\" not found among configured {} IDs; existing mappings follow:", consumer, id, kind);

dumpMappings(consumer, id);
return null;
}

private void dumpMappings(String consumer, String id) {

getFlux()
.subscribe(entry -> logger.error(
" id={}, {}={}",
Expand All @@ -94,8 +124,6 @@ public T getById(String consumer, String id) {
: entry.getValue()));

logger.error("{}: {}: skipping to proceed with the rest of the configuration", consumer, id);

return null;
}

public Mono<T> getMonoById(String consumer, String id) {
Expand Down

0 comments on commit b09c43a

Please sign in to comment.