Skip to content
This repository has been archived by the owner on Mar 18, 2023. It is now read-only.

Commit

Permalink
🚧 Crawler: re-write Account::retrieve_sample
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenein committed Sep 7, 2022
1 parent f303c2f commit 792da01
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 65 deletions.
1 change: 1 addition & 0 deletions src/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl Crawler {
realm: self.realm,
last_battle_time: Some(account_info.last_battle_time),
partial_tank_stats,
prio: false,
};
let account_snapshot =
database::AccountSnapshot::new(self.realm, &account_info, tank_last_battle_times);
Expand Down
134 changes: 75 additions & 59 deletions src/database/mongodb/models/account.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Error;
use futures::stream::{iter, try_unfold};
use futures::{Stream, TryStreamExt};
use itertools::Itertools;
use mongodb::bson::{doc, Document};
use mongodb::options::*;
use mongodb::{bson, Database, IndexModel};
Expand All @@ -14,9 +14,8 @@ pub use self::partial_tank_stats::*;
pub use self::random::*;
pub use self::rating::*;
pub use self::tank_last_battle_time::*;
use crate::database::mongodb::traits::{Indexes, TypedDocument, Upsert};
use crate::database::mongodb::traits::*;
use crate::prelude::*;
use crate::{format_elapsed, wargaming};

mod id_projection;
mod partial_tank_stats;
Expand All @@ -41,6 +40,9 @@ pub struct Account {

#[serde(default, rename = "pts")]
pub partial_tank_stats: Vec<PartialTankStats>,

#[serde(default)]
pub prio: bool,
}

