Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/support event db mgr #578

Merged
merged 30 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3c03fd0
feat: add doc store
imotai Jun 13, 2023
bf6b69b
Merge remote-tracking branch 'origin/main'
imotai Jun 14, 2023
50dd93a
Merge remote-tracking branch 'origin/main'
imotai Jun 19, 2023
99d8aad
Merge remote-tracking branch 'origin/main'
imotai Jun 21, 2023
ea84758
Merge remote-tracking branch 'origin/main'
imotai Jun 23, 2023
3dc6d3a
Merge remote-tracking branch 'origin/main'
imotai Jun 25, 2023
954d205
Merge remote-tracking branch 'origin/main'
imotai Jun 26, 2023
a70c835
Merge remote-tracking branch 'origin/main'
imotai Jun 29, 2023
4da2934
Merge remote-tracking branch 'origin/main'
imotai Jun 29, 2023
e2938c9
Merge remote-tracking branch 'origin/main'
imotai Jul 2, 2023
afaaa0c
Merge remote-tracking branch 'origin/main'
imotai Jul 3, 2023
3851509
Merge remote-tracking branch 'origin/main'
imotai Jul 4, 2023
20e1d28
Merge remote-tracking branch 'origin/main'
imotai Jul 4, 2023
7d74a39
Merge remote-tracking branch 'origin/main'
imotai Jul 8, 2023
e1d65c8
Merge remote-tracking branch 'origin/main'
imotai Jul 9, 2023
f55f1b8
Merge remote-tracking branch 'origin/main'
imotai Jul 11, 2023
73ac492
Merge remote-tracking branch 'origin/main'
imotai Jul 17, 2023
4ea9ea2
Merge remote-tracking branch 'origin/main'
imotai Jul 20, 2023
6d4dae2
Merge remote-tracking branch 'origin/main'
imotai Jul 20, 2023
a03b34f
Merge remote-tracking branch 'origin/main'
imotai Jul 21, 2023
a991f2c
Merge remote-tracking branch 'origin/main'
imotai Jul 24, 2023
91e980a
feat: add index
imotai Jul 24, 2023
1325a6f
test: add index test case
imotai Jul 24, 2023
770cdab
fix: update the error report
imotai Jul 25, 2023
bbd07d4
fix: merge from main
imotai Jul 25, 2023
129d5e9
feat: upgrade version
imotai Jul 25, 2023
b290d08
fix: revert the key and case
imotai Jul 25, 2023
f785921
feat: add delete event db
imotai Jul 26, 2023
17ba16b
feat: resolve the conflict
imotai Jul 26, 2023
1008dfc
fix: revert evm key
imotai Jul 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 6 additions & 7 deletions docker/start_localnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@
echo "ADMIN ADDR ${ADMIN_ADDR}"
mkdir -p ./mutation_db ./state_db ./doc_db ./keys ./index_meta_db ./index_doc_db
echo "start store node..."
/usr/bin/db3 store --admin-addr=${ADMIN_ADDR} --public-host 0.0.0.0 --rollup-interval 30000 --contract-addr=0xb9709ce5e749b80978182db1bedfb8c7340039a9 --evm-node-url=https://polygon-mumbai.g.alchemy.com/v2/kiuid-hlfzpnletzqdvwo38iqn0giefr > store.log 2>&1 &
/usr/bin/db3 rollup --admin-addr=${ADMIN_ADDR} --bind-host 0.0.0.0 > rollup.log 2>&1 &
sleep 3
echo "start index node..."
/usr/bin/db3 indexer --admin-addr=${ADMIN_ADDR} --public-host 0.0.0.0 --contract-addr=0xb9709ce5e749b80978182db1bedfb8c7340039a9 --evm-node-url=https://polygon-mumbai.g.alchemy.com/v2/kiuid-hlfzpnletzqdvwo38iqn0giefr> indexer.log 2>&1 &
/usr/bin/db3 index --admin-addr=${ADMIN_ADDR} --bind-host 0.0.0.0 > index.log 2>&1 &
sleep 3

npx serve -l 26629 -s /pages > pages.log 2>&1 &

