diff --git a/src/main/java/com/uid2/optout/partner/EndpointConfig.java b/src/main/java/com/uid2/optout/partner/EndpointConfig.java index a3c2a00..89fdd37 100644 --- a/src/main/java/com/uid2/optout/partner/EndpointConfig.java +++ b/src/main/java/com/uid2/optout/partner/EndpointConfig.java @@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.uid2.shared.Const; import com.uid2.shared.auth.ClientKey; -import io.vertx.core.http.HttpMethod; +import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.json.JsonObject; import java.time.Instant; diff --git a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java index 140ce12..c669ab7 100644 --- a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java +++ b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java @@ -1,14 +1,21 @@ package com.uid2.optout.partner; import com.uid2.optout.web.RetryingWebClient; +import com.uid2.optout.web.UnexpectedStatusCodeException; import com.uid2.shared.Utils; import com.uid2.shared.optout.OptOutEntry; import com.uid2.shared.optout.OptOutUtils; +import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Future; import io.vertx.core.Vertx; +import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpRequest; +import java.util.Set; import java.util.regex.Pattern; public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { @@ -16,7 +23,11 @@ public class OptOutPartnerEndpoint implements IOptOutPartnerEndpoint { public static final String VALUEREF_OPTOUT_EPOCH = "${OPTOUT_EPOCH}"; public static final String QUOTEDVREF_ADVERTISING_ID = Pattern.quote(OptOutPartnerEndpoint.VALUEREF_ADVERTISING_ID); public static final String QUOTEDVEF_OPTOUT_EPOCH = Pattern.quote(OptOutPartnerEndpoint.VALUEREF_OPTOUT_EPOCH); + + private static final Set SUCCESS_STATUS_CODES = Set.of(200, 204); + private static final Set RETRYABLE_STATUS_CODES = Set.of(429, 500, 502, 503, 504); private static final Logger LOGGER = LoggerFactory.getLogger(OptOutPartnerEndpoint.class); + private final EndpointConfig config; private final RetryingWebClient retryingClient; @@ -33,36 +44,56 @@ public String name() { @Override public Future send(OptOutEntry entry) { return this.retryingClient.send( - req -> { - for (String queryParam : this.config.queryParams()) { + (URI uri, HttpMethod method) -> { + URIBuilder uriBuilder = new URIBuilder(uri); + + for (String queryParam : config.queryParams()) { int indexOfEqualSign = queryParam.indexOf('='); String paramName = queryParam.substring(0, indexOfEqualSign); String paramValue = queryParam.substring(indexOfEqualSign + 1); String replacedValue = replaceValueReferences(entry, paramValue); - req.setQueryParam(paramName, replacedValue); + + uriBuilder.addParameter(paramName, replacedValue); + } + + URI uriWithParams; + try { + uriWithParams = uriBuilder.build(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); } + HttpRequest.Builder builder = HttpRequest.newBuilder() + .uri(uriWithParams) + .method(method.toString(), HttpRequest.BodyPublishers.noBody()); + for (String additionalHeader : this.config.additionalHeaders()) { int indexOfColonSign = additionalHeader.indexOf(':'); String headerName = additionalHeader.substring(0, indexOfColonSign); String headerValue = additionalHeader.substring(indexOfColonSign + 1); String replacedValue = replaceValueReferences(entry, headerValue); - req.headers().add(headerName, replacedValue); + builder.header(headerName, replacedValue); } - LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + String.valueOf(entry.timestamp)); + LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - return req; + return builder.build(); }, resp -> { - // returning tri-state boolean - // - TRUE: result looks good - // - FALSE: retry-able error code returned - // - NULL: failed and should not retry - if (resp == null) return false; - else if (resp.statusCode() == 200) return true; - else if (resp.statusCode() == 500) return false; - else return null; + if (resp == null) { + throw new RuntimeException("response is null"); + } + + if (SUCCESS_STATUS_CODES.contains(resp.statusCode())) { + return true; + } + + LOGGER.info("received non-200 response: " + resp.statusCode() + "-" + resp.body() + " for optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); + if (RETRYABLE_STATUS_CODES.contains(resp.statusCode())) { + return false; + } else { + throw new UnexpectedStatusCodeException(resp.statusCode()); + } } ); } diff --git a/src/main/java/com/uid2/optout/util/Tuple.java b/src/main/java/com/uid2/optout/util/Tuple.java new file mode 100644 index 0000000..9f00353 --- /dev/null +++ b/src/main/java/com/uid2/optout/util/Tuple.java @@ -0,0 +1,31 @@ +package com.uid2.optout.util; + +public class Tuple { + public static class Tuple2 { + private final T1 item1; + private final T2 item2; + + public Tuple2(T1 item1, T2 item2) { + assert item1 != null; + assert item2 != null; + + this.item1 = item1; + this.item2 = item2; + } + + public T1 getItem1() { return item1; } + public T2 getItem2() { return item2; } + + @Override + public int hashCode() { return item1.hashCode() ^ item2.hashCode(); } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Tuple2)) return false; + Tuple2 pairo = (Tuple2) o; + return this.item1.equals(pairo.item1) && + this.item2.equals(pairo.item2); + } + } +} + diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSender.java b/src/main/java/com/uid2/optout/vertx/OptOutSender.java index 4fd8b13..75b16ea 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSender.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSender.java @@ -4,6 +4,9 @@ import com.uid2.optout.partner.EndpointConfig; import com.uid2.optout.partner.IOptOutPartnerEndpoint; import com.uid2.optout.partner.OptOutPartnerEndpoint; +import com.uid2.optout.util.Tuple; +import com.uid2.optout.web.TooManyRetriesException; +import com.uid2.optout.web.UnexpectedStatusCodeException; import com.uid2.shared.health.HealthComponent; import com.uid2.shared.health.HealthManager; import com.uid2.shared.optout.*; @@ -26,6 +29,7 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -41,7 +45,24 @@ // take entries save in the delta file, and send to partner optout endpoints. // public class OptOutSender extends AbstractVerticle { - private static final Logger LOGGER = LoggerFactory.getLogger(OptOutSender.class); + private static class OptOutSenderLogger { + private final Logger logger = LoggerFactory.getLogger(OptOutSender.class); + private final String partnerName; + + public OptOutSenderLogger(String partnerName) { + this.partnerName = partnerName; + } + + public void info(String message, Object... args) { + logger.info("[" + this.partnerName + "] " + message, args); + } + + public void error(String message, Object... args) { + logger.error("[" + this.partnerName + "] " + message, args); + } + } + + private final OptOutSenderLogger logger; private final HealthComponent healthComponent; private final String deltaConsumerDir; private final int deltaRotateInterval; @@ -49,12 +70,12 @@ public class OptOutSender extends AbstractVerticle { private final int senderReplicaId; private final int totalReplicas; private final IOptOutPartnerEndpoint remotePartner; - private final Counter counterTotalEntriesSent; private final String eventCloudSyncDownloaded; + private final Map, Counter> entryReplayStatusCounters = new HashMap<>(); private final AtomicInteger pendingFilesCount = new AtomicInteger(0); - private final AtomicLong lastEndtrySent = new AtomicLong(Instant.EPOCH.getEpochSecond()); + private final AtomicLong lastEntrySent = new AtomicLong(0); private LinkedList pendingFiles = new LinkedList<>(); - private boolean isReplaying = false; + private AtomicBoolean isReplaying = new AtomicBoolean(false); private CompletableFuture pendingAsyncOp = null; // name of the file that stores timestamp private Path timestampFile = null; @@ -68,6 +89,7 @@ public OptOutSender(JsonObject jsonConfig, Vertx vertx, EndpointConfig partnerCo } public OptOutSender(JsonObject jsonConfig, IOptOutPartnerEndpoint optOutPartner, String eventCloudSyncDownloaded) { + this.logger = new OptOutSenderLogger(optOutPartner.name()); this.healthComponent = HealthManager.instance.registerComponent("optout-sender-" + optOutPartner.name()); this.healthComponent.setHealthStatus(false, "not started"); @@ -85,7 +107,7 @@ public OptOutSender(JsonObject jsonConfig, IOptOutPartnerEndpoint optOutPartner, this.timestampFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_timestamp.txt"); this.processedDeltasFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_processed.txt"); - Gauge.builder("uid2.optout.last_entry_sent", () -> this.lastEndtrySent.get()) + Gauge.builder("uid2.optout.last_entry_sent", () -> this.lastEntrySent.get()) .description("gauge for last entry send epoch seconds, per each remote partner") .tag("remote_partner", remotePartner.name()) .register(Metrics.globalRegistry); @@ -94,63 +116,58 @@ public OptOutSender(JsonObject jsonConfig, IOptOutPartnerEndpoint optOutPartner, .description("gauge for remaining delta files to send to remote, per each remote partner") .tag("remote_partner", remotePartner.name()) .register(Metrics.globalRegistry); - - this.counterTotalEntriesSent = Counter.builder("uid2.optout.entries_sent") - .description("counter for total entries sent, per each remote partner") - .tag("remote_partner", remotePartner.name()) - .register(Metrics.globalRegistry); } @Override public void start(Promise startPromise) throws Exception { - LOGGER.info("starting OptOutSender"); + this.logger.info("starting OptOutSender"); this.healthComponent.setHealthStatus(false, "still starting"); try { EventBus eb = vertx.eventBus(); - LOGGER.info("replica id is set to " + this.replicaId); + this.logger.info("replica id is set to " + this.replicaId); if (this.replicaId == this.senderReplicaId) { - LOGGER.info("this is replica " + this.replicaId + ", and will be responsible for consolidating deltas before replaying to remote"); + this.logger.info("this is replica " + this.replicaId + ", and will be responsible for consolidating deltas before replaying to remote"); eb.consumer(this.eventCloudSyncDownloaded, msg -> this.handleCloudDownloaded(msg)); // before mark startPromise complete, scan local delta files and find unprocessed deltas this.scanLocalForUnprocessed().onComplete(ar -> startPromise.handle(ar)); } else { - LOGGER.info("this is not replica " + this.senderReplicaId + ", and will not be responsible for consolidating deltas before replaying to remote"); + this.logger.info("this is not replica " + this.senderReplicaId + ", and will not be responsible for consolidating deltas before replaying to remote"); startPromise.complete(); } } catch (Exception ex) { - LOGGER.error(ex.getMessage(), ex); + this.logger.error(ex.getMessage(), ex); startPromise.fail(new Throwable(ex)); } startPromise.future() .onSuccess(v -> { - LOGGER.info("started OptOutSender"); + this.logger.info("started OptOutSender"); this.healthComponent.setHealthStatus(true); }) .onFailure(e -> { - LOGGER.error("failed starting OptOutSender", e); + this.logger.error("failed starting OptOutSender", e); this.healthComponent.setHealthStatus(false, e.getMessage()); }); } @Override public void stop(Promise stopPromise) throws Exception { - LOGGER.info("shutting down OptOutSender."); + this.logger.info("shutting down OptOutSender."); AtomicInteger shutdownTryCounter = new AtomicInteger(0); vertx.setPeriodic(500, i -> { - if (this.isReplaying == false || shutdownTryCounter.incrementAndGet() > 120) { + if (this.isReplaying.get() == false || shutdownTryCounter.incrementAndGet() > 120) { // wait for at most 60s (120 * 500ms) for current replaying to complete stopPromise.complete(); } }); stopPromise.future() - .onSuccess(v -> LOGGER.info("stopped OptOutSender")) - .onFailure(e -> LOGGER.error("failed stopping OptOutSender", e)); + .onSuccess(v -> this.logger.info("stopped OptOutSender")) + .onFailure(e -> this.logger.error("failed stopping OptOutSender", e)); } // returning name of the file that stores timestamp @@ -169,7 +186,9 @@ private Future scanLocalForUnprocessed() { return CompositeFuture.all(step1, step2).compose(cf -> { HashSet processedDeltas = new HashSet<>(Arrays.asList(cf.resultAt(0))); this.lastProcessedTimestamp = Instant.ofEpochSecond(cf.resultAt(1)); - LOGGER.info("found total " + processedDeltas.size() + " local deltas on disk"); + this.lastEntrySent.set(this.lastProcessedTimestamp.getEpochSecond()); + + this.logger.info("found total " + processedDeltas.size() + " local deltas on disk"); // checking our deltaConsumerDir File dirToList = new File(deltaConsumerDir); @@ -202,14 +221,14 @@ private Future scanLocalForUnprocessed() { if (!processedDeltas.contains(fullName)) { // log an error if an unprocessed delta is found before the timestamp if (fileTimestamp.isBefore(this.lastProcessedTimestamp)) { - LOGGER.error("unprocessed delta file: " + fullName + " found before the last processed timestamp: " + this.lastProcessedTimestamp); + this.logger.error("unprocessed delta file: " + fullName + " found before the last processed timestamp: " + this.lastProcessedTimestamp); } this.pendingFiles.add(fullName); } } - LOGGER.info("added " + this.pendingFiles.size() + " local deltas as pending deltas"); + this.logger.info("added " + this.pendingFiles.size() + " local deltas as pending deltas"); return Future.succeededFuture(); }); @@ -219,19 +238,22 @@ private void handleCloudDownloaded(Message msg) { try { String filename = msg.body(); if (!OptOutUtils.isDeltaFile(filename)) { - LOGGER.info("ignoring non-delta file " + filename + " downloaded from s3"); + this.logger.info("ignoring non-delta file " + filename + " downloaded from s3"); return; } - LOGGER.info("received delta " + filename + " to consolidate and replicate to remote"); + this.logger.info("received delta " + filename + " to consolidate and replicate to remote"); OptOutUtils.addSorted(this.pendingFiles, filename, OptOutUtils.DeltaFilenameComparator); // if it is still replaying the last one, return - if (this.isReplaying) return; + if (this.isReplaying.get()) { + this.logger.info("still replaying the last delta, will not start replaying this one"); + return; + } this.processPendingFilesToConsolidate(Instant.now()); } catch (Exception ex) { - LOGGER.error("handleLogReplay failed unexpectedly: " + ex.getMessage(), ex); + this.logger.error("handleLogReplay failed unexpectedly: " + ex.getMessage(), ex); } } @@ -251,10 +273,10 @@ private void processPendingFilesToConsolidate(Instant now) { // if lastProcessedTimestamp is not initialized, just process up to the current timestamp Instant firstDeltaTimestamp = OptOutUtils.getFileTimestamp(this.pendingFiles.get(0)); nextTimestamp = firstDeltaTimestamp.plusSeconds(OptOutUtils.getSecondsBeforeNextSlot(firstDeltaTimestamp, this.deltaRotateInterval)); - LOGGER.info("last processed timestamp is found to be uninitialized, will process all deltas up to: " + nextTimestamp); + this.logger.info("last processed timestamp is found to be uninitialized, will process all deltas up to: " + nextTimestamp); } else { nextTimestamp = this.lastProcessedTimestamp.plus(this.deltaRotateInterval, ChronoUnit.SECONDS); - LOGGER.info("last processed timestamp is " + this.lastProcessedTimestamp + ", will process all deltas up to: " + nextTimestamp); + this.logger.info("last processed timestamp is " + this.lastProcessedTimestamp + ", will process all deltas up to: " + nextTimestamp); } ListIterator iterator = this.pendingFiles.listIterator(); @@ -276,17 +298,17 @@ private void processPendingFilesToConsolidate(Instant now) { } // either we received deltas from all replicas, or we waited for an entire delta rotation interval - LOGGER.info("current slot: " + currentSlot); + this.logger.info("current slot: " + currentSlot); if (deltasForCurrentIntervalReceived >= this.totalReplicas || currentSlot.isAfter(nextTimestamp)) { if (deltasToConsolidate.size() == 0) { - LOGGER.info("received 0 new deltas, between " + this.lastProcessedTimestamp + " and " + nextTimestamp); + this.logger.info("received 0 new deltas, between " + this.lastProcessedTimestamp + " and " + nextTimestamp); } else { - LOGGER.info("received " + deltasForCurrentIntervalReceived + " new deltas, and total of " + deltasToConsolidate.size() + " to consolidate between " + this.lastProcessedTimestamp + " and " + nextTimestamp); + this.logger.info("received " + deltasForCurrentIntervalReceived + " new deltas, and total of " + deltasToConsolidate.size() + " to consolidate between " + this.lastProcessedTimestamp + " and " + nextTimestamp); } // if received deltas is the same or greater than the total replicas in the current consolidating window // or if the consolidating time window (last, last + deltaRotateInterval) is already in the past - this.isReplaying = true; + this.isReplaying.set(true); this.kickOffDeltaReplayWithConsolidation(nextTimestamp, deltasToConsolidate); } } @@ -295,7 +317,7 @@ private void kickOffDeltaReplayWithConsolidation(Instant nextTimestamp, ListexecuteBlocking(promise -> deltaReplayWithConsolidation(promise, deltasToConsolidate), ar -> { if (ar.failed()) { - LOGGER.error("delta consolidation failed", new Exception(ar.cause())); + this.logger.error("delta consolidation failed", new Exception(ar.cause())); } else { updateProcessedDeltas(nextTimestamp, deltasToConsolidate); // once complete, check if we could start the next round @@ -303,7 +325,7 @@ private void kickOffDeltaReplayWithConsolidation(Instant nextTimestamp, List deltasConsolidated) { if (deltasConsolidated.size() == 0) { - LOGGER.info("skip updating processed delta timestamp due to 0 deltas being processed"); + this.logger.info("skip updating processed delta timestamp due to 0 deltas being processed"); return; } - LOGGER.info("updating processed delta timestamp to: " + nextTimestamp); + this.logger.info("updating processed delta timestamp to: " + nextTimestamp); OptOutUtils.writeTimestampToFile(vertx, this.timestampFile, nextTimestamp.getEpochSecond()).compose(v -> { - LOGGER.info("updated processed delta timestamp to: " + nextTimestamp); + this.logger.info("updated processed delta timestamp to: " + nextTimestamp); // if no files in the list, skip appending process delta filenames to disk if (deltasConsolidated.size() == 0) return Future.succeededFuture(); // persist the list of files on disk - LOGGER.info("appending " + deltasConsolidated.size() + " files to processed delta list"); + this.logger.info("appending " + deltasConsolidated.size() + " files to processed delta list"); return OptOutUtils.appendLinesToFile(vertx, this.processedDeltasFile, deltasConsolidated); }).onFailure(v -> { String filenames = String.join(",", deltasConsolidated); - LOGGER.error("unable to persistent last delta timestamp and/or processed delta filenames: " + nextTimestamp + ": " + filenames); + this.logger.error("unable to persistent last delta timestamp and/or processed delta filenames: " + nextTimestamp + ": " + filenames); }); for (String deltaFile : deltasConsolidated) { this.pendingFiles.remove(deltaFile); } - LOGGER.info("removed " + deltasConsolidated.size() + " delta(s) from pending to process list"); + this.logger.info("removed " + deltasConsolidated.size() + " delta(s) from pending to process list"); } private void deltaReplayWithConsolidation(Promise promise, List deltasToConsolidate) { @@ -347,7 +369,7 @@ private void deltaReplayWithConsolidation(Promise promise, List de try { OptOutHeap heap = new OptOutHeap(1000); for (String deltaFile : deltasToConsolidate) { - LOGGER.info("loading delta " + deltaFile); + this.logger.info("loading delta " + deltaFile); Path fp = Paths.get(deltaFile); try { @@ -355,63 +377,74 @@ private void deltaReplayWithConsolidation(Promise promise, List de OptOutCollection store = new OptOutCollection(data); heap.add(store); } catch (NoSuchFileException ex) { - LOGGER.error("ignoring non-existing file: " + ex.getFile().toString()); + this.logger.error("ignoring non-existing file: " + ex.getFile().toString()); } } OptOutPartition consolidatedDelta = heap.toPartition(true); deltaReplay(promise, consolidatedDelta, deltasToConsolidate); } catch (Exception ex) { - LOGGER.error("deltaReplay failed unexpectedly: " + ex.getMessage(), ex); + this.logger.error("deltaReplay failed unexpectedly: " + ex.getMessage(), ex); // this error is a code logic error and needs to be fixed promise.fail(new Throwable(ex)); } } + private void recordEntryReplayStatus(String status) { + this.entryReplayStatusCounters.computeIfAbsent(new Tuple.Tuple2<>(remotePartner.name(), status), pair -> Counter + .builder("uid2.optout.entries_sent") + .description("Counter for entry replay status") + .tags("remote_partner", String.valueOf(pair.getItem1()), "status", String.valueOf(pair.getItem2())) + .register(Metrics.globalRegistry)).increment(); + } + private void deltaReplay(Promise promise, OptOutCollection store, List fileList) { try { // generate comma separated filename list for logging String filenames = String.join(",", fileList); - this.pendingAsyncOp = new CompletableFuture(); // sequentially send each entry Future lastOp = Future.succeededFuture(); for (int i = 0; i < store.size(); ++i) { final OptOutEntry entry = store.get(i); - lastOp = lastOp.compose(v -> this.remotePartner.send(entry)); - lastOp.onComplete(v -> { - this.lastEndtrySent.set(Instant.now().getEpochSecond()); - this.counterTotalEntriesSent.increment(); + Future sendOp = this.remotePartner.send(entry); + sendOp.onComplete(v -> { + if (v.succeeded()) { + recordEntryReplayStatus("success"); + this.lastEntrySent.set(entry.timestamp); + } else { + if (v.cause() instanceof TooManyRetriesException) { + recordEntryReplayStatus("too_many_retries"); + } else if (v.cause() instanceof UnexpectedStatusCodeException) { + recordEntryReplayStatus("unexpected_status_code_" + ((UnexpectedStatusCodeException) v.cause()).getStatusCode()); + } else { + recordEntryReplayStatus("unknown_error"); + } + + this.logger.error("deltaReplay failed sending entry: " + entry.timestamp, v.cause()); + } }); + + lastOp = lastOp.compose(v -> sendOp); } lastOp.onComplete(ar -> { if (ar.failed()) { - LOGGER.error("deltaReplay failed sending delta " + filenames + " to remote: " + this.remotePartner.name(), ar.cause()); - LOGGER.error("deltaReplay has " + this.pendingFilesCount.get() + " pending file"); - LOGGER.error("deltaReplay will restart in 3600s"); - vertx.setTimer(1000 * 3600, i -> this.pendingAsyncOp.completeExceptionally(new Exception(ar.cause()))); + this.logger.error("deltaReplay failed sending delta " + filenames + " to remote: " + this.remotePartner.name(), ar.cause()); + this.logger.error("deltaReplay has " + this.pendingFilesCount.get() + " pending file"); + this.logger.error("deltaReplay will restart in 3600s"); + vertx.setTimer(1000 * 3600, i -> promise.fail(ar.cause())); } else { - LOGGER.info("finished delta replay for file: " + filenames); - this.pendingAsyncOp.complete(null); + this.logger.info("finished delta replay for file: " + filenames); String completeMsg = this.remotePartner.name() + "," + filenames; vertx.eventBus().send(Const.Event.DeltaSentRemote, completeMsg); + promise.complete(); } }); - // this causes it to block on the worker thread (this function should be called on a worker thread) - try { - this.pendingAsyncOp.get(); - promise.complete(); - } catch (Exception ex) { - LOGGER.error("deltaReplay failed unexpectedly: " + ex.getMessage(), ex); - promise.fail(ex); - } finally { - this.pendingAsyncOp = null; - } } catch (Exception ex) { - LOGGER.error("deltaReplay failed unexpectedly: " + ex.getMessage(), ex); + this.logger.error("deltaReplay failed unexpectedly: " + ex.getMessage(), ex); // this error is a code logic error and needs to be fixed promise.fail(new Throwable(ex)); } diff --git a/src/main/java/com/uid2/optout/web/RetryingWebClient.java b/src/main/java/com/uid2/optout/web/RetryingWebClient.java index 32e0789..b69a3cf 100644 --- a/src/main/java/com/uid2/optout/web/RetryingWebClient.java +++ b/src/main/java/com/uid2/optout/web/RetryingWebClient.java @@ -1,87 +1,85 @@ package com.uid2.optout.web; +import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.http.HttpMethod; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.vertx.ext.web.client.HttpRequest; -import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.function.BiFunction; public class RetryingWebClient { private static final Logger LOGGER = LoggerFactory.getLogger(RetryingWebClient.class); private final URI uri; private final HttpMethod method; - private final WebClient client; private final int retryCount; private final int retryBackoffMs; + private final HttpClient httpClient; private Vertx vertx; public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCount, int retryBackoffMs) { this.vertx = vertx; this.uri = URI.create(uri); this.method = method; - - WebClientOptions options = new WebClientOptions(); - - // Disabling Temporary Measure to skip https validation - // options.setVerifyHost(false); - - this.client = WebClient.create(vertx, options); + this.httpClient = HttpClient.newHttpClient(); this.retryCount = retryCount; this.retryBackoffMs = retryBackoffMs; } - public Future send(Function, HttpRequest> requestCreator, Function, Boolean> responseValidator) { + public Future send(BiFunction requestCreator, Function responseValidator) { return this.send(requestCreator, responseValidator, 0); } - public Future send(Function, HttpRequest> requestCreator, Function, Boolean> responseValidator, int currentRetries) { + public Future send(BiFunction requestCreator, Function responseValidator, int currentRetries) { Promise promise = Promise.promise(); - HttpRequest req = this.client.requestAbs(method, this.uri.toString()); - requestCreator.apply(req).send(ar -> { + + HttpRequest request = requestCreator.apply(this.uri, this.method); + CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); + + asyncResponse.thenAccept(response -> { try { - if (ar.failed()) { - // log cause() if failed - LOGGER.error("failed sending to " + uri, ar.cause()); + Boolean responseOK = responseValidator.apply(response); + if (responseOK == null) { + throw new RuntimeException("Response validator returned null"); } - // responseValidator returns a tri-state boolean - // - TRUE: result looks good - // - FALSE: retry-able error code returned - // - NULL: failed and should not retry - Boolean responseOK = responseValidator.apply(ar.result()); - if (responseOK == null) { - promise.fail("non-retry-able error happened for sending to " + this.uri + ", stop retrying and fail"); - } else if (ar.succeeded() && responseOK) { + if (responseOK) { promise.complete(); } else if (currentRetries < this.retryCount) { LOGGER.error("failed sending to " + uri + ", currentRetries: " + currentRetries + ", backing off before retrying"); if (this.retryBackoffMs > 0) { vertx.setTimer(this.retryBackoffMs, i -> { send(requestCreator, responseValidator, currentRetries + 1) - .onComplete(ar2 -> promise.handle(ar2)); + .onComplete(ar2 -> promise.handle(ar2)); }); } else { send(requestCreator, responseValidator, currentRetries + 1) - .onComplete(ar2 -> promise.handle(ar2)); + .onComplete(ar2 -> promise.handle(ar2)); } } else { - promise.fail("retry count exceeded for sending to " + this.uri); + LOGGER.error("retry count exceeded for sending to " + this.uri); + throw new TooManyRetriesException(currentRetries); } - } catch (Exception ex) { - LOGGER.error("unexpected exception: " + ex.getMessage(), ex); - promise.fail(new Throwable(ex)); + } + catch (Throwable ex) { + promise.fail(ex); } }); + + asyncResponse.exceptionally(ex -> { + promise.fail(ex); + return null; + }); + + return promise.future(); } } diff --git a/src/main/java/com/uid2/optout/web/TooManyRetriesException.java b/src/main/java/com/uid2/optout/web/TooManyRetriesException.java new file mode 100644 index 0000000..fdcde6b --- /dev/null +++ b/src/main/java/com/uid2/optout/web/TooManyRetriesException.java @@ -0,0 +1,7 @@ +package com.uid2.optout.web; + +public class TooManyRetriesException extends Exception { + public TooManyRetriesException(int retries) { + super("Too many retries: " + retries); + } +} diff --git a/src/main/java/com/uid2/optout/web/UnexpectedStatusCodeException.java b/src/main/java/com/uid2/optout/web/UnexpectedStatusCodeException.java new file mode 100644 index 0000000..c47d508 --- /dev/null +++ b/src/main/java/com/uid2/optout/web/UnexpectedStatusCodeException.java @@ -0,0 +1,14 @@ +package com.uid2.optout.web; + +public class UnexpectedStatusCodeException extends RuntimeException { + private final int statusCode; + + public UnexpectedStatusCodeException(int statusCode) { + super("Unexpected status code: " + statusCode); + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } +} diff --git a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java index 76d883c..94c6695 100644 --- a/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java +++ b/src/test/java/com/uid2/optout/web/RetryingWebClientTest.java @@ -1,7 +1,7 @@ package com.uid2.optout.web; +import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Vertx; -import io.vertx.core.http.HttpMethod; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.After; @@ -9,6 +9,8 @@ import org.junit.Test; import org.junit.runner.RunWith; +import java.net.URI; +import java.net.http.HttpRequest; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; @@ -59,10 +61,10 @@ public void post_expectSuccess(TestContext ctx) { expectSuccess(ctx, HttpMethod.POST); } - private void expectSuccess(TestContext ctx, HttpMethod method) { - RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/200", method, 0, 0); - c.send(req -> { - return req; + private void expectSuccess(TestContext ctx, HttpMethod testMethod) { + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/200", testMethod, 0, 0); + c.send((URI uri, HttpMethod method) -> { + return HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(); }, resp -> { ctx.assertEquals(200, resp.statusCode()); return 200 == resp.statusCode(); @@ -79,17 +81,17 @@ public void post_expectRetryFailure_zeroBackoff(TestContext ctx) { expectRetryFailure_zeroBackoff(ctx, HttpMethod.POST); } - private void expectRetryFailure_zeroBackoff(TestContext ctx, HttpMethod method) { + private void expectRetryFailure_zeroBackoff(TestContext ctx, HttpMethod testMethod) { AtomicInteger totalAttempts = new AtomicInteger(0); - RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", method, 3, 0); - c.send(req -> { - return req; + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", testMethod, 3, 0); + c.send((URI uri, HttpMethod method) -> { + return HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(); }, resp -> { totalAttempts.incrementAndGet(); ctx.assertEquals(404, resp.statusCode()); // returning false for retry return false; - }).onComplete(ctx.asyncAssertFailure()); + }).onComplete(ctx.asyncAssertFailure(v -> ctx.assertTrue(v instanceof TooManyRetriesException))); } @Test @@ -102,17 +104,20 @@ public void post_expectRetryFailure_withBackoff(TestContext ctx) { expectRetryFailure_withBackoff(ctx, HttpMethod.POST); } - private void expectRetryFailure_withBackoff(TestContext ctx, HttpMethod method) { + private void expectRetryFailure_withBackoff(TestContext ctx, HttpMethod testMethod) { AtomicInteger totalAttempts = new AtomicInteger(0); - RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", method, 3, 1); - c.send(req -> { - return req; + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", testMethod, 3, 1); + c.send((URI uri, HttpMethod method) -> { + return HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(); }, resp -> { totalAttempts.incrementAndGet(); ctx.assertEquals(404, resp.statusCode()); // returning false for retry return false; - }).onComplete(ctx.asyncAssertFailure(v -> ctx.assertEquals(4, (int) totalAttempts.get()))); + }).onComplete(ctx.asyncAssertFailure(v -> { + ctx.assertEquals(4, (int) totalAttempts.get()); + ctx.assertTrue(v instanceof TooManyRetriesException); + })); } @Test @@ -125,13 +130,13 @@ public void post_expectSuccess_withRandomFailures(TestContext ctx) { expectSuccess_withRandomFailures(ctx, HttpMethod.POST); } - private void expectSuccess_withRandomFailures(TestContext ctx, HttpMethod method) { + private void expectSuccess_withRandomFailures(TestContext ctx, HttpMethod testMethod) { for (int i = 0; i < 10; ++i) { AtomicInteger totalAttempts = new AtomicInteger(0); RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/random/500_500_500_200", - method, 100, 1); - c.send(req -> { - return req; + testMethod, 100, 1); + c.send((URI uri, HttpMethod method) -> { + return HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(); }, resp -> { totalAttempts.incrementAndGet(); return resp.statusCode() == 200; @@ -152,20 +157,21 @@ public void post_expectImmediateFailure_withNonRetryErrors(TestContext ctx) { expectImmediateFailure_withNonRetryErrors(ctx, HttpMethod.POST); } - private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMethod method) { + private void expectImmediateFailure_withNonRetryErrors(TestContext ctx, HttpMethod testMethod) { for (int i = 0; i < 10; ++i) { AtomicInteger totalAttempts = new AtomicInteger(0); - RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", method, 100, 1); - c.send(req -> { - return req; + RetryingWebClient c = new RetryingWebClient(vertx, "http://localhost:18082/404", testMethod, 100, 1); + c.send((URI uri, HttpMethod method) -> { + return HttpRequest.newBuilder().uri(uri).method(method.toString(), HttpRequest.BodyPublishers.noBody()).build(); }, resp -> { totalAttempts.incrementAndGet(); if (resp.statusCode() == 200) return true; else if (resp.statusCode() == 500) return false; - else return null; + else throw new UnexpectedStatusCodeException(resp.statusCode()); }).onComplete(ctx.asyncAssertFailure(v -> { // check that it only attempted once and failed ctx.assertEquals(1, totalAttempts.get()); + ctx.assertTrue(v instanceof UnexpectedStatusCodeException); })); } }