Skip to content

Commit

Permalink
cleaning events
Browse files Browse the repository at this point in the history
  • Loading branch information
gostkin committed Mar 21, 2023
1 parent e25bbf1 commit d717e16
Show file tree
Hide file tree
Showing 12 changed files with 1,198 additions and 41 deletions.
3 changes: 3 additions & 0 deletions utxo-selection-benchmark/Cargo.toml
Expand Up @@ -8,8 +8,11 @@ edition = "2021"
[dependencies]
entity = { git = "https://github.com/dcSpark/carp.git", branch = "egostkin/generate-benchmark-data" }
dcspark-core = { path = "../core" }
utxo-selection = { path = "../utxo-selection" }
cardano-utils = { path = "../cardano-utils" }

anyhow = "1.0.53"
itertools = "0.10.5"
serde = {version = "1.0.144", features = ["derive", "rc"]}
tokio = { version = "1.16.1", features = ["full"] }
hex = "0.4.3"
Expand Down
11 changes: 6 additions & 5 deletions utxo-selection-benchmark/examples/carp_generation.rs
@@ -1,8 +1,5 @@
use std::path::PathBuf;

use utxo_selection_benchmark::mapper::DataMapper;
use utxo_selection_benchmark::tx_event::{TxAsset, TxEvent, TxOutput};
use utxo_selection_benchmark::utils::{dump_hashmap_to_file, dump_hashset_to_file};
use anyhow::{anyhow, Context};
use cardano_multiplatform_lib::address::StakeCredential;
use cardano_multiplatform_lib::crypto::TransactionHash;
Expand All @@ -22,6 +19,9 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use tracing_subscriber::prelude::*;
use utxo_selection_benchmark::mapper::DataMapper;
use utxo_selection_benchmark::tx_event::{TxAsset, TxEvent, TxOutput};
use utxo_selection_benchmark::utils::{dump_hashmap_to_file, dump_hashset_to_file};

#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
Expand Down Expand Up @@ -356,6 +356,7 @@ fn dump_unparsed_transactions_to_file(
}
Ok(())
}

fn clean_events(
events_output_path: PathBuf,
cleaned_events_output_path: PathBuf,
Expand Down Expand Up @@ -489,7 +490,7 @@ fn get_input_intents(
let has_banned_addresses = parsed_inputs.iter().any(|input| {
input.address.is_none()
|| (input.address.is_some()
&& banned_addresses.contains(&input.address.clone().unwrap()))
&& banned_addresses.contains(&input.address.clone().unwrap()))
});

Ok((
Expand Down Expand Up @@ -599,7 +600,7 @@ mod tests {
101, 106, 250, 127, 137, 49, 211, 112, 238, 220, 189, 229, 84, 138, 171, 84, 242, 131,
186, 7, 51, 239, 48, 123, 135, 235, 45, 50, 19, 86, 67, 142,
])
.unwrap();
.unwrap();
println!("{}", tx_hash.to_hex());
}
}
212 changes: 212 additions & 0 deletions utxo-selection-benchmark/examples/finish_events_parsing.rs
@@ -0,0 +1,212 @@
use anyhow::{anyhow, Context};
use cardano_multiplatform_lib::address::{Address, StakeCredential};
use cardano_multiplatform_lib::error::JsError;
use clap::Parser;
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use utxo_selection_benchmark::mapper::DataMapper;
use utxo_selection_benchmark::tx_event::{TxEvent, TxOutput};
use utxo_selection_benchmark::utils::{
dump_hashmap_to_file, dump_hashset_to_file, read_hashmap_from_file, read_hashset_from_file,
};

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
events_path: PathBuf,
cleaned_events_output_path: PathBuf,

unparsed_transaction_addresses: PathBuf,

payment_creds_mapping: PathBuf,
payment_creds_mapping_output: PathBuf,
staking_creds_mapping: PathBuf,
staking_creds_mapping_output: PathBuf,
address_to_mapping: PathBuf,
address_to_mapping_output: PathBuf,
banned_addresses: PathBuf,
banned_addresses_output: PathBuf,
}

#[derive(Parser, Debug)]
#[clap(version)]
pub struct Cli {
/// path to config file
#[clap(long, value_parser)]
config_path: PathBuf,
}

#[tokio::main]
async fn main() {
let result = _main().await;
result.unwrap();
}

