Skip to content

Commit

Permalink
feat: update inscription transfer logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Mar 23, 2023
1 parent 2ac3022 commit 9d0d106
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 192 deletions.
232 changes: 115 additions & 117 deletions components/chainhook-event-observer/src/indexer/ordinals/db/mod.rs
Expand Up @@ -16,6 +16,8 @@ use crate::{
utils::Context,
};

use super::ord::height::Height;

fn get_default_ordinals_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
destination_path.push("bitcoin_block_traversal.sqlite");
Expand Down Expand Up @@ -54,9 +56,9 @@ pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connec
"CREATE TABLE IF NOT EXISTS inscriptions (
inscription_id TEXT NOT NULL PRIMARY KEY,
outpoint_to_watch TEXT NOT NULL,
satoshi_id TEXT NOT NULL,
ordinal_number INTEGER NOT NULL,
inscription_number INTEGER NOT NULL,
offset NOT NULL
offset INTEGER NOT NULL
)",
[],
) {
Expand All @@ -69,7 +71,7 @@ pub fn initialize_ordinal_state_storage(path: &PathBuf, ctx: &Context) -> Connec
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
if let Err(e) = conn.execute(
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_satoshi_id ON inscriptions(satoshi_id);",
"CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);",
[],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
Expand Down Expand Up @@ -240,7 +242,7 @@ pub fn retrieve_compacted_block_from_index(
) -> Option<CompactedBlock> {
let args: &[&dyn ToSql] = &[&block_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?1")
.prepare("SELECT compacted_bytes FROM blocks WHERE id = ?")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
Expand All @@ -255,35 +257,14 @@ pub fn retrieve_compacted_block_from_index(
return None;
}

pub fn scan_existing_inscriptions_id(
inscription_id: &str,
storage_conn: &Connection,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&inscription_id.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE inscription_id = ?1")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
Ok(inscription_id)
})
.unwrap();

for result in result_iter {
return Some(result.unwrap());
}
return None;
}

pub fn store_new_inscription(
inscription_data: &OrdinalInscriptionRevealData,
storage_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = storage_conn.execute(
"INSERT INTO inscriptions (inscription_id, outpoint_to_watch, satoshi_id, inscription_number) VALUES (?1, ?2)",
rusqlite::params![&inscription_data.inscription_id, &inscription_data.outpoint_post_inscription, &inscription_data.inscription_id, &inscription_data.inscription_id],
"INSERT INTO inscriptions (inscription_id, outpoint_to_watch, ordinal_number, inscription_number, offset) VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![&inscription_data.inscription_id, &inscription_data.satpoint_post_inscription[0..inscription_data.satpoint_post_inscription.len()-2], &inscription_data.ordinal_number, &inscription_data.inscription_number, 0],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
Expand All @@ -297,7 +278,7 @@ pub fn update_transfered_inscription(
ctx: &Context,
) {
if let Err(e) = storage_conn.execute(
"UPDATE inscriptions SET outpoint_to_watch = ?1, offset = ?2 WHERE inscription_id = ?3",
"UPDATE inscriptions SET outpoint_to_watch = ?, offset = ? WHERE inscription_id = ?",
rusqlite::params![&outpoint_post_transfer, &offset, &inscription_id],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
Expand All @@ -314,61 +295,48 @@ pub fn find_last_inscription_number(
"SELECT inscription_number FROM inscriptions ORDER BY inscription_number DESC LIMIT 1",
)
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_number: u64 = row.get(0).unwrap();
Ok(inscription_number)
})
.unwrap();

for result in result_iter {
return Ok(result.unwrap());
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let inscription_number: u64 = row.get(0).unwrap();
return Ok(inscription_number);
}
return Ok(0);
Ok(0)
}

pub fn find_inscription_with_satoshi_id(
satoshi_id: &str,
pub fn find_inscription_with_ordinal_number(
ordinal_number: &u64,
storage_conn: &Connection,
ctx: &Context,
) -> Option<String> {
let args: &[&dyn ToSql] = &[&satoshi_id.to_sql().unwrap()];
let args: &[&dyn ToSql] = &[&ordinal_number.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id FROM inscriptions WHERE satoshi_id = ?1")
.prepare("SELECT inscription_id FROM inscriptions WHERE ordinal_number = ?")
.unwrap();
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
Ok(inscription_id)
})
.unwrap();

for result in result_iter {
return Some(result.unwrap());
let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let inscription_id: String = row.get(0).unwrap();
return Some(inscription_id);
}
return None;
}

pub fn find_inscriptions_at_wached_outpoint(
txin: &str,
outpoint: &str,
storage_conn: &Connection,
) -> Vec<(String, u64, String, u64)> {
let args: &[&dyn ToSql] = &[&txin.to_sql().unwrap()];
) -> Vec<(String, u64, u64, u64)> {
let args: &[&dyn ToSql] = &[&outpoint.to_sql().unwrap()];
let mut stmt = storage_conn
.prepare("SELECT inscription_id, inscription_number, satoshi_id, offset FROM inscriptions WHERE outpoint_to_watch = ?1 ORDER BY offset ASC")
.prepare("SELECT inscription_id, inscription_number, ordinal_number, offset FROM inscriptions WHERE outpoint_to_watch = ? ORDER BY offset ASC")
.unwrap();
let mut results = vec![];
let result_iter = stmt
.query_map(args, |row| {
let inscription_id: String = row.get(0).unwrap();
let inscription_number: u64 = row.get(1).unwrap();
let satoshi_id: String = row.get(2).unwrap();
let offset: u64 = row.get(1).unwrap();
results.push((inscription_id, inscription_number, satoshi_id, offset));
Ok(())
})
.unwrap();

let mut rows = stmt.query(args).unwrap();
while let Ok(Some(row)) = rows.next() {
let inscription_id: String = row.get(0).unwrap();
let inscription_number: u64 = row.get(1).unwrap();
let ordinal_number: u64 = row.get(2).unwrap();
let offset: u64 = row.get(3).unwrap();
results.push((inscription_id, inscription_number, ordinal_number, offset));
}
return results;
}

Expand Down Expand Up @@ -430,51 +398,61 @@ pub async fn build_bitcoin_traversal_local_storage(
let bitcoin_config = bitcoin_config.clone();
let moved_ctx = ctx.clone();
let block_data_tx_moved = block_data_tx.clone();
let handle_1 = hiro_system_kit::thread_named("Block data retrieval").spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
let moved_bitcoin_config = bitcoin_config.clone();
let block_data_tx = block_data_tx_moved.clone();
let moved_ctx = moved_ctx.clone();
retrieve_block_data_pool.execute(move || {
moved_ctx.try_log(|logger| slog::info!(logger, "Fetching block #{block_height}"));
let future = retrieve_full_block_breakdown_with_retry(
&moved_bitcoin_config,
&block_hash,
&moved_ctx,
);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
let _ = block_data_tx.send(Some(block_data));
});
let res = retrieve_block_data_pool.join();
res
}
}).expect("unable to spawn thread");

let handle_2 = hiro_system_kit::thread_named("Block data compression").spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
let _ = block_compressed_tx_moved
.send(Some((block_data.height as u32, compressed_block)));
});
let handle_1 = hiro_system_kit::thread_named("Block data retrieval")
.spawn(move || {
while let Ok(Some((block_height, block_hash))) = block_hash_rx.recv() {
let moved_bitcoin_config = bitcoin_config.clone();
let block_data_tx = block_data_tx_moved.clone();
let moved_ctx = moved_ctx.clone();
retrieve_block_data_pool.execute(move || {
moved_ctx
.try_log(|logger| slog::info!(logger, "Fetching block #{block_height}"));
let future = retrieve_full_block_breakdown_with_retry(
&moved_bitcoin_config,
&block_hash,
&moved_ctx,
);
let block_data = hiro_system_kit::nestable_block_on(future).unwrap();
let _ = block_data_tx.send(Some(block_data));
});
let res = retrieve_block_data_pool.join();
res
}
})
.expect("unable to spawn thread");