AR_ADDRESS=`cat /store.log | grep filestore | awk '{print $NF}'`
AR_ADDRESS=`less rollup.log | grep Arweave | awk '{print $NF}'`
STORE_EVM_ADDRESS=`less rollup.log | grep Evm | grep address | awk '{print $NF}'`
echo "the ar account address ${AR_ADDRESS}"
echo "start ar testnet ..."
bash /usr/bin/ar_miner.sh > miner.log 2>&1 &
sleep 2
curl http://127.0.0.1:1984/mint/${AR_ADDRESS}/10000000000000
echo "Start the local db3 nodes successfully"
echo "The storage node url: http://127.0.0.1:26619"
echo "The rollup node url: http://127.0.0.1:26619"
echo "The index node url: http://127.0.0.1:26639"
echo "The console node url: http://127.0.0.1:26629/console"
echo "The setup url: http://127.0.0.1:26629/welcome"
echo "The setup url: http://127.0.0.1:26629"
while true; do sleep 10 ; done
2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "db3.js",
"version": "0.4.0",
"version": "0.4.1",
"description": "DB3 Network Javascript API",
"author": "dbpunk labs",
"keywords": [
Expand Down
2 changes: 2 additions & 0 deletions sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ export {
showDatabase,
createCollection,
showCollection,
showCollectionFromIndex,
getDatabase,
getCollection,
addIndex,
deleteEventDatabase
} from './store/database_v2'

export { Index, IndexType } from './proto/db3_database_v2'
Expand Down
16 changes: 15 additions & 1 deletion sdk/src/provider/indexer_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { SystemClient } from '../proto/db3_system.client'
import {
RunQueryRequest,
GetContractSyncStatusRequest,
GetCollectionOfDatabaseRequest,
} from '../proto/db3_indexer'
import { SetupRequest, GetSystemStatusRequest } from '../proto/db3_system'
import { Query } from '../proto/db3_database_v2'
Expand Down Expand Up @@ -84,7 +85,6 @@ export class IndexerProvider {
}
async getContractSyncStatus() {
const request: GetContractSyncStatusRequest = {}

try {
const { response } = await this.client.getContractSyncStatus(
request
Expand All @@ -94,4 +94,18 @@ export class IndexerProvider {
throw new DB3Error(e as RpcError)
}
}

async getCollectionOfDatabase(db: string) {
const request: GetCollectionOfDatabaseRequest = {
dbAddr: db,
}
try {
const { response } = await this.client.getCollectionOfDatabase(
request
)
return response
} catch (e) {
throw new DB3Error(e)
}
}
}
72 changes: 72 additions & 0 deletions sdk/src/store/database_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,58 @@ import {
DocumentDatabaseMutation,
EventDatabaseMutation,
AddIndexMutation,
DeleteEventDatabaseMutation,
} from '../proto/db3_mutation_v2'

import { Client, ReadClient } from '../client/types'
import { toHEX, fromHEX } from '../crypto/crypto_utils'
import { Index } from '../proto/db3_database_v2'

/**
*
* Delete the event database
*
* ```ts
* const result = await deleteEventDatabase(client,
* "0x....")
* ```
* @param client - the client instance
* @param address - the address of event database
* @returns the {@link MutationResult}
* @note only the owner of event database can delete the event database
*
**/
export async function deleteEventDatabase(client: Client, dbAddress: string) {
const mutation: DeleteEventDatabaseMutation = {}
const body: Mutation_BodyWrapper = {
body: {
oneofKind: 'deleteEventDatabaseMutation',
deleteEventDatabaseMutation: mutation,
},
dbAddress: fromHEX(dbAddress),
}

const dm: Mutation = {
action: MutationAction.DeleteEventDB,
bodies: [body],
}
const payload = Mutation.toBinary(dm)
const response = await client.provider.sendMutation(
payload,
client.nonce.toString()
)
if (response.code == 0) {
client.nonce += 1
return {
id: response.id,
block: response.block,
order: response.order,
} as MutationResult
} else {
throw new Error('fail to create database')
}
}

/**
*
* Create an event database to store contract events
Expand Down Expand Up @@ -424,3 +470,29 @@ export async function showCollection(db: Database) {
})
return collectionList
}

/**
*
* Query collections in the database from the index
*
* ```ts
* const collections = await showCollectionFromIndex(db)
* ```
*
* @param db - the instance of database
* @returns the {@link Collection[]}
*
**/
export async function showCollectionFromIndex(db: Database) {
const response = await db.client.indexer.getCollectionOfDatabase(db.addr)
const collectionList = response.collections.map((c, index) => {
return {
name: c.name,
db,
indexFields: c.indexFields,
internal: c,
state: response.states[index],
} as Collection
})
return collectionList
}
4 changes: 3 additions & 1 deletion src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ pub enum DB3Error {
DatabaseNotFound(String),
#[error("database with addr {0} already exist")]
DatabaseAlreadyExist(String),
#[error("You have no permission to delete the database")]
DatabasePermissionDenied(),
#[error("collection with name {0} was not found in db {1}")]
CollectionNotFound(String, String),
#[error("collection {0} already exist in db {1}")]
CollectionAlreadyExist(String, String),
#[error("You have no permission to modify the collection")]
CollectionPermssionDenied(),
CollectionPermissionDenied(),
}

pub type Result<T> = std::result::Result<T, DB3Error>;
93 changes: 82 additions & 11 deletions src/node/src/indexer_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use db3_event::event_processor::EventProcessor;
use db3_event::event_processor::EventProcessorConfig;
use db3_proto::db3_indexer_proto::indexer_node_server::IndexerNode;
use db3_proto::db3_indexer_proto::{
ContractSyncStatus, GetContractSyncStatusRequest, GetContractSyncStatusResponse,
RunQueryRequest, RunQueryResponse,
ContractSyncStatus, GetCollectionOfDatabaseRequest, GetCollectionOfDatabaseResponse,
GetContractSyncStatusRequest, GetContractSyncStatusResponse, RunQueryRequest, RunQueryResponse,
};
use db3_proto::db3_mutation_v2_proto::MutationAction;
use db3_proto::db3_storage_proto::block_response::MutationWrapper;
Expand Down Expand Up @@ -201,9 +201,10 @@ impl IndexerNodeImpl {
contract_address: &str,
start_block: u64,
) -> Result<()> {
let db_addr = db.to_hex();
let config = EventProcessorConfig {
evm_node_url: evm_node_url.to_string(),
db_addr: db.to_hex(),
db_addr: db_addr.to_string(),
abi: abi.to_string(),
target_events: tables.iter().map(|t| t.to_string()).collect(),
contract_addr: contract_address.to_string(),
Expand All @@ -217,14 +218,10 @@ impl IndexerNodeImpl {
match self.processor_mapping.lock() {
Ok(mut mapping) => {
//TODO limit the total count
if mapping.contains_key(contract_address) {
warn!("contract addr {} exist", contract_address);
return Err(DB3Error::WriteStoreError(format!(
"contract_addr {} exist",
contract_address
)));
if mapping.contains_key(db_addr.as_str()) {
return Err(DB3Error::DatabaseAlreadyExist(db_addr.to_string()));
}
mapping.insert(contract_address.to_string(), processor.clone());
mapping.insert(db_addr.to_string(), processor.clone());
}
_ => todo!(),
}
Expand All @@ -241,6 +238,22 @@ impl IndexerNodeImpl {
Ok(())
}

fn close_event_task(&self, db: &DB3Address) -> Result<()> {
let addr = db.to_hex();
match self.processor_mapping.lock() {
Ok(mut mapping) => match mapping.remove(addr.as_str()) {
Some(task) => {
task.close();
}
None => {
return Err(DB3Error::DatabaseNotFound(addr.to_string()));
}
},
_ => todo!(),
}
Ok(())
}

async fn parse_and_apply_mutations(&self, mutations: &Vec<MutationWrapper>) -> Result<()> {
for mutation in mutations.iter() {
let header = mutation.header.as_ref().unwrap();
Expand All @@ -252,14 +265,16 @@ impl IndexerNodeImpl {
let action = MutationAction::from_i32(dm.action).ok_or(DB3Error::WriteStoreError(
"fail to convert action type".to_string(),
))?;

let (block, order, doc_ids_map_str) = match &mutation.header {
Some(header) => Ok((header.block_id, header.order_id, &header.doc_ids_map)),
_ => Err(DB3Error::WriteStoreError(
"invalid mutation header".to_string(),
)),
}?;

let doc_ids_map = MutationUtil::convert_doc_ids_map_to_vec(doc_ids_map_str)?;
self.db_store.apply_mutation(
let extra_items = self.db_store.apply_mutation(
action,
dm,
&address,
Expand All @@ -269,6 +284,39 @@ impl IndexerNodeImpl {
order,
&doc_ids_map,
)?;
match action {
MutationAction::CreateEventDb => {
if extra_items.len() > 0 && extra_items[0].key.as_str() == "db_addr" {
let addr = DB3Address::from_hex(extra_items[0].value.as_str())?;
let (collections, _) = self.db_store.get_collection_of_database(&addr)?;
let tables = collections.iter().map(|c| c.name.to_string()).collect();
if let Some(database) = self.db_store.get_event_db(&addr)? {
if let Err(e) = self
.start_an_event_task(
&addr,
database.evm_node_url.as_str(),
database.events_json_abi.as_str(),
&tables,
database.contract_address.as_str(),
0,
)
.await
{
info!("start the event db {} with error {e}", addr.to_hex());
} else {
info!("start event db {} done", addr.to_hex());
}
}
}
}
MutationAction::DeleteEventDb => {
if extra_items.len() > 0 && extra_items[0].key.as_str() == "db_addr" {
let addr = DB3Address::from_hex(extra_items[0].value.as_str())?;
self.close_event_task(&addr)?;
}
}
_ => {}
}
}
Ok(())
}
Expand All @@ -295,6 +343,29 @@ impl IndexerNode for IndexerNodeImpl {
Ok(Response::new(GetContractSyncStatusResponse { status_list }))
}

async fn get_collection_of_database(
&self,
request: Request<GetCollectionOfDatabaseRequest>,
) -> std::result::Result<Response<GetCollectionOfDatabaseResponse>, Status> {
let r = request.into_inner();
let addr = DB3Address::from_hex(r.db_addr.as_str())
.map_err(|e| Status::invalid_argument(format!("invalid database address {e}")))?;
let (collections, collection_states) = self
.db_store
.get_collection_of_database(&addr)
.map_err(|e| Status::internal(format!("fail to get collect of database {e}")))?;

info!(
"query collection count {} with database {}",
collections.len(),
r.db_addr.as_str()
);
Ok(Response::new(GetCollectionOfDatabaseResponse {
collections,
states: collection_states,
}))
}

async fn run_query(
&self,
request: Request<RunQueryRequest>,
Expand Down
10 changes: 10 additions & 0 deletions src/proto/proto/db3_indexer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,23 @@ message ContractSyncStatus {
uint64 event_number = 4;
}

message GetCollectionOfDatabaseResponse {
repeated db3_database_v2_proto.Collection collections = 1;
repeated db3_database_v2_proto.CollectionState states = 2;
}
message GetCollectionOfDatabaseRequest {
string db_addr = 1;
}

message GetContractSyncStatusResponse {
repeated ContractSyncStatus status_list = 1;
}

message GetContractSyncStatusRequest {}

service IndexerNode {
rpc GetContractSyncStatus(GetContractSyncStatusRequest) returns (GetContractSyncStatusResponse) {}
rpc GetCollectionOfDatabase(GetCollectionOfDatabaseRequest) returns (GetCollectionOfDatabaseResponse) {}
// method for query document
rpc RunQuery(RunQueryRequest) returns (RunQueryResponse) {}
}
4 changes: 4 additions & 0 deletions src/proto/proto/db3_mutation_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ message MintDocumentDatabaseMutation {
string sender = 6;
}

message DeleteEventDatabaseMutation {}

message AddIndexMutation {
string collection_name = 1;
repeated db3_database_v2_proto.Index index_fields = 2;
Expand Down Expand Up @@ -108,6 +110,7 @@ enum MutationAction {
MintDocumentDB = 6;
MintCollection = 7;
AddIndex = 8;
DeleteEventDB = 9;
}

enum MutationRollupStatus {
Expand All @@ -128,6 +131,7 @@ message Mutation {
MintDocumentDatabaseMutation mint_doc_database_mutation = 6;
MintCollectionMutation mint_collection_mutation = 7;
AddIndexMutation add_index_mutation = 8;
DeleteEventDatabaseMutation delete_event_database_mutation = 9;
}
}
repeated BodyWrapper bodies = 3;
Expand Down