async fn _main() -> anyhow::Result<()> {
// Start logging setup block
let fmt_layer = tracing_subscriber::fmt::layer().with_test_writer();

tracing_subscriber::registry().with(fmt_layer).init();

let Cli { config_path } = Cli::parse();

tracing::info!("Config file {:?}", config_path);
let file = File::open(&config_path).with_context(|| {
format!(
"Cannot read config file {path}",
path = config_path.display()
)
})?;
let config: Config = serde_yaml::from_reader(file).with_context(|| {
format!(
"Cannot read config file {path}",
path = config_path.display()
)
})?;

let mut unparsed_addresses_file = if config.unparsed_transaction_addresses.exists()
&& config.unparsed_transaction_addresses.is_file()
{
File::open(config.unparsed_transaction_addresses.clone())?
} else {
return Err(anyhow!(
"can't open input file: {:?}",
config.unparsed_transaction_addresses.clone()
));
};

tracing::info!("loading mappings");

let mut stake_address_to_num =
DataMapper::<StakeCredential>::load_from_file(config.staking_creds_mapping)?;
tracing::info!("stake addresses loaded");

let mut payment_address_to_num =
DataMapper::<StakeCredential>::load_from_file(config.payment_creds_mapping)?;
tracing::info!("payment addresses loaded");

let mut banned_addresses: HashSet<(u64, Option<u64>)> =
read_hashset_from_file(config.banned_addresses)?;
tracing::info!("banned addresses loaded");

let mut address_to_mapping: HashMap<String, (u64, Option<u64>)> =
read_hashmap_from_file(config.address_to_mapping)?;
tracing::info!("address mapping loaded");

tracing::info!("successfully loaded mappings");

let unparsed_addresses_file_lines = BufReader::new(unparsed_addresses_file).lines();
for line in unparsed_addresses_file_lines {
let address = line?;
match cardano_multiplatform_lib::address::Address::from_bech32(address.as_str()) {
Ok(address) => match address.payment_cred() {
None => {
// this is byron output
}
Some(payment) => {
let payment_mapping = payment_address_to_num.add_if_not_presented(payment);
let staking_mapping = address
.staking_cred()
.map(|staking| stake_address_to_num.add_if_not_presented(staking));
address_to_mapping.insert(
address
.to_bech32(None)
.map_err(|err| anyhow!("Can't convert address to bech32: {:?}", err))?,
(payment_mapping, staking_mapping),
);
banned_addresses.insert((payment_mapping, staking_mapping));
}
},
Err(err) => {
tracing::error!("can't parse address: {:?}, addr={:?}", err, address);
}
}
}

tracing::info!("Parsing finished, dumping files");

payment_address_to_num.dump_to_file(config.payment_creds_mapping_output)?;
stake_address_to_num.dump_to_file(config.staking_creds_mapping_output)?;
dump_hashmap_to_file(&address_to_mapping, config.address_to_mapping_output)?;
dump_hashset_to_file(&banned_addresses, config.banned_addresses_output)?;

tracing::info!("Dumping finished, cleaning events");

clean_events(
config.events_path,
config.cleaned_events_output_path,
&banned_addresses,
)?;

tracing::info!("Cleaning finished");

Ok(())
}

fn clean_events(
events_output_path: PathBuf,
cleaned_events_output_path: PathBuf,
banned_addresses: &HashSet<(u64, Option<u64>)>,
) -> anyhow::Result<()> {
let file = File::open(events_output_path)?;
let mut cleaned_file = File::create(cleaned_events_output_path)?;

let reader = BufReader::new(file);
let lines = reader.lines();
for (num, line) in lines.enumerate() {
let event: TxEvent = serde_json::from_str(line?.as_str())?;
let event = match event {
TxEvent::Partial { to } => {
let to: Vec<TxOutput> = to
.into_iter()
.filter(|output| !output.is_byron() && !output.is_banned(&banned_addresses))
.collect();
if !to.is_empty() {
Some(TxEvent::Partial { to })
} else {
None
}
}
TxEvent::Full { mut to, fee, from } => {
if from
.iter()
.any(|input| input.is_byron() || input.is_banned(&banned_addresses))
{
to = to
.into_iter()
.filter(|output| !output.is_byron() && !output.is_banned(&banned_addresses))
.collect();
if !to.is_empty() {
Some(TxEvent::Partial { to })
} else {
None
}
} else {
to = to
.into_iter()
.map(|mut output| {
if output.is_banned(&banned_addresses) {
output.address = None;
}
output
})
.collect();
Some(TxEvent::Full { to, fee, from })
}
}
};
if let Some(event) = event {
cleaned_file.write_all(format!("{}\n", serde_json::to_string(&event)?).as_bytes())?;
}
if num % 100000 == 0 {
tracing::info!("Processed {:?} entries", num + 1);
}
}

Ok(())
}
51 changes: 51 additions & 0 deletions utxo-selection-benchmark/examples/print_tx_hashes.rs
@@ -0,0 +1,51 @@
use anyhow::anyhow;
use clap::Parser;
use entity::prelude::TransactionModel;
use hex;
use serde::Deserialize;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[derive(Parser, Debug)]
#[clap(version)]
pub struct Cli {
/// path to config file
#[clap(long, value_parser)]
file_path: PathBuf,
}

#[tokio::main]
async fn main() {
let result = _main().await;
result.unwrap();
}

async fn _main() -> anyhow::Result<()> {
let fmt_layer = tracing_subscriber::fmt::layer().with_test_writer();

tracing_subscriber::registry().with(fmt_layer).init();

let Cli { file_path } = Cli::parse();

let mut unparsed_txs_file = if file_path.exists() && file_path.is_file() {
File::open(file_path.clone())?
} else {
return Err(anyhow!("can't open input file: {:?}", file_path.clone()));
};

let mut unparsed_addresses_file_lines = BufReader::new(unparsed_txs_file).lines();
let count = u64::from_str(unparsed_addresses_file_lines.next().unwrap()?.as_str())?;
let mut seen = 0;
for line in unparsed_addresses_file_lines {
let tx: TransactionModel = serde_json::from_str(line?.as_str())?;
println!("hash: {}", hex::encode(tx.hash.clone()));
seen += 1;
}

assert_eq!(seen, count);
Ok(())
}

0 comments on commit d717e16

Please sign in to comment.