Skip to content

Commit

Permalink
Add timestamp to checkpoints
Browse files Browse the repository at this point in the history
- Add a new TimestampedCheckpoint type, which adds a unix timestamp to
  the basic checkpoint data.
- Update DataStores to store TimestampedCheckpoints.
- Update CoreService to generate new TimestampedCheckpoint periodically,
  even if the underlying checkpoint hasn't changed.

This also renames `MapCheckpoint` to just `Checkpoint`. `MapCheckpoint`
was slightly misleading since it covers both the verifiable map *and*
log.
  • Loading branch information
lann committed Aug 11, 2023
1 parent e1106f4 commit 8ec954d
Show file tree
Hide file tree
Showing 18 changed files with 196 additions and 117 deletions.
4 changes: 2 additions & 2 deletions crates/api/src/v1/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde_with::{base64::Base64, serde_as};
use std::borrow::Cow;
use thiserror::Error;
use warg_crypto::hash::AnyHash;
use warg_protocol::registry::{LogId, LogLeaf, MapCheckpoint};
use warg_protocol::registry::{Checkpoint, LogId, LogLeaf};

/// Represents a consistency proof request.
#[derive(Serialize, Deserialize)]
Expand All @@ -33,7 +33,7 @@ pub struct ConsistencyResponse {
#[serde(rename_all = "camelCase")]
pub struct InclusionRequest<'a> {
/// The checkpoint to check for inclusion.
pub checkpoint: Cow<'a, MapCheckpoint>,
pub checkpoint: Cow<'a, Checkpoint>,
/// The log leafs to check for inclusion.
pub leafs: Cow<'a, [LogLeaf]>,
}
Expand Down
8 changes: 5 additions & 3 deletions crates/client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use warg_api::v1::{
};
use warg_crypto::hash::{AnyHash, HashError, Sha256};
use warg_protocol::{
registry::{LogId, LogLeaf, MapCheckpoint, MapLeaf, RecordId},
registry::{Checkpoint, LogId, LogLeaf, MapLeaf, RecordId, TimestampedCheckpoint},
SerdeEnvelope,
};
use warg_transparency::{
Expand Down Expand Up @@ -143,7 +143,9 @@ impl Client {
}

/// Gets the latest checkpoint from the registry.
pub async fn latest_checkpoint(&self) -> Result<SerdeEnvelope<MapCheckpoint>, ClientError> {
pub async fn latest_checkpoint(
&self,
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, ClientError> {
let url = self.url.join(paths::fetch_checkpoint());
tracing::debug!("getting latest checkpoint at `{url}`");
into_result::<_, FetchError>(reqwest::get(url).await?).await
Expand Down Expand Up @@ -333,7 +335,7 @@ impl Client {

fn validate_inclusion_response(
response: InclusionResponse,
checkpoint: &MapCheckpoint,
checkpoint: &Checkpoint,
leafs: &[LogLeaf],
) -> Result<(), ClientError> {
let log_proof_bundle: LogProofBundle<Sha256, LogLeaf> =
Expand Down
15 changes: 8 additions & 7 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use warg_crypto::{
};
use warg_protocol::{
operator, package,
registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId},
registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint},
ProtoEnvelope, SerdeEnvelope, Version, VersionReq,
};

Expand Down Expand Up @@ -350,10 +350,11 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {

async fn update_checkpoint<'a>(
&self,
checkpoint: &SerdeEnvelope<MapCheckpoint>,
ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
packages: impl IntoIterator<Item = &mut PackageInfo>,
) -> Result<(), ClientError> {
let checkpoint_id: AnyHash = Hash::<Sha256>::of(checkpoint.as_ref()).into();
let checkpoint = &ts_checkpoint.as_ref().checkpoint;
let checkpoint_id: AnyHash = Hash::<Sha256>::of(checkpoint).into();
tracing::info!("updating to checkpoint `{checkpoint_id}`");

let mut operator = self.registry.load_operator().await?.unwrap_or_default();
Expand Down Expand Up @@ -464,7 +465,7 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
if !leafs.is_empty() {
self.api
.prove_inclusion(InclusionRequest {
checkpoint: Cow::Borrowed(checkpoint.as_ref()),
checkpoint: Cow::Borrowed(checkpoint),
leafs: Cow::Borrowed(&leafs),
})
.await?;
Expand All @@ -473,8 +474,8 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
if let Some(from) = self.registry.load_checkpoint().await? {
self.api
.prove_log_consistency(ConsistencyRequest {
from: Cow::Borrowed(&from.as_ref().log_root),
to: Cow::Borrowed(&checkpoint.as_ref().log_root),
from: Cow::Borrowed(&from.as_ref().checkpoint.log_root),
to: Cow::Borrowed(&ts_checkpoint.as_ref().checkpoint.log_root),
})
.await?;
}
Expand All @@ -486,7 +487,7 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
self.registry.store_package(package).await?;
}

self.registry.store_checkpoint(checkpoint).await?;
self.registry.store_checkpoint(ts_checkpoint).await?;

Ok(())
}
Expand Down
11 changes: 7 additions & 4 deletions crates/client/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use warg_crypto::{
use warg_protocol::{
operator,
package::{self, PackageRecord, PACKAGE_RECORD_VERSION},
registry::{MapCheckpoint, PackageId, RecordId},
registry::{Checkpoint, PackageId, RecordId, TimestampedCheckpoint},
ProtoEnvelope, SerdeEnvelope, Version,
};

Expand All @@ -30,10 +30,13 @@ pub use fs::*;
#[async_trait]
pub trait RegistryStorage: Send + Sync {
/// Loads most recent checkpoint
async fn load_checkpoint(&self) -> Result<Option<SerdeEnvelope<MapCheckpoint>>>;
async fn load_checkpoint(&self) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>>;

/// Stores most recent checkpoint
async fn store_checkpoint(&self, checkpoint: &SerdeEnvelope<MapCheckpoint>) -> Result<()>;
async fn store_checkpoint(
&self,
ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<()>;

/// Loads the operator information from the storage.
///
Expand Down Expand Up @@ -116,7 +119,7 @@ pub struct PackageInfo {
pub id: PackageId,
/// The last known checkpoint of the package.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<SerdeEnvelope<MapCheckpoint>>,
pub checkpoint: Option<Checkpoint>,
/// The current package log state
#[serde(default)]
pub state: package::LogState,
Expand Down
11 changes: 7 additions & 4 deletions crates/client/src/storage/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio_util::io::ReaderStream;
use walkdir::WalkDir;
use warg_crypto::hash::{AnyHash, Digest, Hash, Sha256};
use warg_protocol::{
registry::{LogId, MapCheckpoint, PackageId},
registry::{LogId, PackageId, TimestampedCheckpoint},
SerdeEnvelope,
};

Expand Down Expand Up @@ -85,12 +85,15 @@ impl FileSystemRegistryStorage {

#[async_trait]
impl RegistryStorage for FileSystemRegistryStorage {
async fn load_checkpoint(&self) -> Result<Option<SerdeEnvelope<MapCheckpoint>>> {
async fn load_checkpoint(&self) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>> {
load(&self.base_dir.join("checkpoint")).await
}

async fn store_checkpoint(&self, checkpoint: &SerdeEnvelope<MapCheckpoint>) -> Result<()> {
store(&self.base_dir.join("checkpoint"), checkpoint).await
async fn store_checkpoint(
&self,
ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<()> {
store(&self.base_dir.join("checkpoint"), ts_checkpoint).await
}

async fn load_packages(&self) -> Result<Vec<PackageInfo>> {
Expand Down
55 changes: 47 additions & 8 deletions crates/protocol/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,73 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use std::time::SystemTime;
use warg_crypto::hash::{AnyHash, Hash, HashAlgorithm, SupportedDigest};
use warg_crypto::prefix::VisitPrefixEncode;
use warg_crypto::{prefix, ByteVisitor, Signable, VisitBytes};
use wasmparser::names::KebabStr;

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MapCheckpoint {
pub struct Checkpoint {
pub log_root: AnyHash,
pub log_length: u32,
pub map_root: AnyHash,
}

impl Signable for MapCheckpoint {
const PREFIX: &'static [u8] = b"WARG-MAP-CHECKPOINT-SIGNATURE-V0";
}

impl prefix::VisitPrefixEncode for MapCheckpoint {
impl prefix::VisitPrefixEncode for Checkpoint {
fn visit_pe<BV: ?Sized + ByteVisitor>(&self, visitor: &mut prefix::PrefixEncodeVisitor<BV>) {
visitor.visit_str_raw("WARG-MAP-CHECKPOINT-V0");
visitor.visit_str_raw("WARG-CHECKPOINT-V0");
visitor.visit_unsigned(self.log_length as u64);
visitor.visit_str(&self.log_root.to_string());
visitor.visit_str(&self.map_root.to_string());
}
}

// Manual impls of VisitBytes for VisitPrefixEncode to avoid conflict with blanket impls
impl VisitBytes for MapCheckpoint {
impl VisitBytes for Checkpoint {
fn visit<BV: ?Sized + ByteVisitor>(&self, visitor: &mut BV) {
self.visit_bv(visitor);
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TimestampedCheckpoint {
#[serde(flatten)]
pub checkpoint: Checkpoint,
pub timestamp: u64,
}

impl TimestampedCheckpoint {
pub fn new(checkpoint: Checkpoint, time: SystemTime) -> anyhow::Result<Self> {
Ok(Self {
checkpoint,
timestamp: time.duration_since(std::time::UNIX_EPOCH)?.as_secs(),
})
}

pub fn now(checkpoint: Checkpoint) -> anyhow::Result<Self> {
Self::new(checkpoint, SystemTime::now())
}
}

impl Signable for TimestampedCheckpoint {
const PREFIX: &'static [u8] = b"WARG-CHECKPOINT-SIGNATURE-V0";
}

impl prefix::VisitPrefixEncode for TimestampedCheckpoint {
fn visit_pe<BV: ?Sized + ByteVisitor>(&self, visitor: &mut prefix::PrefixEncodeVisitor<BV>) {
visitor.visit_str_raw("WARG-TIMESTAMPED-CHECKPOINT-V0");
visitor.visit_unsigned(self.checkpoint.log_length as u64);
visitor.visit_str(&self.checkpoint.log_root.to_string());
visitor.visit_str(&self.checkpoint.map_root.to_string());
visitor.visit_unsigned(self.timestamp);
}
}

// Manual impls of VisitBytes for VisitPrefixEncode to avoid conflict with blanket impls
impl VisitBytes for TimestampedCheckpoint {
fn visit<BV: ?Sized + ByteVisitor>(&self, visitor: &mut BV) {
self.visit_bv(visitor);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/server/src/api/debug/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn get_package_info(
.get_latest_checkpoint()
.await
.context("get_latest_checkpoint")?;
let checkpoint_id = Hash::<Sha256>::of(checkpoint.as_ref()).into();
let checkpoint_id = Hash::<Sha256>::of(&checkpoint.as_ref().checkpoint).into();

let log_id = LogId::package_log::<Sha256>(&package_id);
let records = store
Expand Down
4 changes: 2 additions & 2 deletions crates/server/src/api/v1/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::{
use std::collections::HashMap;
use warg_api::v1::fetch::{FetchError, FetchLogsRequest, FetchLogsResponse};
use warg_crypto::hash::Sha256;
use warg_protocol::registry::{LogId, MapCheckpoint};
use warg_protocol::registry::{LogId, TimestampedCheckpoint};
use warg_protocol::{ProtoEnvelopeBody, SerdeEnvelope};

const DEFAULT_RECORDS_LIMIT: u16 = 100;
Expand Down Expand Up @@ -126,7 +126,7 @@ async fn fetch_logs(
#[debug_handler]
async fn fetch_checkpoint(
State(config): State<Config>,
) -> Result<Json<SerdeEnvelope<MapCheckpoint>>, FetchApiError> {
) -> Result<Json<SerdeEnvelope<TimestampedCheckpoint>>, FetchApiError> {
Ok(Json(
config.core_service.store().get_latest_checkpoint().await?,
))
Expand Down
25 changes: 13 additions & 12 deletions crates/server/src/datastore/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use warg_crypto::{hash::AnyHash, Signable};
use warg_protocol::{
operator,
package::{self, PackageEntry},
registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId},
registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint},
ProtoEnvelope, SerdeEnvelope,
};

Expand Down Expand Up @@ -76,7 +76,7 @@ struct State {
operators: HashMap<LogId, Log<operator::LogState, operator::OperatorRecord>>,
packages: HashMap<LogId, Log<package::LogState, package::PackageRecord>>,
package_ids: BTreeSet<PackageId>,
checkpoints: IndexMap<AnyHash, SerdeEnvelope<MapCheckpoint>>,
checkpoints: IndexMap<AnyHash, SerdeEnvelope<TimestampedCheckpoint>>,
records: HashMap<LogId, HashMap<RecordId, RecordStatus>>,
}

Expand Down Expand Up @@ -105,7 +105,7 @@ impl DataStore for MemoryDataStore {
async fn get_all_checkpoints(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = Result<MapCheckpoint, DataStoreError>> + Send>>,
Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
DataStoreError,
> {
Ok(Box::pin(futures::stream::empty()))
Expand Down Expand Up @@ -390,19 +390,20 @@ impl DataStore for MemoryDataStore {
async fn store_checkpoint(
&self,
checkpoint_id: &AnyHash,
checkpoint: SerdeEnvelope<MapCheckpoint>,
ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<(), DataStoreError> {
let mut state = self.0.write().await;

let (_, prev) = state
state
.checkpoints
.insert_full(checkpoint_id.clone(), checkpoint);
assert!(prev.is_none());
.insert(checkpoint_id.clone(), ts_checkpoint);

Ok(())
}

async fn get_latest_checkpoint(&self) -> Result<SerdeEnvelope<MapCheckpoint>, DataStoreError> {
async fn get_latest_checkpoint(
&self,
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError> {
let state = self.0.read().await;
let checkpoint = state.checkpoints.values().last().unwrap();
Ok(checkpoint.clone())
Expand Down Expand Up @@ -433,7 +434,7 @@ impl DataStore for MemoryDataStore {
},
None => 0,
};
let end_registry_idx = checkpoint.as_ref().log_length as u64;
let end_registry_idx = checkpoint.as_ref().checkpoint.log_length as u64;

Ok(log
.entries
Expand Down Expand Up @@ -470,7 +471,7 @@ impl DataStore for MemoryDataStore {
},
None => 0,
};
let end_registry_idx = checkpoint.as_ref().log_length as u64;
let end_registry_idx = checkpoint.as_ref().checkpoint.log_length as u64;

Ok(log
.entries
Expand Down Expand Up @@ -513,7 +514,7 @@ impl DataStore for MemoryDataStore {
let published_length = state
.checkpoints
.last()
.map(|(_, c)| c.as_ref().log_length)
.map(|(_, c)| c.as_ref().checkpoint.log_length)
.unwrap_or_default() as u64;

(
Expand Down Expand Up @@ -567,7 +568,7 @@ impl DataStore for MemoryDataStore {
let published_length = state
.checkpoints
.last()
.map(|(_, c)| c.as_ref().log_length)
.map(|(_, c)| c.as_ref().checkpoint.log_length)
.unwrap_or_default() as u64;

(
Expand Down
10 changes: 6 additions & 4 deletions crates/server/src/datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use thiserror::Error;
use warg_crypto::{hash::AnyHash, signing::KeyID};
use warg_protocol::{
operator, package,
registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId},
registry::{LogId, LogLeaf, PackageId, RecordId, TimestampedCheckpoint},
ProtoEnvelope, SerdeEnvelope,
};

Expand Down Expand Up @@ -102,7 +102,7 @@ pub trait DataStore: Send + Sync {
async fn get_all_checkpoints(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = Result<MapCheckpoint, DataStoreError>> + Send>>,
Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
DataStoreError,
>;

Expand Down Expand Up @@ -207,11 +207,13 @@ pub trait DataStore: Send + Sync {
async fn store_checkpoint(
&self,
checkpoint_id: &AnyHash,
checkpoint: SerdeEnvelope<MapCheckpoint>,
ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<(), DataStoreError>;

/// Gets the latest checkpoint.
async fn get_latest_checkpoint(&self) -> Result<SerdeEnvelope<MapCheckpoint>, DataStoreError>;
async fn get_latest_checkpoint(
&self,
) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;

/// Gets the operator records for the given registry checkpoint ID hash.
async fn get_operator_records(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE checkpoints
DROP COLUMN timestamp;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE checkpoints
ADD COLUMN timestamp BIGINT NOT NULL DEFAULT 0;
Loading

0 comments on commit 8ec954d

Please sign in to comment.