Skip to content

Commit

Permalink
moving things to basic, cleaning up unused imports
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Feb 7, 2020
1 parent a04dd49 commit 5e8db7e
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 114 deletions.
101 changes: 7 additions & 94 deletions src/main.rs
Expand Up @@ -54,17 +54,14 @@
#![deny(unused_must_use)]

#[macro_use] extern crate lazy_static;
// #[macro_use] extern crate tokio;
// #[macro_use] extern crate scan_fmt;

mod util;
mod model;
mod pipeline;

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::error::Error;
use std::path::{ Path, PathBuf };
use std::path::{ PathBuf };
use std::vec::Vec;
use std::sync::Arc;

Expand All @@ -73,15 +70,14 @@ use chrono::offset::Utc;
use futures::future::join_all;
use itertools::Itertools;
use riven::{ RiotApi, RiotApiConfig };
use riven::consts::{ Region, Tier, Queue, QueueType };
use riven::consts::{ Region, Queue, QueueType };
use tokio::fs;
use tokio::task;

use model::summoner::Summoner;
use model::r#match::{ MatchFileKey, Match };
use model::league::League;
use pipeline::basic;
use pipeline::source_fs;
use pipeline::source_api;
use pipeline::mapping_api;
use util::time;
use util::hybitset::HyBitSet;
Expand Down Expand Up @@ -112,89 +108,6 @@ const QUEUE_TYPE: QueueType = QueueType::RANKED_SOLO_5x5;
const QUEUE: Queue = Queue::SUMMONERS_RIFT_5V5_RANKED_SOLO_GAMES;


async fn get_ranked_summoners(region: Region, path_data_local: &PathBuf, pull_ranks: bool)
-> Result<HashMap<String, (Tier, String)>, Box<dyn Error + Send>>
{
let pagination_batch_size: usize = 10;
if pull_ranks {
let future = tokio::spawn(source_api::get_ranked_summoners(
&RIOT_API, QUEUE_TYPE, region, pagination_batch_size));
let hashmap = future.await.map_err(dyn_err)?;
Ok(hashmap)
} else {
let path_data_local = path_data_local.clone();
let future = task::spawn_blocking(move || source_fs::get_ranked_summoners(path_data_local));
let hashmap = future.await
.map_err(dyn_err)?
.map_err(dyn_err)?;
Ok(hashmap)
}
}

fn write_league_ids<RS>(path_data: impl AsRef<Path>, ranked_summoners: RS)
-> std::io::Result<()>
where
RS: AsRef<HashMap<String, (Tier, String)>>
{
let mut leagues = BTreeSet::new();
for (tier, league_id) in ranked_summoners.as_ref().values() {
leagues.insert(League {
league_id: league_id.clone(), //TODO extra clone.
tier: *tier,
});
};
source_fs::write_leagues(path_data, leagues.into_iter().rev())
}

fn write_summoners<RS>(path: impl AsRef<Path>, update_summoner_ts: u64,
updated_summoners_by_id: &mut HashMap<String, Summoner>,
ranked_summoners: RS)
-> Result<(), Box<dyn Error + Send>>
where
RS: AsRef<HashMap<String, (Tier, String)>>
{
let all_summoners = source_fs::get_all_summoners(&path).map_err(dyn_err)?;

match all_summoners {
None => { // THERES NO SUMMONER .CSV.GZ TO READ FROM!
assert!(updated_summoners_by_id.is_empty(), "all_summoners empty but updated_summoners_by_id not empty.");

let summoner_models = ranked_summoners.as_ref().iter()
.map(|(summoner_id, (tier, league_id))| Summoner {
encrypted_summoner_id: summoner_id.clone(), // TODO extra clone.
encrypted_account_id: None,
league_id: Some(league_id.clone()), // TODO extra clone.
rank_tier: Some(*tier),
games_per_day: None,
ts: None,
});

source_fs::write_summoners(&path, summoner_models).map_err(dyn_err)?;
},
Some(all_summoners) => {
// Set timestamps on updated summoner.
let all_summoners = all_summoners.map(move |mut summoner| {
// Update timestamp and games per day (TODO).
if let Some(updated_summoner) = updated_summoners_by_id.remove(&summoner.encrypted_summoner_id) {
summoner.ts = Some(update_summoner_ts);
summoner.encrypted_account_id = updated_summoner.encrypted_account_id;
// TODO update any other things.
}
// Update tiers.
if let Some((tier, league_id)) = ranked_summoners.as_ref().get(&summoner.encrypted_summoner_id) {
summoner.rank_tier = Some(*tier);
summoner.league_id = Some(league_id.clone()); // TODO bad copy.
}
summoner
});

// Write summoners job.
source_fs::write_summoners(&path, all_summoners).map_err(dyn_err)?;
},
};
Ok(())
}


