Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not keep a collector in no-update mode. #527

Merged
merged 3 commits into from May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/collector/base.rs
Expand Up @@ -74,21 +74,16 @@ impl Collector {

/// Creates a new collector.
///
/// Takes all necessary information from `config`. If `update` is `false`,
/// the collector will not be updated from upstream and only data that has
/// been collected previously will be used. This differs from disabling
/// transports as it will still use whatever is present on disk as
/// potentially updated data.
/// Takes all necessary information from `config`.
pub fn new(
config: &Config,
db: &sled::Db,
update: bool
) -> Result<Self, Failed> {
Self::init(config)?;
Ok(Collector {
cache_dir: config.cache_dir.clone(),
rrdp: rrdp::Collector::new(config, db, update)?,
rsync: rsync::Collector::new(config, update)?,
rrdp: rrdp::Collector::new(config, db)?,
rsync: rsync::Collector::new(config)?,
})
}

Expand Down
9 changes: 2 additions & 7 deletions src/collector/rrdp.rs
Expand Up @@ -79,20 +79,15 @@ pub struct Collector {
impl Collector {
/// Creates a new RRDP collector.
pub fn new(
config: &Config, db: &sled::Db, update: bool
config: &Config, db: &sled::Db,
) -> Result<Option<Self>, Failed> {
if config.disable_rrdp {
return Ok(None)
}

Ok(Some(Collector {
db: db.clone(),
http: if update {
Some(HttpClient::new(config)?)
}
else {
None
},
http: Some(HttpClient::new(config)?),
filter_dubious: !config.allow_dubious_hosts,
fallback_time: FallbackTime::from_config(config),
}))
Expand Down
10 changes: 2 additions & 8 deletions src/collector/rsync.rs
Expand Up @@ -89,10 +89,7 @@ impl Collector {
/// Creates a new rsync collector.
///
/// If use of rsync is disabled via the config, returns `Ok(None)`.
///
/// The collector will not actually run rsync but use whatever files are
/// present already in the working directory if `update` is `false`.
pub fn new(config: &Config, update: bool) -> Result<Option<Self>, Failed> {
pub fn new(config: &Config) -> Result<Option<Self>, Failed> {
if config.disable_rsync {
Ok(None)
}
Expand All @@ -101,10 +98,7 @@ impl Collector {
working_dir: WorkingDir::new(
Self::create_working_dir(config)?
),
command: if update {
Some(RsyncCommand::new(config)?)
}
else { None },
command: Some(RsyncCommand::new(config)?),
filter_dubious: !config.allow_dubious_hosts
}))
}
Expand Down
59 changes: 44 additions & 15 deletions src/engine.rs
Expand Up @@ -97,7 +97,9 @@ pub struct Engine {
db: sled::Db,

/// The collector to load updated data from.
collector: Collector,
///
/// If this is `None`, updates have been disabled.
collector: Option<Collector>,

/// The store to load stored data from.
store: Store,
Expand Down Expand Up @@ -190,7 +192,12 @@ impl Engine {
update: bool,
) -> Result<Self, Failed> {
let db = Self::open_db(&config.cache_dir, config.fresh)?;
let collector = Collector::new(config, &db, update)?;
let collector = if update {
Some(Collector::new(config, &db)?)
}
else {
None
};
let store = Store::new(db.clone());
let mut res = Engine {
tal_dir: config.tal_dir.clone(),
Expand Down Expand Up @@ -316,7 +323,10 @@ impl Engine {
/// This spawns threads and therefore needs to be done after a
/// possible fork.
pub fn ignite(&mut self) -> Result<(), Failed> {
self.collector.ignite()
if let Some(collector) = self.collector.as_mut() {
collector.ignite()?;
}
Ok(())
}

/// Starts a validation run.
Expand All @@ -329,7 +339,10 @@ impl Engine {
&self, processor: P
) -> Result<Run<P>, Failed> {
Ok(Run::new(
self, self.collector.start(), self.store.start(), processor
self,
self.collector.as_ref().map(Collector::start),
self.store.start(),
processor
))
}

Expand All @@ -349,15 +362,19 @@ impl Engine {
/// Cleans the collector and store owned by the engine.
pub fn cleanup(&self) -> Result<(), Failed> {
if !self.dirty_repository {
self.store.cleanup(self.collector.cleanup())?;
self.store.cleanup(
self.collector.as_ref().map(Collector::cleanup)
)?;
}
Ok(())
}

/// Dumps the content of the collector and store owned by the engine.
pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
self.store.dump(dir)?;
self.collector.dump(dir)?;
if let Some(collector) = self.collector.as_ref() {
collector.dump(dir)?;
}
Ok(())
}
}
Expand All @@ -376,7 +393,7 @@ pub struct Run<'a, P> {
validation: &'a Engine,

/// The runner for the collector.
collector: collector::Run<'a>,
collector: Option<collector::Run<'a>>,

/// The runner for the store.
store: store::Run<'a>,
Expand All @@ -392,7 +409,7 @@ impl<'a, P> Run<'a, P> {
/// Creates a new runner from all the parts.
fn new(
validation: &'a Engine,
collector: collector::Run<'a>,
collector: Option<collector::Run<'a>>,
store: store::Run<'a>,
processor: P,
) -> Self {
Expand All @@ -408,7 +425,9 @@ impl<'a, P> Run<'a, P> {
/// value, instead.
pub fn done(self) -> Metrics {
let mut metrics = self.metrics;
self.collector.done(&mut metrics);
if let Some(collector) = self.collector {
collector.done(&mut metrics)
}
self.store.done(&mut metrics);
metrics
}
Expand Down Expand Up @@ -559,10 +578,12 @@ impl<'a, P: ProcessRun> Run<'a, P> {
_info: &TalInfo,
) -> Result<Option<Cert>, Failed> {
// Get the new version, store and return it if it decodes.
if let Some(bytes) = self.collector.load_ta(uri) {
if let Ok(cert) = Cert::decode(bytes.clone()) {
self.store.update_ta(uri, &bytes)?;
return Ok(Some(cert))
if let Some(collector) = self.collector.as_ref() {
if let Some(bytes) = collector.load_ta(uri) {
if let Ok(cert) = Cert::decode(bytes.clone()) {
self.store.update_ta(uri, &bytes)?;
return Ok(Some(cert))
}
}
}

Expand Down Expand Up @@ -638,7 +659,12 @@ impl<'a, P: ProcessRun> PubPoint<'a, P> {
processor: P::PubPoint,
repository_index: Option<usize>,
) -> Result<Self, Failed> {
let collector = run.collector.repository(cert)?;
let collector = if let Some(collector) = run.collector.as_ref() {
collector.repository(cert)?
}
else {
None
};
let store = run.store.repository(cert)?;
Ok(PubPoint {
run, cert, processor, collector, store,
Expand Down Expand Up @@ -1337,7 +1363,10 @@ impl<'a, P: ProcessRun> ValidPubPoint<'a, P> {

// Defer operation if we need to update the repository part where
// the CA lives.
let defer = !self.point.run.collector.was_updated(&cert);
let defer = match self.point.run.collector.as_ref() {
Some(collector) => !collector.was_updated(&cert),
None => false,
};

// If we switch repositories, we need to apply our metrics.
let repository_index = if cert.repository_switch() {
Expand Down
18 changes: 13 additions & 5 deletions src/store.rs
Expand Up @@ -124,7 +124,7 @@ impl Store {
/// started.
pub fn cleanup(
&self,
mut collector: collector::Cleanup,
mut collector: Option<collector::Cleanup>,
) -> Result<(), Failed> {
// Cleanup RRDP repositories
for name in self.db.tree_names() {
Expand All @@ -144,7 +144,9 @@ impl Store {
};

if Repository::new(self, &names, true)?.cleanup_rrdp()? {
collector.retain_rrdp_repository(&uri);
if let Some(collector) = collector.as_mut() {
collector.retain_rrdp_repository(&uri);
}
}
else {
names.drop_trees(&self.db)?;
Expand All @@ -158,7 +160,11 @@ impl Store {
)?.cleanup_rsync(&mut collector)?;

// Cleanup collector.
collector.commit()
if let Some(collector) = collector {
collector.commit()?;
}

Ok(())
}

/// Dumps the content of the store.
Expand Down Expand Up @@ -686,7 +692,7 @@ impl Repository {
/// Registers all rsync modules that have at least one non-expired
/// manifest to be retained by the collector.
fn cleanup_rsync(
self, collector: &mut collector::Cleanup
self, collector: &mut Option<collector::Cleanup>
) -> Result<(), Failed> {
let now = Time::now();

Expand All @@ -702,7 +708,9 @@ impl Repository {
Err(_) => None
};
if let Some(uri) = uri {
collector.retain_rsync_module(&uri);
if let Some(collector) = collector.as_mut() {
collector.retain_rsync_module(&uri);
}
}
else {
self._retain_objects(&key, |_| false)?;
Expand Down