fix(sns-downloader): fixing limit and better hashing (#190)
* fixing limit and better hashing

* implementing proper paging
NikolaMilosa committed Feb 13, 2024
1 parent a01361f commit 8584071
139 changes: 85 additions & 54 deletions rs/ic-observability/sns-downloader/src/downloader_loop.rs
@@ -28,6 +28,9 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
     }
 
     let mut current_hash: u64 = 0;
+    // Can be found: https://sns-api.internetcomputer.org/docs#/snses/list_snses_api_v1_snses_get
+    // Its the default maximum value
+    let limit: u64 = 100;
 
     loop {
         let tick = crossbeam::select! {
@@ -37,73 +40,92 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
             },
             recv(interval) -> msg => msg.expect("tick failed!")
         };
-        info!(logger, "Downloading from {} @ interval {:?}", cli.sd_url, tick);
 
-        let response = match client.get(cli.sd_url.clone()).send().await {
-            Ok(res) => res,
-            Err(e) => {
-                warn!(
-                    logger,
-                    "Failed to download from {} @ interval {:?}: {:?}", cli.sd_url, tick, e
-                );
-                continue;
-            }
-        };
+        let mut current_page: u64 = 0;
+        let mut snses = vec![];
 
-        if !response.status().is_success() {
-            warn!(
-                logger,
-                "Received failed status {} @ interval {:?}: {:?}", cli.sd_url, tick, response
-            );
-            continue;
-        }
+        loop {
+            info!(
+                logger,
+                "Downloading from {} page {} @ interval {:?}", cli.sd_url, current_page, tick
+            );
+            let response = match client
+                .get(cli.sd_url.clone())
+                .query(&[("limit", limit), ("offset", current_page * limit)])
+                .send()
+                .await
+            {
+                Ok(res) => res,
+                Err(e) => {
+                    warn!(
+                        logger,
+                        "Failed to download from {} @ interval {:?}: {:?}", cli.sd_url, tick, e
+                    );
+                    continue;
+                }
+            };
 
-        let targets: serde_json::Value = match response.json().await {
-            Ok(targets) => targets,
-            Err(e) => {
-                warn!(
-                    logger,
-                    "Failed to parse response from {} @ interval {:?}: {:?}", cli.sd_url, tick, e
-                );
-                continue;
-            }
-        };
+            if !response.status().is_success() {
+                warn!(
+                    logger,
+                    "Received failed status {} @ interval {:?}: {:?}", cli.sd_url, tick, response
+                );
+                continue;
+            }
 
-        let targets = match &targets["data"] {
-            serde_json::Value::Array(ar) => ar,
-            _ => {
-                warn!(logger, "Didn't receive expected structure of payload");
-                continue;
-            }
-        };
-        let mut snses = vec![];
-        for target in targets {
-            let mut sns = Sns {
-                description: target["description"].as_str().unwrap().to_string(),
-                enabled: target["enabled"].as_bool().unwrap(),
-                root_canister_id: target["root_canister_id"].as_str().unwrap().to_string(),
-                name: target["name"].as_str().unwrap().to_string(),
-                url: target["url"].as_str().unwrap().to_string(),
-                canisters: get_canisters(
-                    &cli,
-                    target["root_canister_id"].as_str().unwrap().to_string(),
-                    &client,
-                    logger.clone(),
-                )
-                .await,
-            };
-            sns.canisters.push(Canister {
-                canister_id: target["root_canister_id"].as_str().unwrap().to_string(),
-                canister_type: "root".to_string(),
-                module_hash: "".to_string(),
-            });
-
-            snses.push(sns)
-        }
+            let targets: serde_json::Value = match response.json().await {
+                Ok(targets) => targets,
+                Err(e) => {
+                    warn!(
+                        logger,
+                        "Failed to parse response from {} @ interval {:?}: {:?}", cli.sd_url, tick, e
+                    );
+                    continue;
+                }
+            };
+
+            let targets = match &targets["data"] {
+                serde_json::Value::Array(ar) => ar,
+                _ => {
+                    warn!(logger, "Didn't receive expected structure of payload");
+                    continue;
+                }
+            };
+
+            for target in targets {
+                let mut sns = Sns {
+                    description: target["description"].as_str().unwrap().to_string(),
+                    enabled: target["enabled"].as_bool().unwrap(),
+                    root_canister_id: target["root_canister_id"].as_str().unwrap().to_string(),
+                    name: target["name"].as_str().unwrap().to_string(),
+                    url: target["url"].as_str().unwrap().to_string(),
+                    canisters: get_canisters(
+                        &cli,
+                        target["root_canister_id"].as_str().unwrap().to_string(),
+                        &client,
+                        logger.clone(),
+                    )
+                    .await,
+                };
+                sns.canisters.push(Canister {
+                    canister_id: target["root_canister_id"].as_str().unwrap().to_string(),
+                    canister_type: "root".to_string(),
+                    module_hash: "".to_string(),
+                });
+
+                snses.push(sns)
+            }
+
+            if targets.len() < limit as usize {
+                break;
+            }
+            current_page += 1
+        }
 
         let mut hasher = DefaultHasher::new();
 
-        let targets = snses.into_iter().filter(|f| filters.filter(f)).collect::<Vec<_>>();
+        let mut targets = snses.into_iter().filter(|f| filters.filter(f)).collect::<Vec<_>>();
+        targets.sort_by_key(|f| f.root_canister_id.to_string());
 
         for target in &targets {
             target.hash(&mut hasher);
@@ -112,7 +134,14 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece
         let hash = hasher.finish();
 
         if current_hash != hash {
-            info!(logger, "Received new targets from {} @ interval {:?}", cli.sd_url, tick);
+            info!(
+                logger,
+                "Received new targets from {} @ interval {:?}, old hash '{}' != '{}' new hash",
+                cli.sd_url,
+                tick,
+                current_hash,
+                hash
+            );
             current_hash = hash;
 
             generate_config(&cli, targets, logger.clone());
@@ -164,8 +193,7 @@ async fn get_canisters(cli: &CliArgs, root_canister_id: String, client: &Client,
             return vec![];
         }
     };
-
-    match &contract["canisters"] {
+    let mut canisters = match &contract["canisters"] {
         serde_json::Value::Array(ar) => ar
             .iter()
             .map(|val| Canister {
@@ -181,5 +209,8 @@ async fn get_canisters(cli: &CliArgs, root_canister_id: String, client: &Client,
             );
             vec![]
         }
-    }
+    };
+
+    canisters.sort_by_key(|c| c.canister_id.to_string());
+    canisters
 }
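
Note: the "proper paging" half of the commit is the standard limit/offset pattern. The following is a minimal sketch of that pattern, not code from this repository: it assumes reqwest with the "json" feature enabled and an endpoint that, like the SNS API, wraps results in a {"data": [...]} envelope; the helper name fetch_all_pages is hypothetical.

// Minimal limit/offset paging sketch (hypothetical helper, assumptions above).
use reqwest::Client;
use serde_json::Value;

async fn fetch_all_pages(client: &Client, url: &str, limit: u64) -> reqwest::Result<Vec<Value>> {
    let mut all = vec![];
    let mut page: u64 = 0;
    loop {
        // `offset` advances by whole pages, as in the diff above.
        let body: Value = client
            .get(url)
            .query(&[("limit", limit), ("offset", page * limit)])
            .send()
            .await?
            .json()
            .await?;
        let items = body["data"].as_array().cloned().unwrap_or_default();
        let fetched = items.len();
        all.extend(items);
        // A page shorter than `limit` is the last one; this mirrors the
        // `targets.len() < limit as usize` break in the new loop.
        if fetched < limit as usize {
            break;
        }
        page += 1;
    }
    Ok(all)
}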

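The "better hashing" half sorts both the filtered targets and each SNS's canisters by a stable key before hashing. The reason: DefaultHasher folds items in iteration order, so the same set of targets arriving in a different page or response order would otherwise produce a different digest and re-trigger generate_config. A small self-contained sketch of that idea (hypothetical helper, not from this repository):

// Order-independent hashing by sorting into a canonical order first.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn stable_hash<T: Hash + Ord>(items: &mut Vec<T>) -> u64 {
    // Canonical order first, mirroring `targets.sort_by_key(...)` above.
    items.sort();
    let mut hasher = DefaultHasher::new();
    for item in items.iter() {
        item.hash(&mut hasher);
    }
    hasher.finish()
}

fn main() {
    let mut a = vec!["aaaaa-aa", "bbbbb-bb"];
    let mut b = vec!["bbbbb-bb", "aaaaa-aa"];
    // Same members, different arrival order: identical digest, so the
    // downloader no longer regenerates its config spuriously.
    assert_eq!(stable_hash(&mut a), stable_hash(&mut b));
}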