Skip to content

Commit

Permalink
Fix compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
mrferris committed Sep 11, 2023
1 parent fdb948b commit 47695dc
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
73 changes: 42 additions & 31 deletions glados-cartographer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ impl DHTCensus {
let finished = self.finished.read().await.len();
let errored = self.errored.read().await.len();

if known > 2 {
return true;
}

if known == 0 {
false
} else {
errored + finished == known
errored + finished > 3
}
}

Expand Down Expand Up @@ -208,7 +212,7 @@ impl DHTCensus {
///
/// 1. Start with a random node-id
/// 2. Use RFN with a random node-id to initialize our view of the network
/// 3. For each node-id, enumerate it's routing table entries until we find empty buckets.
/// 3. For each node-id, enumerate its routing table entries until we find empty buckets.
/// 4. Track all seen node-ids until we find no new ones.
async fn perform_dht_census(config: CartographerConfig, conn: DatabaseConnection) {
let client = match &config.transport {
Expand All @@ -227,10 +231,8 @@ async fn perform_dht_census(config: CartographerConfig, conn: DatabaseConnection
TransportConfig::IPC(_path) => panic!("not implemented"),
};

// let target = NodeId::random();
let target_enr = generate_random_remote_enr().1;
let target = target_enr.node_id();
let target_display = H256::from(target.raw());
let census = Arc::new(DHTCensus::new());

// Initial un-processed ENRs to be pinged
Expand All @@ -245,13 +247,19 @@ async fn perform_dht_census(config: CartographerConfig, conn: DatabaseConnection
);

// Initialize our search with a random-ish set of ENRs
let initial_enrs = match client.recursive_find_nodes(target).await {
Ok(initial_enrs) => initial_enrs,
Err(err) => {
error!(target.node_id=?H256::from(target.raw()), err=?err, "Error during census initialization");
return;
}
};
// let initial_enrs = match client.recursive_find_nodes(target).await {
// Ok(initial_enrs) => initial_enrs,
// Err(err) => {
// error!(target.node_id=?H256::from(target.raw()), err=?err, "Error during census initialization");
// return;
// }
// };

let initial_enrs = vec![
generate_random_remote_enr().1,
generate_random_remote_enr().1,
generate_random_remote_enr().1,
];

for enr in initial_enrs {
census.add_known(NodeId(enr.node_id().raw())).await;
Expand Down Expand Up @@ -327,13 +335,10 @@ async fn perform_dht_census(config: CartographerConfig, conn: DatabaseConnection
ping_handle.abort();
enumerate_handle.abort();

let census_model = match census::create(
census.started_at,
census.duration().num_seconds().try_into().unwrap(), // should always fit into u32
&conn,
)
.await
{
let duration: u32 = census.duration().num_seconds().try_into().unwrap();
info!("Duration: {}", duration);

let census_model = match census::create(census.started_at, duration, &conn).await {
Ok(census_model) => census_model,
Err(err) => {
error!(err=?err, "Error saving census model to database");
Expand Down Expand Up @@ -373,22 +378,24 @@ async fn perform_dht_census(config: CartographerConfig, conn: DatabaseConnection
/// Sub-component of perform_dht_census()
///
async fn orchestrate_liveliness_checks(
mut rx: mpsc::Receiver<Enr>,
tx: mpsc::Sender<Enr>,
mut to_ping_rx: mpsc::Receiver<Enr>,
to_enumerate_tx: mpsc::Sender<Enr>,
census: Arc<DHTCensus>,
config: CartographerConfig,
conn: DatabaseConnection,
limiter: Arc<Semaphore>,
) {
while let Some(enr) = rx.recv().await {
while let Some(enr) = to_ping_rx.recv().await {
error!("In liveness, acquiring permit");
let permit = limiter
.clone()
.acquire_owned()
.await
.expect("Unable to acquire permit");
error!("In liveness, acquired permit");
let handle = do_liveliness_check(
enr,
tx.clone(),
to_enumerate_tx.clone(),
census.clone(),
config.clone(),
conn.clone(),
Expand All @@ -402,7 +409,7 @@ async fn orchestrate_liveliness_checks(

async fn do_liveliness_check(
enr: Enr,
tx: mpsc::Sender<Enr>,
to_enumerate_tx: mpsc::Sender<Enr>,
census: Arc<DHTCensus>,
config: CartographerConfig,
conn: DatabaseConnection,
Expand Down Expand Up @@ -446,8 +453,8 @@ async fn do_liveliness_check(
.add_alive(enr.clone(), record_model.id, pong_info.data_radius)
.await;

// Send enr to process that enumerates it's routing table
match tx.send(enr.clone()).await {
// Send enr to process that enumerates its routing table
match to_enumerate_tx.send(enr.clone()).await {
Ok(_) => (),
Err(err) => {
error!(err=?err, "Error queueing enr for routing table enumeration");
Expand All @@ -465,19 +472,22 @@ async fn do_liveliness_check(
}

async fn orchestrate_routing_table_enumerations(
mut rx: mpsc::Receiver<Enr>,
tx: mpsc::Sender<Enr>,
mut to_enumerate_rx: mpsc::Receiver<Enr>,
to_ping_tx: mpsc::Sender<Enr>,
census: Arc<DHTCensus>,
config: CartographerConfig,
limiter: Arc<Semaphore>,
) {
while let Some(enr) = rx.recv().await {
while let Some(enr) = to_enumerate_rx.recv().await {
error!("In enumeration, acquiring permit");
let permit = limiter
.clone()
.acquire_owned()
.await
.expect("Unable to acquire permit");
let handle = do_routing_table_enumeration(enr, tx.clone(), census.clone(), config.clone());
error!("In enumeration, acquired permit");
let handle =
do_routing_table_enumeration(enr, to_ping_tx.clone(), census.clone(), config.clone());
tokio::spawn(async move {
handle.await;
drop(permit);
Expand All @@ -487,7 +497,7 @@ async fn orchestrate_routing_table_enumerations(

async fn do_routing_table_enumeration(
enr: Enr,
tx: mpsc::Sender<Enr>,
to_ping_tx: mpsc::Sender<Enr>,
census: Arc<DHTCensus>,
config: CartographerConfig,
) {
Expand Down Expand Up @@ -521,7 +531,8 @@ async fn do_routing_table_enumeration(
continue;
} else {
census.add_known(NodeId(found_enr.node_id().raw())).await;
tx.send(found_enr)
to_ping_tx
.send(found_enr)
.await
.expect("Error queuing liveliness check");
}
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub use sea_orm_migration::prelude::*;

mod m20230508_111707_create_census_tables;
mod m20230511_104804_create_node;
mod m20230511_104811_create_record;
mod m20230511_104814_create_content;
Expand All @@ -21,6 +22,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230511_104830_create_content_audit::Migration),
Box::new(m20230511_104838_create_execution_metadata::Migration),
Box::new(m20230511_104937_create_key_value::Migration),
Box::new(m20230508_111707_create_census_tables::Migration),
]
}
}
13 changes: 11 additions & 2 deletions migration/src/m20230508_111707_create_census_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@ impl MigrationTrait for Migration {
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Census::StartedAt).date_time().not_null())
.col(ColumnDef::new(Census::Duration).unsigned().not_null())
.col(
ColumnDef::new(Census::StartedAt)
.timestamp_with_time_zone()
.not_null(),
)
.col(
ColumnDef::new(Census::Duration)
.integer()
.unsigned()
.not_null(),
)
.to_owned(),
)
.await?;
Expand Down

0 comments on commit 47695dc

Please sign in to comment.