Skip to content

Commit

Permalink
return idx from mapping table instead of full value
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jul 3, 2024
1 parent ee075df commit 07eeb0b
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions blockstore/src/block_stores/postgres/postgres_mappings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,33 @@ pub fn build_create_transaction_mapping_table_statement(epoch: EpochRef) -> Stri
}

// note: sigantures might contain duplicates but that's quite rare and can be ignored for transactions
// TODO return &str
pub async fn perform_transaction_mapping(postgres_session: &PostgresSession, epoch: EpochRef, signatures: &[&str]) -> anyhow::Result<BiMap<String, i32>> {
let started_at = Instant::now();
let schema = PostgresEpoch::build_schema_name(epoch);
let statement = format!(
r#"
WITH
requested_sigs AS (
SELECT signature from unnest($1::text[]) tab(signature)
SELECT signature, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(signature)
),
existed AS
(
SELECT * FROM {schema}.transaction_ids
SELECT signature, transaction_id, idx FROM {schema}.transaction_ids
INNER JOIN requested_sigs USING(signature)
),
inserted AS
(
INSERT INTO {schema}.transaction_ids(signature)
SELECT signature FROM requested_sigs
ON CONFLICT DO NOTHING
RETURNING *
EXCEPT
SELECT signature from existed
RETURNING transaction_id, signature
)
SELECT transaction_id, signature FROM inserted
SELECT transaction_id, idx FROM inserted
INNER JOIN requested_sigs USING(signature)
UNION ALL
SELECT transaction_id, signature FROM existed
SELECT transaction_id, idx FROM existed
"#,
schema = schema
);
Expand All @@ -68,7 +71,9 @@ pub async fn perform_transaction_mapping(postgres_session: &PostgresSession, epo
let mapping_pairs = mappings.iter()
.map(|row| {
let tx_id: i32 = row.get(0);
let tx_sig: String = row.get(1);
let idx: i16 = row.get(1); // 1-based index
assert!(idx < 20000, "idx is 16 bit - might overflow soon");
let tx_sig = signatures[(idx - 1) as usize].to_string();
(tx_sig, tx_id)
});

Expand Down Expand Up @@ -116,23 +121,25 @@ pub async fn perform_account_mapping(postgres_session: &PostgresSession, epoch:
r#"
WITH
requested_account_keys AS (
SELECT account_key from unnest($1::text[]) tab(account_key)
SELECT account_key, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(account_key)
),
existed AS
(
SELECT * FROM {schema}.account_ids
SELECT account_key, acc_id, idx FROM {schema}.account_ids
INNER JOIN requested_account_keys USING(account_key)
),
inserted AS
(
INSERT INTO {schema}.account_ids(account_key)
SELECT account_key FROM requested_account_keys
ON CONFLICT DO NOTHING
RETURNING *
EXCEPT
SELECT account_key from existed
RETURNING account_key, acc_id
)
SELECT acc_id, account_key FROM inserted
SELECT acc_id, idx FROM inserted
INNER JOIN requested_account_keys USING(account_key)
UNION ALL
SELECT acc_id, account_key FROM existed
SELECT acc_id, idx FROM existed
"#,
schema = schema
);
Expand All @@ -142,8 +149,10 @@ pub async fn perform_account_mapping(postgres_session: &PostgresSession, epoch:
let mapping_pairs = mappings.iter()
.map(|row| {
let acc_id: i64 = row.get(0);
let account_key: String = row.get(1);
(account_key, acc_id)
let idx: i16 = row.get(1); // 1-based index
assert!(idx < 20000, "idx is 16 bit - might overflow soon");
let tx_sig = account_keys[(idx - 1) as usize].to_string();
(tx_sig, acc_id)
});

// pubkey <-> acc_id
Expand Down Expand Up @@ -188,23 +197,25 @@ pub async fn perform_blockhash_mapping(postgres_session: &PostgresSession, epoch
r#"
WITH
requested_blockhashes AS (
SELECT blockhash from unnest($1::text[]) tab(blockhash)
SELECT blockhash, row_number() over()::smallint as idx FROM unnest($1::text[]) tab(blockhash)
),
existed AS
(
SELECT * FROM {schema}.blockhash_ids
SELECT blockhash, blockhash_id, idx FROM {schema}.blockhash_ids
INNER JOIN requested_blockhashes USING(blockhash)
),
inserted AS
(
INSERT INTO {schema}.blockhash_ids(blockhash)
SELECT blockhash from requested_blockhashes
ON CONFLICT DO NOTHING
RETURNING *
SELECT blockhash FROM requested_blockhashes
EXCEPT
SELECT blockhash from existed
RETURNING blockhash_id, blockhash
)
SELECT blockhash_id, blockhash FROM inserted
SELECT blockhash_id, idx FROM inserted
INNER JOIN requested_blockhashes USING(blockhash)
UNION ALL
SELECT blockhash_id, blockhash FROM existed
SELECT blockhash_id, idx FROM existed
"#,
schema = schema
);
Expand All @@ -214,8 +225,10 @@ pub async fn perform_blockhash_mapping(postgres_session: &PostgresSession, epoch
let mapping_pairs = mappings.iter()
.map(|row| {
let blockhash_id: i32 = row.get(0);
let blockhash: String = row.get(1);
(blockhash, blockhash_id)
let idx: i16 = row.get(1); // 1-based index
assert!(idx < 20000, "idx is 16 bit - might overflow soon");
let tx_sig = blockhashes[(idx - 1) as usize].to_string();
(tx_sig, blockhash_id)
});

// blockhash <-> blockhash_id
Expand Down

0 comments on commit 07eeb0b

Please sign in to comment.