let handle_2 = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(block_data)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let compressed_block = CompactedBlock::from_full_block(&block_data);
let _ = block_compressed_tx_moved
.send(Some((block_data.height as u32, compressed_block)));
});

let res = compress_block_data_pool.join();
// let _ = block_compressed_tx.send(None);
res
}
}).expect("unable to spawn thread");
let res = compress_block_data_pool.join();
// let _ = block_compressed_tx.send(None);
res
}
})
.expect("unable to spawn thread");

let mut blocks_stored = 0;
while let Ok(Some((block_height, compacted_block))) = block_compressed_rx.recv() {
ctx.try_log(|logger| slog::info!(logger, "Storing block #{block_height}"));
write_compacted_block_to_index(block_height, &compacted_block, &storage_conn, &ctx);
blocks_stored+= 1;
blocks_stored += 1;
if blocks_stored == end_block - start_block {
let _ = block_data_tx.send(None);
let _ = block_hash_tx.send(None);
ctx.try_log(|logger| slog::info!(logger, "Local ordinals storage successfully seeded with #{blocks_stored} blocks"));
return Ok(())
ctx.try_log(|logger| {
slog::info!(
logger,
"Local ordinals storage successfully seeded with #{blocks_stored} blocks"
)
});
return Ok(());
}
}

