diff --git a/build.gradle b/build.gradle
index b0d5feb..89f0187 100644
--- a/build.gradle
+++ b/build.gradle
@@ -10,6 +10,8 @@ plugins {
 apply plugin: 'java'
 apply plugin: 'io.spring.dependency-management'
 
+apply from: "dependencies.gradle"
+
 group = 'net.ripe.rpki'
 version = '0.0.1-SNAPSHOT'
 
@@ -31,6 +33,8 @@ dependencies {
     implementation 'com.google.guava:guava:31.1-jre'
     implementation 'org.apache.commons:commons-lang3:3.0'
 
+    implementation "net.ripe.rpki:rpki-commons:$rpki_commons_version"
+
     testImplementation 'org.springframework.boot:spring-boot-starter-test'
 }
diff --git a/dependencies.gradle b/dependencies.gradle
new file mode 100644
index 0000000..5dd036e
--- /dev/null
+++ b/dependencies.gradle
@@ -0,0 +1,4 @@
+ext {
+    rpki_commons_version = '1.32'
+
+}
diff --git a/src/main/java/net/ripe/rpki/rsyncit/rrdp/RrdpFetcher.java b/src/main/java/net/ripe/rpki/rsyncit/rrdp/RrdpFetcher.java
index 57e7ab7..fac56a7 100644
--- a/src/main/java/net/ripe/rpki/rsyncit/rrdp/RrdpFetcher.java
+++ b/src/main/java/net/ripe/rpki/rsyncit/rrdp/RrdpFetcher.java
@@ -2,6 +2,15 @@
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.ripe.rpki.commons.crypto.cms.RpkiSignedObject;
+import net.ripe.rpki.commons.crypto.cms.aspa.AspaCmsParser;
+import net.ripe.rpki.commons.crypto.cms.roa.RoaCmsParser;
+import net.ripe.rpki.commons.crypto.cms.manifest.ManifestCmsParser;
+import net.ripe.rpki.commons.crypto.cms.ghostbuster.GhostbustersCmsParser;
+import net.ripe.rpki.commons.crypto.x509cert.X509ResourceCertificateParser;
+import net.ripe.rpki.commons.crypto.crl.X509Crl;
+import net.ripe.rpki.commons.util.RepositoryObjectType;
+import net.ripe.rpki.commons.validation.ValidationResult;
 import net.ripe.rpki.rsyncit.config.Config;
 import net.ripe.rpki.rsyncit.util.Sha256;
 import net.ripe.rpki.rsyncit.util.Time;
@@ -23,13 +32,16 @@
 import javax.xml.xpath.XPathFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.net.URI;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Base64;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -48,25 +60,34 @@ public RrdpFetcher(Config config, WebClient httpClient, State state) {
         log.info("RrdpFetcher for {}", config.rrdpUrl());
     }
 
-    private byte[] blockForHttpGetRequest(String uri, Duration timeout) {
-        return httpClient.get().uri(uri).retrieve().bodyToMono(byte[].class).block(timeout);
+    private Downloaded blockForHttpGetRequest(String uri, Duration timeout) {
+        var lastModified = new AtomicReference<Instant>(null);
+        var body = httpClient.get().uri(uri).retrieve()
+            .toEntity(byte[].class)
+            .doOnSuccess(e -> {
+                final long modified = e.getHeaders().getLastModified();
+                if (modified != -1) {
+                    lastModified.set(Instant.ofEpochMilli(modified));
+                }
+            })
+            .block(timeout)
+            .getBody();
+        return new Downloaded(body, lastModified.get());
     }
 
     /**
      * Load snapshot and validate hash
      */
-    private byte[] loadSnapshot(String snapshotUrl, String desiredSnapshotHash) throws SnapshotStructureException {
+    private Downloaded loadSnapshot(String snapshotUrl, String expectedSnapshotHash) throws SnapshotStructureException {
         log.info("loading RRDP snapshot from {}", snapshotUrl);
 
-        final byte[] snapshotBytes = blockForHttpGetRequest(snapshotUrl, config.requestTimeout());
-
-        final String realSnapshotHash =
-            Sha256.asString(snapshotBytes);
-        if (!realSnapshotHash.equalsIgnoreCase(desiredSnapshotHash)) {
+        var snapshot = blockForHttpGetRequest(snapshotUrl, config.requestTimeout());
+        final String realSnapshotHash = Sha256.asString(snapshot.content());
+        if (!realSnapshotHash.equalsIgnoreCase(expectedSnapshotHash)) {
             throw new SnapshotStructureException(snapshotUrl,
-                "with len(content) = %d had sha256(content) = %s, expected %s".formatted(snapshotBytes.length, realSnapshotHash, desiredSnapshotHash));
+                "with len(content) = %d had sha256(content) = %s, expected %s".formatted(snapshot.content().length, realSnapshotHash, expectedSnapshotHash));
         }
-
-        return snapshotBytes;
+        return snapshot;
     }
 
     public FetchResult fetchObjects() {
@@ -82,7 +103,7 @@ public FetchResult fetchObjectsEx() {
         try {
             final DocumentBuilder documentBuilder = XML.newDocumentBuilder();
 
-            final byte[] notificationBytes = blockForHttpGetRequest(config.rrdpUrl(), config.requestTimeout());
+            final byte[] notificationBytes = blockForHttpGetRequest(config.rrdpUrl(), config.requestTimeout()).content();
             final Document notificationXmlDoc = documentBuilder.parse(new ByteArrayInputStream(notificationBytes));
 
             final int serial = Integer.parseInt(notificationXmlDoc.getDocumentElement().getAttribute("serial"));
@@ -100,16 +121,16 @@
             }
 
             long begin = System.currentTimeMillis();
-            final byte[] snapshotContent = loadSnapshot(snapshotUrl, desiredSnapshotHash);
+            var snapshot = loadSnapshot(snapshotUrl, desiredSnapshotHash);
             long end = System.currentTimeMillis();
             log.info("Downloaded snapshot in {}ms", (end - begin));
 
-            final Document snapshotXmlDoc = documentBuilder.parse(new ByteArrayInputStream(snapshotContent));
+            final Document snapshotXmlDoc = documentBuilder.parse(new ByteArrayInputStream(snapshot.content()));
             var doc = snapshotXmlDoc.getDocumentElement();
 
             validateSnapshotStructure(serial, snapshotUrl, doc);
 
-            var processPublishElementResult = processPublishElements(doc);
+            var processPublishElementResult = processPublishElements(doc, snapshot.lastModified());
 
             return new SuccessfulFetch(processPublishElementResult.objects, sessionId, serial);
         } catch (SnapshotStructureException | ParserConfigurationException | XPathExpressionException | SAXException |
@@ -156,18 +177,30 @@ private static void validateSnapshotStructure(int notificationSerial, String sna
         }
     }
 
-    private ProcessPublishElementResult processPublishElements(Element doc) throws XPathExpressionException {
+    private ProcessPublishElementResult processPublishElements(Element doc, Instant lastModified) throws XPathExpressionException {
         var queryPublish = XPathFactory.newDefaultInstance().newXPath().compile("/snapshot/publish");
         final NodeList publishedObjects = (NodeList) queryPublish.evaluate(doc, XPathConstants.NODESET);
 
+        // Generate a timestamp that will be tracked per object and used as the FS modification timestamp.
+        // Use the last-modified header of the snapshot if available, otherwise truncate the current time
+        // to the nearest hour -- it is unlikely that different instances will have clocks off by a lot,
+        // so rounding down to an hour should generate the same timestamps _most of the time_.
+        //
+        var defaultTimestamp = lastModified != null ? lastModified : Instant.now().truncatedTo(ChronoUnit.HOURS);
+
+        // This timestamp is only needed for marking objects in the timestamp cache.
         var now = Instant.now();
-        var collisionCount = new AtomicInteger();
+        var collisionCount = new AtomicInteger();
         var decoder = Base64.getDecoder();
 
-        var t = Time.timed(() -> IntStream
+        var objectItems = IntStream
             .range(0, publishedObjects.getLength())
             .mapToObj(publishedObjects::item)
+            .toList();
+
+        var t = Time.timed(() -> objectItems
+            .parallelStream()
             .map(item -> {
                 var objectUri = item.getAttributes().getNamedItem("uri").getNodeValue();
                 var content = item.getTextContent();
@@ -177,8 +210,17 @@
                     // off before decoding. See also:
                     // https://www.w3.org/TR/2004/PER-xmlschema-2-20040318/datatypes.html#base64Binary
                     var decoded = decoder.decode(content.trim());
-                    var hash = Sha256.asString(decoded);
-                    final Instant createAt = state.getOrUpdateCreatedAt(hash, now);
+
+                    var hash = Sha256.asBytes(decoded);
+
+                    // Try to get some creation timestamp from the object itself. If it's impossible to parse
+                    // the object, use the default based on the last-modified header of the snapshot.
+                    //
+                    // Cache the timestamp per hash to avoid re-parsing every object in the snapshot every time.
+                    //
+                    final Instant createAt = state.cacheTimestamps(Sha256.asString(hash), now,
+                        () -> spiceWithHash(getTimestampForObject(objectUri, decoded, defaultTimestamp), hash));
+
                     return new RpkiObject(URI.create(objectUri), decoded, createAt);
                 } catch (RuntimeException e) {
                     log.error("Cannot decode object data for URI {}\n{}", objectUri, content);
@@ -192,8 +234,9 @@ private ProcessPublishElementResult processPublishElements(Element doc) throws X
             .entrySet().stream()
             .map(item -> {
                 if (item.getValue().size() > 1) {
-                    var collect = item.getValue().stream().map(coll ->
-                        Sha256.asString(coll.bytes())).
+                    var collect = item.getValue().
+                        stream().
+                        map(coll -> Sha256.asString(coll.bytes())).
                        collect(Collectors.joining(", "));
                     log.warn("Multiple objects for {}, keeping first element: {}", item.getKey(), collect);
                     collisionCount.addAndGet(item.getValue().size() - 1);
@@ -208,6 +251,62 @@
         return new ProcessPublishElementResult(objects, collisionCount.get());
     }
 
+    private static Instant extractSigningTime(RpkiSignedObject o) {
+        return Instant.ofEpochMilli(o.getSigningTime().getMillis());
+    }
+
+    private Instant getTimestampForObject(final String objectUri, final byte[] decoded, Instant lastModified) {
+        final RepositoryObjectType objectType = RepositoryObjectType.parse(objectUri);
+        try {
+            return switch (objectType) {
+                case Manifest -> {
+                    ManifestCmsParser manifestCmsParser = new ManifestCmsParser();
+                    manifestCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
+                    yield extractSigningTime(manifestCmsParser.getManifestCms());
+                }
+                case Aspa -> {
+                    var aspaCmsParser = new AspaCmsParser();
+                    aspaCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
+                    yield extractSigningTime(aspaCmsParser.getAspa());
+                }
+                case Roa -> {
+                    RoaCmsParser roaCmsParser = new RoaCmsParser();
+                    roaCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
+                    yield extractSigningTime(roaCmsParser.getRoaCms());
+                }
+                case Certificate -> {
+                    X509ResourceCertificateParser x509CertificateParser = new X509ResourceCertificateParser();
+                    x509CertificateParser.parse(ValidationResult.withLocation(objectUri), decoded);
+                    final var cert = x509CertificateParser.getCertificate().getCertificate();
+                    yield Instant.ofEpochMilli(cert.getNotBefore().getTime());
+                }
+                case Crl -> {
+                    final X509Crl x509Crl = X509Crl.parseDerEncoded(decoded, ValidationResult.withLocation(objectUri));
+                    final var crl = x509Crl.getCrl();
+                    yield Instant.ofEpochMilli(crl.getThisUpdate().getTime());
+                }
+                case Gbr -> {
+                    GhostbustersCmsParser ghostbustersCmsParser = new GhostbustersCmsParser();
+                    ghostbustersCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
+                    yield extractSigningTime(ghostbustersCmsParser.getGhostbustersCms());
+                }
+                case Unknown -> lastModified;
+            };
+        } catch (Exception e) {
+            return lastModified;
+        }
+    }
+
+    /**
+     * Add an artificial millisecond offset to the timestamp based on the hash of the object.
+     * This MAY help for the corner case of objects having second-accuracy timestamps
+     * and the timestamp in seconds being the same for multiple objects.
+     */
+    private Instant spiceWithHash(Instant t, byte[] hash) {
+        final BigInteger ms = new BigInteger(hash).mod(BigInteger.valueOf(1000L));
+        return t.truncatedTo(ChronoUnit.SECONDS).plusMillis(ms.longValue());
+    }
+
     record ProcessPublishElementResult(List<RpkiObject> objects, int collisionCount) {}
 
     public sealed interface FetchResult permits SuccessfulFetch, NoUpdates, FailedFetch, Timeout {}
@@ -217,5 +316,7 @@ public record NoUpdates(String sessionId, Integer serial) implements FetchResult
     public record FailedFetch(Exception exception) implements FetchResult {}
     public record Timeout() implements FetchResult {}
 
+    public record Downloaded(byte[] content, Instant lastModified) {};
+
 }
diff --git a/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java b/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java
index 9a05b7f..1c210af 100644
--- a/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java
+++ b/src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java
@@ -10,16 +10,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 @Getter
 public class State {
     @Setter
     RrdpState rrdpState;
 
-    Map<String, Times> times;
+    ConcurrentHashMap<String, Times> times;
 
     public State() {
-        this.times = new HashMap<>();
+        this.times = new ConcurrentHashMap<>();
     }
 
     // Remove entries that were not mentioned in RRDP repository for a while
@@ -33,15 +35,10 @@ public synchronized void removeOldObject(Instant cutOffTime) {
         hashesToDelete.forEach(times::remove);
     }
 
-    public synchronized Instant getOrUpdateCreatedAt(String hash, Instant now) {
-        var ts = times.get(hash);
-        if (ts == null) {
-            times.put(hash, new Times(now, now));
-            return now;
-        } else {
-            ts.setLastMentioned(now);
-            return ts.getCreatedAt();
-        }
+    public Instant cacheTimestamps(String hash, Instant now, Supplier<Instant> createdAt) {
+        var ts = times.computeIfAbsent(hash, h -> new Times(createdAt.get(), now));
+        ts.setLastMentioned(now);
+        return ts.getCreatedAt();
     }
 
     @Data
diff --git a/src/main/java/net/ripe/rpki/rsyncit/service/SyncService.java b/src/main/java/net/ripe/rpki/rsyncit/service/SyncService.java
index 207642b..df31fdb 100644
--- a/src/main/java/net/ripe/rpki/rsyncit/service/SyncService.java
+++ b/src/main/java/net/ripe/rpki/rsyncit/service/SyncService.java
@@ -24,7 +24,6 @@ public class SyncService {
     private final WebClientBuilderFactory webClientFactory;
     private final AppConfig appConfig;
     private final State state;
-    private final MeterRegistry meterRegistry;
     private final RRDPFetcherMetrics metrics;
 
     @Autowired
@@ -33,7 +32,6 @@ public SyncService(WebClientBuilderFactory webClientFactory,
                        MeterRegistry meterRegistry) {
         this.webClientFactory = webClientFactory;
         this.appConfig = appConfig;
-        this.meterRegistry = meterRegistry;
         this.metrics = new RRDPFetcherMetrics(meterRegistry);
         this.state = new State();
     }
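
Note on the millisecond "spicing": the two-line computation in spiceWithHash() above is easy to sanity-check in isolation. The following standalone sketch (the class name and sample inputs are hypothetical and not part of this change; only the helper body mirrors the method in the diff) shows that the offset derived from an object's SHA-256 hash is deterministic for identical content, stays within 0..999 milliseconds, and never moves the timestamp out of its original second.

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical demo class; the helper below mirrors spiceWithHash() from this change.
public class SpiceWithHashDemo {

    // Derive a 0..999 ms offset from the object's hash and add it to the
    // second-truncated base timestamp (BigInteger.mod always returns a non-negative value).
    static Instant spiceWithHash(Instant t, byte[] hash) {
        BigInteger ms = new BigInteger(hash).mod(BigInteger.valueOf(1000L));
        return t.truncatedTo(ChronoUnit.SECONDS).plusMillis(ms.longValue());
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
        byte[] hashA = sha256.digest("object-a".getBytes(StandardCharsets.UTF_8));
        byte[] hashB = sha256.digest("object-b".getBytes(StandardCharsets.UTF_8));

        Instant base = Instant.parse("2023-04-01T10:15:30.123Z");

        // Same content always maps to the same millisecond; different content
        // usually maps to a different one, which is the point of the spicing.
        System.out.println(spiceWithHash(base, hashA));
        System.out.println(spiceWithHash(base, hashA));
        System.out.println(spiceWithHash(base, hashB));
    }
}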
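
Note on the State change: replacing the synchronized getOrUpdateCreatedAt() with ConcurrentHashMap.computeIfAbsent() plus a Supplier means the relatively expensive parsing behind the supplier runs at most once per hash, even though publish elements are now processed on a parallel stream. Below is a simplified, hypothetical sketch of that behaviour, with a counter standing in for the real parsing supplier and without the Times bookkeeping.

import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;

// Hypothetical standalone demo of the caching pattern introduced in State.cacheTimestamps():
// ConcurrentHashMap.computeIfAbsent() evaluates the supplier only for a previously unseen key.
public class TimestampCacheDemo {

    private final ConcurrentHashMap<String, Instant> createdAt = new ConcurrentHashMap<>();

    Instant cache(String hash, Supplier<Instant> timestampSupplier) {
        return createdAt.computeIfAbsent(hash, h -> timestampSupplier.get());
    }

    public static void main(String[] args) {
        var demo = new TimestampCacheDemo();
        var supplierCalls = new AtomicInteger();

        // Simulate many parallel publish elements that all carry the same object hash.
        IntStream.range(0, 1_000).parallel().forEach(i ->
            demo.cache("same-hash", () -> {
                supplierCalls.incrementAndGet(); // stands in for the expensive object parsing
                return Instant.now();
            }));

        // ConcurrentHashMap guarantees the mapping function is applied at most once per key,
        // so this prints 1 regardless of how many threads raced on the lookup.
        System.out.println("supplier invocations: " + supplierCalls.get());
    }
}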