-
Notifications
You must be signed in to change notification settings - Fork 11.1k
/
main.rs
169 lines (151 loc) · 6.91 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use async_trait::async_trait;
use diesel::{dsl::sql, BoolExpressionMethods, Connection, ExpressionMethods, RunQueryDsl};
use mysten_service::metrics::start_basic_prometheus_server;
use prometheus::Registry;
use std::path::PathBuf;
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, Worker, WorkerPool,
};
use sui_types::full_checkpoint_content::CheckpointData;
use tracing::info;
use suins_indexer::{
get_connection_pool,
indexer::{format_update_field_query, format_update_subdomain_wrapper_query, SuinsIndexer},
models::VerifiedDomain,
schema::domains,
PgConnectionPool,
};
use dotenvy::dotenv;
use std::env;
use tokio::sync::oneshot;
/// Checkpoint worker that extracts SuiNS name-record changes and persists
/// them to Postgres.
struct SuinsIndexerWorker {
    // Connection pool used by `commit_to_db` to write updates/removals.
    pg_pool: PgConnectionPool,
    // Parses a checkpoint into name-record updates and deletions.
    indexer: SuinsIndexer,
}
impl SuinsIndexerWorker {
/// Creates a transcation that upserts the given name record updates,
/// and deletes the given name record deletions.
///
/// This is done using 1 or 2 queries, depending on whether there are any deletions/updates in the checkpoint.
///
/// - The first query is a bulk insert of all updates, with an upsert on conflict.
/// - The second query is a bulk delete of all deletions.
///
/// You can safely call this with empty updates/deletions as it will return Ok.
fn commit_to_db(
&self,
updates: &[VerifiedDomain],
removals: &[String],
checkpoint_seq_num: u64,
) -> Result<()> {
if updates.is_empty() && removals.is_empty() {
return Ok(());
}
let connection = &mut self.pg_pool.get().unwrap();
connection.transaction(|tx| {
if !updates.is_empty() {
// Bulk insert all updates and override with data.
diesel::insert_into(domains::table)
.values(updates)
.on_conflict(domains::name)
.do_update()
.set((
domains::expiration_timestamp_ms
.eq(sql(&format_update_field_query("expiration_timestamp_ms"))),
domains::nft_id.eq(sql(&format_update_field_query("nft_id"))),
domains::target_address
.eq(sql(&format_update_field_query("target_address"))),
domains::data.eq(sql(&format_update_field_query("data"))),
domains::last_checkpoint_updated
.eq(sql(&format_update_field_query("last_checkpoint_updated"))),
domains::field_id.eq(sql(&format_update_field_query("field_id"))),
// We always want to respect the subdomain_wrapper re-assignment, even if the checkpoint is older.
// That prevents a scenario where we first process a later checkpoint that did an update to the name record (e..g change target address),
// without first executing the checkpoint that created the subdomain wrapper.
// Since wrapper re-assignment can only happen every 2 days, we can't write invalid data here.
domains::subdomain_wrapper_id
.eq(sql(&format_update_subdomain_wrapper_query())),
))
.execute(tx)
.unwrap_or_else(|_| panic!("Failed to process updates: {:?}", updates));
}
if !removals.is_empty() {
// We want to remove from the database all name records that were removed in the checkpoint
// but only if the checkpoint is newer than the last time the name record was updated.
diesel::delete(domains::table)
.filter(
domains::field_id
.eq_any(removals)
.and(domains::last_checkpoint_updated.le(checkpoint_seq_num as i64)),
)
.execute(tx)
.unwrap_or_else(|_| panic!("Failed to process deletions: {:?}", removals));
}
Ok(())
})
}
}
#[async_trait]
impl Worker for SuinsIndexerWorker {
    /// Extracts name-record updates/removals from the checkpoint and commits
    /// them to the database.
    async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
        let seq_num = checkpoint.checkpoint_summary.sequence_number;
        let (updates, removals) = self.indexer.process_checkpoint(&checkpoint);

        // Emit a progress line once per 1000 checkpoints.
        if seq_num % 1000 == 0 {
            info!("Checkpoint sequence number: {}", seq_num);
        }

        self.commit_to_db(&updates, &removals, seq_num)
    }
}
#[tokio::main]
async fn main() -> Result<()> {
    // Load configuration from a `.env` file when one is present.
    dotenv().ok();
    let (remote_storage, registry_id, subdomain_wrapper_type, name_record_type) = (
        env::var("REMOTE_STORAGE").ok(),
        env::var("REGISTRY_ID").ok(),
        env::var("SUBDOMAIN_WRAPPER_TYPE").ok(),
        env::var("NAME_RECORD_TYPE").ok(),
    );
    // Lazy `unwrap_or_else` so the default `String` is only allocated when
    // the env var is actually missing (clippy: or_fun_call).
    let backfill_progress_file_path = env::var("BACKFILL_PROGRESS_FILE_PATH")
        .unwrap_or_else(|_| "/tmp/backfill_progress".to_string());
    let checkpoints_dir =
        env::var("CHECKPOINTS_DIR").unwrap_or_else(|_| "/tmp/checkpoints".to_string());

    println!("Starting indexer with checkpoints dir: {}", checkpoints_dir);

    // `_exit_sender` is held (unused) so the exit channel stays open for the
    // whole run; the receiver is handed to the executor below.
    let (_exit_sender, exit_receiver) = oneshot::channel();
    let progress_store = FileProgressStore::new(PathBuf::from(backfill_progress_file_path));
    let registry: Registry = start_basic_prometheus_server();
    mysten_metrics::init_metrics(&registry);
    let metrics = DataIngestionMetrics::new(&registry);
    let mut executor = IndexerExecutor::new(progress_store, 1, metrics);

    // Use a custom indexer configuration only when all three identifiers are
    // supplied; otherwise fall back to `SuinsIndexer::default()`.
    let indexer_setup =
        if let (Some(registry_id), Some(subdomain_wrapper_type), Some(name_record_type)) =
            (registry_id, subdomain_wrapper_type, name_record_type)
        {
            SuinsIndexer::new(registry_id, subdomain_wrapper_type, name_record_type)
        } else {
            SuinsIndexer::default()
        };

    let worker_pool = WorkerPool::new(
        SuinsIndexerWorker {
            pg_pool: get_connection_pool(),
            indexer: indexer_setup,
        },
        "suins_indexing".to_string(), /* task name used as a key in the progress store */
        100,                          /* concurrency */
    );
    executor.register(worker_pool).await?;

    executor
        .run(
            PathBuf::from(checkpoints_dir), /* directory should exist but can be empty */
            remote_storage,                 /* remote_read_endpoint: If set */
            vec![],                         /* aws credentials */
            ReaderOptions::default(),       /* remote_read_batch_size */
            exit_receiver,
        )
        .await?;
    Ok(())
}