Skip to content

Commit

Permalink
Merge pull request #5 from RIPE-NCC/timestampts-from-objects
Browse files Browse the repository at this point in the history
Timestamps from objects
  • Loading branch information
lolepezy committed Jun 12, 2023
2 parents 4ab8f73 + 6edda5c commit b2b730b
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 34 deletions.
4 changes: 4 additions & 0 deletions build.gradle
Expand Up @@ -10,6 +10,8 @@ plugins {
apply plugin: 'java'
apply plugin: 'io.spring.dependency-management'

apply from: "dependencies.gradle"

group = 'net.ripe.rpki'
version = '0.0.1-SNAPSHOT'

Expand All @@ -31,6 +33,8 @@ dependencies {
implementation 'com.google.guava:guava:31.1-jre'
implementation 'org.apache.commons:commons-lang3:3.0'

implementation "net.ripe.rpki:rpki-commons:$rpki_commons_version"

testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

Expand Down
4 changes: 4 additions & 0 deletions dependencies.gradle
@@ -0,0 +1,4 @@
ext {
rpki_commons_version = '1.32'

}
143 changes: 122 additions & 21 deletions src/main/java/net/ripe/rpki/rsyncit/rrdp/RrdpFetcher.java
Expand Up @@ -2,6 +2,15 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.ripe.rpki.commons.crypto.cms.RpkiSignedObject;
import net.ripe.rpki.commons.crypto.cms.aspa.AspaCmsParser;
import net.ripe.rpki.commons.crypto.cms.roa.RoaCmsParser;
import net.ripe.rpki.commons.crypto.cms.manifest.ManifestCmsParser;
import net.ripe.rpki.commons.crypto.cms.ghostbuster.GhostbustersCmsParser;
import net.ripe.rpki.commons.crypto.x509cert.X509ResourceCertificateParser;
import net.ripe.rpki.commons.crypto.crl.X509Crl;
import net.ripe.rpki.commons.util.RepositoryObjectType;
import net.ripe.rpki.commons.validation.ValidationResult;
import net.ripe.rpki.rsyncit.config.Config;
import net.ripe.rpki.rsyncit.util.Sha256;
import net.ripe.rpki.rsyncit.util.Time;
Expand All @@ -23,13 +32,16 @@
import javax.xml.xpath.XPathFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -48,25 +60,34 @@ public RrdpFetcher(Config config, WebClient httpClient, State state) {
log.info("RrdpFetcher for {}", config.rrdpUrl());
}

private byte[] blockForHttpGetRequest(String uri, Duration timeout) {
return httpClient.get().uri(uri).retrieve().bodyToMono(byte[].class).block(timeout);
private Downloaded blockForHttpGetRequest(String uri, Duration timeout) {
var lastModified = new AtomicReference<Instant>(null);
var body = httpClient.get().uri(uri).retrieve()
.toEntity(byte[].class)
.doOnSuccess(e -> {
final long modified = e.getHeaders().getLastModified();
if (modified != -1) {
lastModified.set(Instant.ofEpochMilli(modified));
}
})
.block(timeout)
.getBody();
return new Downloaded(body, lastModified.get());
}

/**
* Load snapshot and validate hash
*/
private byte[] loadSnapshot(String snapshotUrl, String desiredSnapshotHash) throws SnapshotStructureException {
private Downloaded loadSnapshot(String snapshotUrl, String expectedSnapshotHash) throws SnapshotStructureException {
log.info("loading RRDP snapshot from {}", snapshotUrl);

final byte[] snapshotBytes = blockForHttpGetRequest(snapshotUrl, config.requestTimeout());

final String realSnapshotHash = Sha256.asString(snapshotBytes);
if (!realSnapshotHash.equalsIgnoreCase(desiredSnapshotHash)) {
var snapshot = blockForHttpGetRequest(snapshotUrl, config.requestTimeout());
final String realSnapshotHash = Sha256.asString(snapshot.content());
if (!realSnapshotHash.equalsIgnoreCase(expectedSnapshotHash)) {
throw new SnapshotStructureException(snapshotUrl,
"with len(content) = %d had sha256(content) = %s, expected %s".formatted(snapshotBytes.length, realSnapshotHash, desiredSnapshotHash));
"with len(content) = %d had sha256(content) = %s, expected %s".formatted(snapshot.content().length, realSnapshotHash, expectedSnapshotHash));
}

return snapshotBytes;
return snapshot;
}

public FetchResult fetchObjects() {
Expand All @@ -82,7 +103,7 @@ public FetchResult fetchObjectsEx() {
try {
final DocumentBuilder documentBuilder = XML.newDocumentBuilder();

final byte[] notificationBytes = blockForHttpGetRequest(config.rrdpUrl(), config.requestTimeout());
final byte[] notificationBytes = blockForHttpGetRequest(config.rrdpUrl(), config.requestTimeout()).content();
final Document notificationXmlDoc = documentBuilder.parse(new ByteArrayInputStream(notificationBytes));

final int serial = Integer.parseInt(notificationXmlDoc.getDocumentElement().getAttribute("serial"));
Expand All @@ -100,16 +121,16 @@ public FetchResult fetchObjectsEx() {
}

long begin = System.currentTimeMillis();
final byte[] snapshotContent = loadSnapshot(snapshotUrl, desiredSnapshotHash);
var snapshot = loadSnapshot(snapshotUrl, desiredSnapshotHash);
long end = System.currentTimeMillis();
log.info("Downloaded snapshot in {}ms", (end - begin));

final Document snapshotXmlDoc = documentBuilder.parse(new ByteArrayInputStream(snapshotContent));
final Document snapshotXmlDoc = documentBuilder.parse(new ByteArrayInputStream(snapshot.content()));
var doc = snapshotXmlDoc.getDocumentElement();

validateSnapshotStructure(serial, snapshotUrl, doc);

var processPublishElementResult = processPublishElements(doc);
var processPublishElementResult = processPublishElements(doc, snapshot.lastModified());

return new SuccessfulFetch(processPublishElementResult.objects, sessionId, serial);
} catch (SnapshotStructureException | ParserConfigurationException | XPathExpressionException | SAXException |
Expand Down Expand Up @@ -156,18 +177,30 @@ private static void validateSnapshotStructure(int notificationSerial, String sna
}
}

private ProcessPublishElementResult processPublishElements(Element doc) throws XPathExpressionException {
private ProcessPublishElementResult processPublishElements(Element doc, Instant lastModified) throws XPathExpressionException {
var queryPublish = XPathFactory.newDefaultInstance().newXPath().compile("/snapshot/publish");
final NodeList publishedObjects = (NodeList) queryPublish.evaluate(doc, XPathConstants.NODESET);

// Generate timestamp that will be tracked per object and used as FS modification timestamp.
// Use last-modified header from the snapshot if available, otherwise truncate current time
// to the closest hour -- it is unlikely that different instances will have clocks off by a lot,
// so rounding down to an hour should generate the same timestamps _most of the time_.
//
var defaultTimestamp = lastModified != null ? lastModified : Instant.now().truncatedTo(ChronoUnit.HOURS);

// This timestamp is only needed for marking objects in the timestamp cache.
var now = Instant.now();
var collisionCount = new AtomicInteger();

var collisionCount = new AtomicInteger();
var decoder = Base64.getDecoder();

var t = Time.timed(() -> IntStream
var objectItems = IntStream
.range(0, publishedObjects.getLength())
.mapToObj(publishedObjects::item)
.toList();

var t = Time.timed(() -> objectItems
.parallelStream()
.map(item -> {
var objectUri = item.getAttributes().getNamedItem("uri").getNodeValue();
var content = item.getTextContent();
Expand All @@ -177,8 +210,17 @@ private ProcessPublishElementResult processPublishElements(Element doc) throws X
// off before decoding. See also:
// https://www.w3.org/TR/2004/PER-xmlschema-2-20040318/datatypes.html#base64Binary
var decoded = decoder.decode(content.trim());
var hash = Sha256.asString(decoded);
final Instant createAt = state.getOrUpdateCreatedAt(hash, now);

var hash = Sha256.asBytes(decoded);

// Try to get some creation timestamp from the object itself. If it's impossible to parse
// the object, use the default based on the last-modified header of the snapshot.
//
// Cache the timestamp per hash do avoid re-parsing every object in the snapshot every time.
//
final Instant createAt = state.cacheTimestamps(Sha256.asString(hash), now,
() -> spiceWithHash(getTimestampForObject(objectUri, decoded, defaultTimestamp), hash));

return new RpkiObject(URI.create(objectUri), decoded, createAt);
} catch (RuntimeException e) {
log.error("Cannot decode object data for URI {}\n{}", objectUri, content);
Expand All @@ -192,8 +234,9 @@ private ProcessPublishElementResult processPublishElements(Element doc) throws X
.entrySet().stream()
.map(item -> {
if (item.getValue().size() > 1) {
var collect = item.getValue().stream().map(coll ->
Sha256.asString(coll.bytes())).
var collect = item.getValue().
stream().
map(coll -> Sha256.asString(coll.bytes())).
collect(Collectors.joining(", "));
log.warn("Multiple objects for {}, keeping first element: {}", item.getKey(), collect);
collisionCount.addAndGet(item.getValue().size() - 1);
Expand All @@ -208,6 +251,62 @@ private ProcessPublishElementResult processPublishElements(Element doc) throws X
return new ProcessPublishElementResult(objects, collisionCount.get());
}

private static Instant extractSigningTime(RpkiSignedObject o) {
return Instant.ofEpochMilli(o.getSigningTime().getMillis());
}

private Instant getTimestampForObject(final String objectUri, final byte[] decoded, Instant lastModified) {
final RepositoryObjectType objectType = RepositoryObjectType.parse(objectUri);
try {
return switch (objectType) {
case Manifest -> {
ManifestCmsParser manifestCmsParser = new ManifestCmsParser();
manifestCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
yield extractSigningTime(manifestCmsParser.getManifestCms());
}
case Aspa -> {
var aspaCmsParser = new AspaCmsParser();
aspaCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
yield extractSigningTime(aspaCmsParser.getAspa());
}
case Roa -> {
RoaCmsParser roaCmsParser = new RoaCmsParser();
roaCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
yield extractSigningTime(roaCmsParser.getRoaCms());
}
case Certificate -> {
X509ResourceCertificateParser x509CertificateParser = new X509ResourceCertificateParser();
x509CertificateParser.parse(ValidationResult.withLocation(objectUri), decoded);
final var cert = x509CertificateParser.getCertificate().getCertificate();
yield Instant.ofEpochMilli(cert.getNotBefore().getTime());
}
case Crl -> {
final X509Crl x509Crl = X509Crl.parseDerEncoded(decoded, ValidationResult.withLocation(objectUri));
final var crl = x509Crl.getCrl();
yield Instant.ofEpochMilli(crl.getThisUpdate().getTime());
}
case Gbr -> {
GhostbustersCmsParser ghostbustersCmsParser = new GhostbustersCmsParser();
ghostbustersCmsParser.parse(ValidationResult.withLocation(objectUri), decoded);
yield extractSigningTime(ghostbustersCmsParser.getGhostbustersCms());
}
case Unknown -> lastModified;
};
} catch (Exception e) {
return lastModified;
}
}

/**
* Add artificial millisecond offset to the timestamp based on hash of the object.
* This MAY help for the corner case of objects having second-accuracy timestamps
* and the timestatmp in seconds being the same for multiple objects.
*/
private Instant spiceWithHash(Instant t, byte[] hash) {
final BigInteger ms = new BigInteger(hash).mod(BigInteger.valueOf(1000L));
return t.truncatedTo(ChronoUnit.SECONDS).plusMillis(ms.longValue());
}

record ProcessPublishElementResult(List<RpkiObject> objects, int collisionCount) {}

public sealed interface FetchResult permits SuccessfulFetch, NoUpdates, FailedFetch, Timeout {}
Expand All @@ -217,5 +316,7 @@ public record NoUpdates(String sessionId, Integer serial) implements FetchResult
public record FailedFetch(Exception exception) implements FetchResult {}
public record Timeout() implements FetchResult {}

public record Downloaded(byte[] content, Instant lastModified) {};

}

19 changes: 8 additions & 11 deletions src/main/java/net/ripe/rpki/rsyncit/rrdp/State.java
Expand Up @@ -10,16 +10,18 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

@Getter
public class State {

@Setter
RrdpState rrdpState;
Map<String, Times> times;
ConcurrentHashMap<String, Times> times;

public State() {
this.times = new HashMap<>();
this.times = new ConcurrentHashMap<>();
}

// Remove entries that were not mentioned in RRDP repository for a while
Expand All @@ -33,15 +35,10 @@ public synchronized void removeOldObject(Instant cutOffTime) {
hashesToDelete.forEach(times::remove);
}

public synchronized Instant getOrUpdateCreatedAt(String hash, Instant now) {
var ts = times.get(hash);
if (ts == null) {
times.put(hash, new Times(now, now));
return now;
} else {
ts.setLastMentioned(now);
return ts.getCreatedAt();
}
public Instant cacheTimestamps(String hash, Instant now, Supplier<Instant> createdAt) {
var ts = times.computeIfAbsent(hash, h -> new Times(createdAt.get(), now));
ts.setLastMentioned(now);
return ts.getCreatedAt();
}

@Data
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/net/ripe/rpki/rsyncit/service/SyncService.java
Expand Up @@ -24,7 +24,6 @@ public class SyncService {
private final WebClientBuilderFactory webClientFactory;
private final AppConfig appConfig;
private final State state;
private final MeterRegistry meterRegistry;
private final RRDPFetcherMetrics metrics;

@Autowired
Expand All @@ -33,7 +32,6 @@ public SyncService(WebClientBuilderFactory webClientFactory,
MeterRegistry meterRegistry) {
this.webClientFactory = webClientFactory;
this.appConfig = appConfig;
this.meterRegistry = meterRegistry;
this.metrics = new RRDPFetcherMetrics(meterRegistry);
this.state = new State();
}
Expand Down

0 comments on commit b2b730b

Please sign in to comment.