async fn run_async(region: Region, update_size: usize, pull_ranks: bool) -> Result<(), Box<dyn Error>> {
println!("Updating {} in region {:?}", update_size, region);
Expand Down Expand Up @@ -230,7 +143,7 @@ async fn run_async(region: Region, update_size: usize, pull_ranks: bool) -> Resu
move || source_fs::get_oldest_summoners(path_data_local, update_size))
};
// All ranked summoners.
let ranked_summoners = get_ranked_summoners(region, &path_data_local, pull_ranks);
let ranked_summoners = basic::get_ranked_summoners(&RIOT_API, QUEUE_TYPE, region, &path_data_local, pull_ranks);

// Join match bitset and oldest selected summoners.
let (match_hbs, oldest_summoners) = tokio::try_join!(match_hbs, oldest_summoners)?;
Expand Down Expand Up @@ -279,8 +192,8 @@ async fn run_async(region: Region, update_size: usize, pull_ranks: bool) -> Resu
println!("Writing updated summoners.");
let ranked_summoners = ranked_summoners.clone();
let path_data_local = path_data_local.clone();
task::spawn_blocking(move ||
write_summoners(path_data_local, update_summoner_ts, &mut updated_summoners_by_id, ranked_summoners))
task::spawn_blocking(move || basic::write_summoners(
path_data_local, update_summoner_ts, &mut updated_summoners_by_id, ranked_summoners))
};

// Write rank -> league csv
Expand All @@ -289,7 +202,7 @@ async fn run_async(region: Region, update_size: usize, pull_ranks: bool) -> Resu
// TODO: could optimize by onlying doing this when pull_ranks is true.
let ranked_summoners = ranked_summoners.clone();
let path_data = path_data.clone();
task::spawn_blocking(move || write_league_ids(path_data, ranked_summoners))
task::spawn_blocking(move || basic::write_league_ids(path_data, ranked_summoners))
};