Expand All @@ -488,7 +466,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
block_identifier: &BlockIdentifier,
transaction_identifier: &TransactionIdentifier,
ctx: &Context,
) -> Result<(u64, u64), String> {
) -> Result<(u64, u64, u64), String> {
let mut ordinal_offset = 0;
let mut ordinal_block_number = block_identifier.index as u32;
let txid = {
Expand All @@ -505,20 +483,23 @@ pub fn retrieve_satoshi_point_using_local_storage(
}
};

let coinbase_txid = &res.0 .0 .0;
let txid = tx_cursor.0;

ctx.try_log(|logger| {
slog::debug!(
slog::info!(
logger,
"{ordinal_block_number}:{:?}:{:?}",
hex::encode(&res.0 .0 .0),
hex::encode(txid)
hex::encode(&coinbase_txid),
hex::encode(&txid)
)
});

// to remove
std::thread::sleep(std::time::Duration::from_millis(300));

// evaluate exit condition: did we reach a coinbase transaction?
let coinbase_txid = &res.0 .0 .0;
if coinbase_txid.eq(&tx_cursor.0) {
// evaluate exit condition: did we reach the **final** coinbase transaction
if coinbase_txid.eq(&txid) {
let coinbase_value = &res.0 .0 .1;
if ordinal_offset.lt(coinbase_value) {
break;
Expand All @@ -527,7 +508,7 @@ pub fn retrieve_satoshi_point_using_local_storage(
// loop over the transaction fees to detect the right range
let cut_off = ordinal_offset - coinbase_value;
let mut accumulated_fees = 0;
for (txid, inputs, outputs) in res.0 .1 {
for (_, inputs, outputs) in res.0 .1 {
let mut total_in = 0;
for (_, _, _, input_value) in inputs.iter() {
total_in += input_value;
Expand Down Expand Up @@ -559,14 +540,14 @@ pub fn retrieve_satoshi_point_using_local_storage(
}
} else {
// isolate the target transaction
for (txid, inputs, outputs) in res.0 .1 {
for (txid_n, inputs, outputs) in res.0 .1 {
// we iterate over the transactions, looking for the transaction target
if !txid.eq(&tx_cursor.0) {
if !txid_n.eq(&txid) {
continue;
}

ctx.try_log(|logger| {
slog::debug!(logger, "Evaluating {}: {:?}", hex::encode(&txid), outputs)
slog::info!(logger, "Evaluating {}: {:?}", hex::encode(&txid_n), outputs)
});

let mut sats_out = 0;
Expand All @@ -575,21 +556,35 @@ pub fn retrieve_satoshi_point_using_local_storage(
break;
}
ctx.try_log(|logger| {
slog::debug!(logger, "Adding {} from output #{}", output_value, index)
slog::info!(logger, "Adding {} from output #{}", output_value, index)
});
sats_out += output_value;
}
sats_out += ordinal_offset;
ctx.try_log(|logger| {
slog::info!(
logger,
"Adding offset {ordinal_offset} to sats_out {sats_out}"
)
});

let mut sats_in = 0;
for (txin, block_height, vout, txin_value) in inputs.into_iter() {
sats_in += txin_value;
ctx.try_log(|logger| {
slog::info!(
logger,
"Adding txin_value {txin_value} to sats_in {sats_in} (txin: {})",
hex::encode(&txin)
)
});

if sats_in >= sats_out {
ordinal_offset = sats_out - (sats_in - txin_value);
ordinal_block_number = block_height;

ctx.try_log(|logger| slog::debug!(logger, "Block {ordinal_block_number} / Tx {} / [in:{sats_in}, out:{sats_out}]: {block_height} -> {ordinal_block_number}:{ordinal_offset} -> {}:{vout}",
hex::encode(&txid),
hex::encode(&txid_n),
hex::encode(&txin)));
tx_cursor = (txin, vout as usize);
break;
Expand All @@ -598,6 +593,9 @@ pub fn retrieve_satoshi_point_using_local_storage(
}
}
}
Ok((ordinal_block_number.into(), ordinal_offset))
}

let height = Height(ordinal_block_number.into());
let ordinal_number = height.starting_sat().0 + ordinal_offset;

Ok((ordinal_block_number.into(), ordinal_offset, ordinal_number))
}

0 comments on commit 9d0d106

Please sign in to comment.