impl TypedDocument for Account {
Expand All @@ -49,7 +51,7 @@ impl TypedDocument for Account {

#[async_trait]
impl Indexes for Account {
type I = [IndexModel; 2];
type I = [IndexModel; 3];

fn indexes() -> Self::I {
[
Expand All @@ -60,7 +62,14 @@ impl Indexes for Account {
.keys(doc! { "rlm": 1, "aid": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
// TODO: priority flag, sparse.
IndexModel::builder()
.keys(doc! { "rlm": 1, "prio": 1 })
.options(
IndexOptions::builder()
.partial_filter_expression(doc! { "prio": {"$eq": true} })
.build(),
)
.build(),
]
}
}
Expand All @@ -72,6 +81,7 @@ impl Account {
realm,
last_battle_time: None,
partial_tank_stats: Vec::new(),
prio: false,
}
}
}
Expand All @@ -96,7 +106,7 @@ impl Account {
pub fn get_sampled_stream(
database: Database,
realm: wargaming::Realm,
sample_size: u32,
sample_size: usize,
min_offset: Duration,
offset_scale: time::Duration,
) -> Result<impl Stream<Item = Result<Self>>> {
Expand Down Expand Up @@ -127,7 +137,7 @@ impl Account {
let filter = doc! { "rlm": realm.to_str(), "aid": account_id };
let update = doc! {
"$setOnInsert": { "lbts": null, "pts": [] },
// TODO: priority flag.
"$set": { "prio": true },
};
let options = UpdateOptions::builder().upsert(true).build();
Self::collection(in_)
Expand All @@ -141,38 +151,67 @@ impl Account {
pub async fn retrieve_sample(
from: &Database,
realm: wargaming::Realm,
after: DateTime,
sample_size: u32,
before: DateTime,
sample_size: usize,
) -> Result<Vec<Account>> {
let filter = doc! {
"rlm": realm.to_str(),
"$or": [ { "lbts": null }, { "lbts": { "$gte": after } } ], // TODO: or priority flag set.
debug!(sample_size, "retrieving…");
let start_instant = Instant::now();

// Retrieve new accounts which have never been crawled yet (their last battle time is `null`):
let mut accounts: Vec<Account> = {
debug!("querying new accounts…");
let options = FindOptions::builder().limit(sample_size as i64).build();
let new_accounts =
Self::find_vec(from, doc! { "rlm": realm.to_str(), "lbts": null }, options).await?;
debug!(n_new_accounts = new_accounts.len(), elapsed = ?start_instant.elapsed());
new_accounts
};

// Retrieve marked accounts:
if accounts.len() != sample_size {
debug!("querying marked high-prio accounts…");
let filter = doc! { "rlm": realm.to_str(), "prio": true };
let options = FindOptions::builder()
.limit((sample_size - accounts.len()) as i64)
.build();
let prio_accounts = Self::find_vec(from, filter, options).await?;
debug!(
n_prio_accounts = prio_accounts.len(),
elapsed = ?start_instant.elapsed(),
);
if !prio_accounts.is_empty() {
debug!(n_prio_accounts = prio_accounts.len(), "clearing the prio flags…");
let query = doc! {
"rlm": realm.to_str(),
"aid": { "$in": prio_accounts.iter().map(|account| account.id).collect_vec() },
};
Self::collection(from)
.update_many(query, doc! { "$set": { "prio": false } }, None)
.await?;
debug!(elapsed = ?start_instant.elapsed(), "cleared the prio flags");
}
accounts.extend(prio_accounts);
}

// Retrieve random selection of accounts:
if accounts.len() != sample_size {
debug!("querying random accounts…");
let filter = doc! {
"rlm": realm.to_str(),
"$and": [ { "lbts": { "$ne": null } }, { "lbts": { "$lte": before } } ],
};
let options = FindOptions::builder()
.sort(doc! { "lbts": -1 })
.limit((sample_size - accounts.len()) as i64)
.build();
let random_accounts = Self::find_vec(from, filter, options).await?;
debug!(
n_random_accounts = random_accounts.len(),
elapsed = ?start_instant.elapsed(),
);
accounts.extend(random_accounts);
};
let options = FindOptions::builder()
.sort(doc! { "lbts": 1 })
.limit(sample_size as i64)
.build();

let start_instant = Instant::now();
debug!(sample_size, "retrieving a sample…");
let accounts: Vec<Account> = Self::collection(from)
.find(filter, options)
.await?
.try_collect()
.await?;
/* TODO: reset priority flags.
let reset_updated_at_ids = accounts
.iter()
.filter_map(|account| account.updated_at.is_none().then_some(account.id))
.collect_vec();
Self::reset_updated_at(from, realm, &reset_updated_at_ids).await?;
*/

debug!(
n_accounts = accounts.len(),
elapsed = format_elapsed(start_instant).as_str(),
"done",
);
Ok(accounts)
}

Expand All @@ -189,27 +228,4 @@ impl Account {
debug!(elapsed = ?start_instant.elapsed());
Ok(account)
}

#[instrument(level = "info", skip_all)]
async fn reset_updated_at(
from: &Database,
realm: wargaming::Realm,
account_ids: &[wargaming::AccountId],
) -> Result {
debug!(n_accounts = account_ids.len());
if account_ids.is_empty() {
return Ok(());
}
Self::collection(from)
.update_many(
doc! {
"rlm": realm.to_str(),
"aid": { "$in": account_ids },
},
vec![doc! { "$set": { "u": "$$NOW" } }], // TODO: `$unset` priority flag.
None,
)
.await?;
Ok(())
}
}
22 changes: 20 additions & 2 deletions src/database/mongodb/traits.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
use std::fmt::Debug;

use futures::TryStreamExt;
use mongodb::bson::Document;
use mongodb::options::{UpdateModifications, UpdateOptions, WriteConcern};
use mongodb::options::{FindOptions, UpdateModifications, UpdateOptions, WriteConcern};
use mongodb::{Collection, Database, IndexModel};
use serde::de::DeserializeOwned;
use tokio::spawn;
use tokio::time::timeout;

use crate::prelude::*;

pub trait TypedDocument: 'static + Sized + Send {
#[async_trait]
pub trait TypedDocument: 'static + Sized + Send + Sync + DeserializeOwned + Unpin {
const NAME: &'static str;

/// Returns a typed handle to the collection named `Self::NAME` in the given database.
#[inline]
fn collection(in_: &Database) -> Collection<Self> {
in_.collection(Self::NAME)
}

/// Runs a `find` query against this type's collection and gathers all results into a vector.
///
/// `filter` and `options` are forwarded to `find` unchanged.
///
/// # Errors
///
/// Returns an error when `find` fails or when collecting the resulting stream fails;
/// in both cases the attached context message names the collection (`Self::NAME`).
#[inline]
async fn find_vec(
in_: &Database,
filter: impl Into<Option<Document>> + Send,
options: impl Into<Option<FindOptions>> + Send,
) -> Result<Vec<Self>> {
Self::collection(in_)
.find(filter, options)
.await
.with_context(|| format!("failed to search in `{}`", Self::NAME))?
.try_collect()
.await
.with_context(|| format!("failed to collect a vector from `{}`", Self::NAME))
}
}

#[async_trait]
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use sentry::integrations::anyhow::capture_anyhow;

use crate::opts::{Opts, Subcommand};
use crate::prelude::*;
use crate::tracing::format_elapsed;

mod crawler;
pub mod database;
Expand Down
6 changes: 3 additions & 3 deletions src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct CrawlerOpts {
/// Minimum last battle time offset.
#[clap(
long,
default_value = "8hours",
default_value = "0s",
parse(try_from_str = humantime::parse_duration),
env = "BLITZ_DASHBOARD_CRAWLER_MIN_OFFSET",
)]
Expand All @@ -99,10 +99,10 @@ pub struct CrawlerOpts {
#[clap(
long,
default_value = "100",
parse(try_from_str = parsers::non_zero_u32),
parse(try_from_str = parsers::non_zero_usize),
env = "BLITZ_DASHBOARD_CRAWLER_SAMPLE_SIZE",
)]
pub sample_size: u32,
pub sample_size: usize,

#[clap(long, env = "BLITZ_DASHBOARD_CRAWLER_HEARTBEAT_URL")]
pub heartbeat_url: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions src/opts/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub fn non_zero_usize(value: &str) -> Result<usize> {
}
}

#[allow(dead_code)]
pub fn non_zero_u32(value: &str) -> Result<u32> {
match FromStr::from_str(value)? {
value if value >= 1 => Ok(value),
Expand Down

0 comments on commit 792da01

Please sign in to comment.