// Get new match values.
Expand Down
1 change: 0 additions & 1 deletion src/model/match.rs
Expand Up @@ -12,7 +12,6 @@ pub struct Match {


use chrono::{ Datelike, DateTime };
use chrono::naive::{ NaiveDateTime, IsoWeek };
use chrono::offset::Utc;
use riven::models::match_v4;

Expand Down
96 changes: 96 additions & 0 deletions src/pipeline/basic.rs
@@ -0,0 +1,96 @@
use std::collections::{ BTreeSet, HashMap };
use std::error::Error;
use std::path::{ Path, PathBuf };

use riven::{ RiotApi };
use riven::consts::{ Region, Tier, QueueType };
use tokio::task;

use crate::dyn_err;
use crate::model::summoner::Summoner;
use crate::model::league::League;
use crate::pipeline::{ source_fs, source_api };

pub async fn get_ranked_summoners(riot_api: &'static RiotApi, queue_type: QueueType,
region: Region, path_data_local: &PathBuf, pull_ranks: bool)
-> Result<HashMap<String, (Tier, String)>, Box<dyn Error + Send>>
{
let pagination_batch_size: usize = 10;
if pull_ranks {
let future = tokio::spawn(source_api::get_ranked_summoners(
riot_api, queue_type, region, pagination_batch_size));
let hashmap = future.await.map_err(dyn_err)?;
Ok(hashmap)
} else {
let path_data_local = path_data_local.clone();
let future = task::spawn_blocking(move || source_fs::get_ranked_summoners(path_data_local));
let hashmap = future.await
.map_err(dyn_err)?
.map_err(dyn_err)?;
Ok(hashmap)
}
}

pub fn write_league_ids<RS>(path_data: impl AsRef<Path>, ranked_summoners: RS)
-> std::io::Result<()>
where
RS: AsRef<HashMap<String, (Tier, String)>>
{
let mut leagues = BTreeSet::new();
for (tier, league_id) in ranked_summoners.as_ref().values() {
leagues.insert(League {
league_id: league_id.clone(), //TODO extra clone.
tier: *tier,
});
};
source_fs::write_leagues(path_data, leagues.into_iter().rev())
}

pub fn write_summoners<RS>(path: impl AsRef<Path>, update_summoner_ts: u64,
updated_summoners_by_id: &mut HashMap<String, Summoner>,
ranked_summoners: RS)
-> Result<(), Box<dyn Error + Send>>
where
RS: AsRef<HashMap<String, (Tier, String)>>
{
let all_summoners = source_fs::get_all_summoners(&path).map_err(dyn_err)?;

match all_summoners {
None => { // THERES NO SUMMONER .CSV.GZ TO READ FROM!
assert!(updated_summoners_by_id.is_empty(), "all_summoners empty but updated_summoners_by_id not empty.");

let summoner_models = ranked_summoners.as_ref().iter()
.map(|(summoner_id, (tier, league_id))| Summoner {
encrypted_summoner_id: summoner_id.clone(), // TODO extra clone.
encrypted_account_id: None,
league_id: Some(league_id.clone()), // TODO extra clone.
rank_tier: Some(*tier),
games_per_day: None,
ts: None,
});

source_fs::write_summoners(&path, summoner_models).map_err(dyn_err)?;
},
Some(all_summoners) => {
// Set timestamps on updated summoner.
let all_summoners = all_summoners.map(move |mut summoner| {
// Update timestamp and games per day (TODO).
if let Some(updated_summoner) = updated_summoners_by_id.remove(&summoner.encrypted_summoner_id) {
summoner.ts = Some(update_summoner_ts);
summoner.encrypted_account_id = updated_summoner.encrypted_account_id;
// TODO update any other things.
}
// Update tiers.
if let Some((tier, league_id)) = ranked_summoners.as_ref().get(&summoner.encrypted_summoner_id) {
summoner.rank_tier = Some(*tier);
summoner.league_id = Some(league_id.clone()); // TODO bad copy.
}
summoner
});

// Write summoners job.
source_fs::write_summoners(&path, all_summoners).map_err(dyn_err)?;
},
};
Ok(())
}
4 changes: 1 addition & 3 deletions src/pipeline/hybitset.rs
@@ -1,14 +1,12 @@
use std::path::{ Path, PathBuf };
use std::path::{ Path };
use std::error::Error;

use tokio::fs::{ File, OpenOptions };
use tokio::io::{ AsyncReadExt, AsyncWriteExt };
use riven::consts::Region;

use crate::util::hybitset::HyBitSet;
use crate::util::time;
use crate::util::file_find;
use crate::util::error::PbwError;
use crate::dyn_err;

const FILE_TAG: &'static str = "match_hbs";
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/mod.rs
@@ -1,3 +1,4 @@
pub mod basic;
pub mod filter;
pub mod mapping_api;
pub mod hybitset;
Expand Down
5 changes: 1 addition & 4 deletions src/pipeline/source_api.rs
@@ -1,12 +1,9 @@
use std::collections::HashMap;

use futures::future::join_all;
use riven::consts::{ Region, Tier, Division, QueueType };
use riven::consts::{ Region, Tier, QueueType };
use riven::RiotApi;

use crate::model::summoner::Summoner;
use crate::util::time;


#[allow(dead_code)]
pub async fn get_ranked_summoners(api: &RiotApi, queue_type: QueueType, region: Region, batch_size: usize)
Expand Down
4 changes: 0 additions & 4 deletions src/pipeline/source_fs.rs
@@ -1,12 +1,8 @@
use std::collections::HashMap;
use std::path::{ Path, PathBuf };

use chrono::DateTime;
use chrono::offset::Utc;
use riven::consts::Region;
use riven::consts::Tier;

use crate::util::hybitset::HyBitSet;
use crate::util::csvgz;
use crate::util::file_find;
use crate::util::time;
Expand Down
8 changes: 0 additions & 8 deletions src/util/file_find.rs
@@ -1,16 +1,8 @@
use std::error::Error;
use std::path::{ Path, PathBuf };

use chrono::{ DateTime, Duration };
use chrono::offset::Utc;
use glob::glob_with;
use glob::MatchOptions;

use riven::consts::Region;

use crate::dyn_err;
use crate::util;

lazy_static! {
static ref MATCH_OPTIONS: MatchOptions = MatchOptions {
case_sensitive: false,
Expand Down

0 comments on commit 5e8db7e

Please sign in to comment.