Skip to content

Commit

Permalink
Feature flag for requiring global HPKE keys (#3268)
Browse files Browse the repository at this point in the history
  • Loading branch information
inahga committed Jul 8, 2024
1 parent a5905da commit 4bd3236
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 63 deletions.
101 changes: 55 additions & 46 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ pub struct Config {
///
/// This option is not stable, and not subject to Janus' typical API/config stability promises.
pub log_forbidden_mutations: Option<PathBuf>,

// If set, always prefer to advertise global HPKE keys. This is implicitly enabled if taskprov
// is enabled.
//
// This will become on by default in a future version of Janus.
pub require_global_hpke_keys: bool,
}

impl Default for Config {
Expand All @@ -245,6 +251,7 @@ impl Default for Config {
task_cache_ttl: TASK_AGGREGATOR_CACHE_DEFAULT_TTL,
task_cache_capacity: TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY,
log_forbidden_mutations: None,
require_global_hpke_keys: false,
}
}
}
Expand Down Expand Up @@ -300,6 +307,7 @@ impl<C: Clock> Aggregator<C> {
let global_hpke_keypairs = GlobalHpkeKeypairCache::new(
datastore.clone(),
cfg.global_hpke_configs_refresh_interval,
cfg.require_global_hpke_keys || cfg.taskprov_config.enabled,
)
.await?;

Expand Down Expand Up @@ -331,57 +339,58 @@ impl<C: Clock> Aggregator<C> {
task_id_base64: Option<&[u8]>,
) -> Result<(Vec<u8>, Option<Signature>), Error> {
// Retrieve the appropriate HPKE config list.
let hpke_config_list = if self.cfg.taskprov_config.enabled {
// If we're running in taskprov mode, unconditionally provide the global keys and ignore
// the task_id parameter.
let configs = self.global_hpke_keypairs.configs();
if configs.is_empty() {
return Err(Error::Internal(
"this server is missing its global HPKE config".into(),
));
let hpke_config_list =
if self.cfg.taskprov_config.enabled || self.cfg.require_global_hpke_keys {
// If we're running in taskprov mode or requiring global keys, unconditionally
// provide the global keys and ignore the task_id parameter.
let configs = self.global_hpke_keypairs.configs();
if configs.is_empty() {
return Err(Error::Internal(
"this server is missing its global HPKE config".into(),
));
} else {
HpkeConfigList::new(configs.to_vec())
}
} else {
HpkeConfigList::new(configs.to_vec())
}
} else {
// Otherwise, try to get the task-specific key.
match task_id_base64 {
Some(task_id_base64) => {
let task_id_bytes = URL_SAFE_NO_PAD
.decode(task_id_base64)
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_id = TaskId::get_decoded(&task_id_bytes)
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_aggregator = self
.task_aggregators
.get(&task_id)
.await?
.ok_or(Error::UnrecognizedTask(task_id))?;

match task_aggregator.handle_hpke_config() {
Some(hpke_config_list) => hpke_config_list,
// Assuming something hasn't gone horribly wrong with the database, this
// should only happen in the case where the system has been moved from taskprov
// mode to non-taskprov mode. Thus there's still taskprov tasks in the database.
// This isn't a supported use case, so the operator needs to delete these tasks
// or move the system back into taskprov mode.
None => {
return Err(Error::Internal("task has no HPKE configs".to_string()))
// Otherwise, try to get the task-specific key.
match task_id_base64 {
Some(task_id_base64) => {
let task_id_bytes = URL_SAFE_NO_PAD
.decode(task_id_base64)
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_id = TaskId::get_decoded(&task_id_bytes)
.map_err(|_| Error::InvalidMessage(None, "task_id"))?;
let task_aggregator = self
.task_aggregators
.get(&task_id)
.await?
.ok_or(Error::UnrecognizedTask(task_id))?;

match task_aggregator.handle_hpke_config() {
Some(hpke_config_list) => hpke_config_list,
// Assuming something hasn't gone horribly wrong with the database, this
// should only happen in the case where the system has been moved from taskprov
// mode to non-taskprov mode. Thus there's still taskprov tasks in the database.
// This isn't a supported use case, so the operator needs to delete these tasks
// or move the system back into taskprov mode.
None => {
return Err(Error::Internal("task has no HPKE configs".to_string()))
}
}
}
}
// No task ID present, try to fall back to a global config.
None => {
let configs = self.global_hpke_keypairs.configs();
if configs.is_empty() {
// This server isn't configured to provide global HPKE keys, the client
// should have given us a task ID.
return Err(Error::MissingTaskId);
} else {
HpkeConfigList::new(configs.to_vec())
// No task ID present, try to fall back to a global config.
None => {
let configs = self.global_hpke_keypairs.configs();
if configs.is_empty() {
// This server isn't configured to provide global HPKE keys, the client
// should have given us a task ID.
return Err(Error::MissingTaskId);
} else {
HpkeConfigList::new(configs.to_vec())
}
}
}
}
};
};

// Encode & (if configured to do so) sign the HPKE config list.
let encoded_hpke_config_list = hpke_config_list
Expand Down
60 changes: 59 additions & 1 deletion aggregator/src/aggregator/http_handlers/tests/hpke_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
HPKE_CONFIG_SIGNATURE_HEADER,
},
test_util::{hpke_config_signing_key, hpke_config_verification_key},
Config,
Aggregator, Config,
},
config::TaskprovConfig,
};
Expand Down Expand Up @@ -376,3 +376,61 @@ async fn verify_and_decode_hpke_config_list(test_conn: &mut TestConn) -> HpkeCon
.unwrap();
HpkeConfigList::get_decoded(&response_body).unwrap()
}

#[tokio::test]
async fn require_global_hpke_keys() {
let (clock, _ephemeral_datastore, datastore, _) = setup_http_handler_test().await;

// Insert an HPKE config, i.e. start the application with a keypair already
// in the database.
let keypair = generate_test_hpke_config_and_private_key_with_id(1);
datastore
.run_unnamed_tx(|tx| {
let keypair = keypair.clone();
Box::pin(async move {
tx.put_global_hpke_keypair(&keypair).await.unwrap();
tx.set_global_hpke_keypair_state(keypair.config().id(), &HpkeKeyState::Active)
.await
.unwrap();
Ok(())
})
})
.await
.unwrap();

let cfg = Config {
require_global_hpke_keys: true,
hpke_config_signing_key: Some(hpke_config_signing_key()),
..Default::default()
};

let aggregator = Arc::new(
Aggregator::new(
Arc::clone(&datastore),
clock.clone(),
TestRuntime::default(),
&noop_meter(),
cfg,
)
.await
.unwrap(),
);

let handler = aggregator_handler_with_aggregator(aggregator.clone(), &noop_meter())
.await
.unwrap();

let mut test_conn = get(&format!("/hpke_config?task_id={}", &random::<TaskId>()))
.run_async(&handler)
.await;
assert_eq!(test_conn.status(), Some(Status::Ok));
let hpke_config_list = verify_and_decode_hpke_config_list(&mut test_conn).await;
assert_eq!(hpke_config_list.hpke_configs(), &[keypair.config().clone()]);
check_hpke_config_is_usable(&hpke_config_list, &keypair);

let mut test_conn = get("/hpke_config").run_async(&handler).await;
assert_eq!(test_conn.status(), Some(Status::Ok));
let hpke_config_list = verify_and_decode_hpke_config_list(&mut test_conn).await;
assert_eq!(hpke_config_list.hpke_configs(), &[keypair.config().clone()]);
check_hpke_config_is_usable(&hpke_config_list, &keypair);
}
11 changes: 11 additions & 0 deletions aggregator/src/aggregator/key_rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ pub struct HpkeKeyRotatorConfig {
pub ciphersuites: HashSet<HpkeCiphersuite>,
}

impl Default for HpkeKeyRotatorConfig {
fn default() -> Self {
Self {
pending_duration: default_pending_duration(),
active_duration: default_active_duration(),
expired_duration: default_expired_duration(),
ciphersuites: default_hpke_ciphersuites(),
}
}
}

impl<C: Clock> KeyRotator<C> {
pub fn new(datastore: Arc<Datastore<C>>, hpke: HpkeKeyRotatorConfig) -> Self {
Self { datastore, hpke }
Expand Down
8 changes: 7 additions & 1 deletion aggregator/src/aggregator/taskprov_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use janus_aggregator_core::{
datastore::{
models::{
AggregateShareJob, AggregationJob, AggregationJobState, BatchAggregation,
BatchAggregationState, ReportAggregation, ReportAggregationState,
BatchAggregationState, HpkeKeyState, ReportAggregation, ReportAggregationState,
},
test_util::{ephemeral_datastore, EphemeralDatastore},
Datastore,
Expand Down Expand Up @@ -138,6 +138,12 @@ where
let peer_aggregator = peer_aggregator.clone();
Box::pin(async move {
tx.put_global_hpke_keypair(&global_hpke_key).await.unwrap();
tx.set_global_hpke_keypair_state(
global_hpke_key.config().id(),
&HpkeKeyState::Active,
)
.await
.unwrap();
tx.put_taskprov_peer_aggregator(&peer_aggregator)
.await
.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions aggregator/src/binaries/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ pub struct Config {
/// [`TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY`]. You shouldn't normally have to specify this.
#[serde(default)]
pub task_cache_capacity: Option<u64>,

/// Experimental. Always advertise global HPKE keys instead of per-task HPKE keys. This will
/// become on by default in a future version of Janus.
#[serde(default)]
pub require_global_hpke_keys: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -467,6 +472,7 @@ impl Config {
.task_cache_capacity
.unwrap_or(TASK_AGGREGATOR_CACHE_DEFAULT_CAPACITY),
log_forbidden_mutations: self.log_forbidden_mutations.clone(),
require_global_hpke_keys: self.require_global_hpke_keys,
})
}
}
Expand Down Expand Up @@ -598,6 +604,7 @@ mod tests {
task_cache_ttl_seconds: None,
task_cache_capacity: None,
log_forbidden_mutations: Some(PathBuf::from("/tmp/events")),
require_global_hpke_keys: true,
})
}

Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/binaries/key_rotator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl BinaryConfig for Config {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KeyRotatorConfig {
#[serde(deserialize_with = "deserialize_hpke_key_rotator_config")]
pub hpke: HpkeKeyRotatorConfig,
Expand Down
Loading

0 comments on commit 4bd3236

Please sign in to comment.