
Persist store in DB instead of JSON #60

Closed
petkodes opened this issue Sep 4, 2023 · 1 comment
Labels
meta:triaged This issue has been triaged (has a good description, as well as labels for priority, size and type) p2 Medium priority size:large Large

Comments

@petkodes
Collaborator

petkodes commented Sep 4, 2023

Problem statement
In order to save the Radio's state between reruns, we currently persist the state in a JSON file. That does the job of seamlessly restarting the Radio, but as Radio traffic, features, message types, and collected data increase, we will need a more scalable approach to data persistence.

Expectation proposal
We should adopt an approach similar to the one in Listener Radio, which uses sqlx.

By default, it could use sqlite and still keep all the data in a local file, but more advanced users should be able to provide a postgres endpoint and store the Radio's data there.
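
A minimal sketch of how that default could be wired up; the DbConfig fields below are hypothetical, not existing Radio config options:

use std::path::PathBuf;

/// Hypothetical config fields, for illustration only.
struct DbConfig {
    postgres_url: Option<String>, // e.g. "postgres://user:pass@host:5432/radio"
    sqlite_path: PathBuf,         // default local file, e.g. "./state.sqlite"
}

/// Pick the sqlx connection string: Postgres if the operator supplied an
/// endpoint, otherwise fall back to a local sqlite file.
fn database_url(config: &DbConfig) -> String {
    match &config.postgres_url {
        Some(url) => url.clone(),
        None => format!("sqlite://{}", config.sqlite_path.display()),
    }
}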

Open questions
Should we still keep JSON as an option?

@hopeyen
Collaborator

hopeyen commented Sep 11, 2023

  • Create migration file and embed the migration operations in the binary
  • Define sqlx schema for publicPOIMessage, attestations, comparisonResult, upgradeIntentMessage
  • Define query resolver (read and write sql operations to DB)
  • Currently we track validated messages in a PersistedState struct and write to the stateful JSON at a fixed interval. We can consider writing to the DB in a similar way with batch write operations (see the sketch after this list)
  • Make sure to swap out the resolvers for the API server
  • If time allows, we can utilize benchmarks to measure performance improvements
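
For the batch write item above, a rough sketch of a fixed-interval flush; the PendingPpoiMessage struct, the channel-based buffer, and the 60s interval are placeholders, not the Radio's actual types:

use std::time::Duration;
use sqlx::PgPool;
use tokio::sync::mpsc::Receiver;

/// Minimal message shape for illustration; the real struct carries more fields.
struct PendingPpoiMessage {
    identifier: String,
    nonce: i64,
    graph_account: String,
    content: String,
    network: String,
    block_number: i64,
    block_hash: String,
    signature: String,
}

/// Periodically drain validated messages and write them in one transaction,
/// mirroring the fixed-interval JSON flush done by PersistedState today.
async fn flush_loop(pool: PgPool, mut pending: Receiver<PendingPpoiMessage>) -> sqlx::Result<()> {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;
        // Drain whatever has been validated since the last tick
        let mut batch = Vec::new();
        while let Ok(msg) = pending.try_recv() {
            batch.push(msg);
        }
        if batch.is_empty() {
            continue;
        }
        // One transaction per batch instead of one round trip per message
        let mut tx = pool.begin().await?;
        for m in batch {
            sqlx::query(
                "INSERT INTO remote_ppoi_messages \
                 (identifier, nonce, graph_account, content, network, block_number, block_hash, signature) \
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
            )
            .bind(&m.identifier)
            .bind(m.nonce)
            .bind(&m.graph_account)
            .bind(&m.content)
            .bind(&m.network)
            .bind(m.block_number)
            .bind(&m.block_hash)
            .bind(&m.signature)
            .execute(&mut *tx)
            .await?;
        }
        tx.commit().await?;
    }
}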

More details

Setting up the database

Below is an example of the current state JSON file and the requirements it imposes on the table schemas:

{
    "local_attestations": {
        "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB": {
            "9535300": {
                "ppoi": "0x4469add7c6e9bd2cb30bcb60ec14fcf6db7f184d7bb3e15c7e48f3fff9e86b49",
                "stake_weight": 0,
                "senders": [],
                "sender_group_hash": "a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a",
                "timestamp": [
                    1692305224
                ]
            }
        }
    },
    "remote_ppoi_messages": [
        {
            "identifier": "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",
            "nonce": 1692373744,
            "graph_account": "0xe9a1cabd57700b17945fd81feefba82340d9568f",
            "payload": {
                "identifier": "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",
                "content": "0xb2fec4fab8efa64b67d378cc6a925012ca7b5fb76943dc5de833efa63fab842d",
                "nonce": 1692373744,
                "network": "goerli",
                "block_number": 9539860,
                "block_hash": "4f892929c6fa9e1dc573a80a7f1cff61feedfd153e5f48187e30c8dfc4f65306",
                "graph_account": "0xe9a1cabd57700b17945fd81feefba82340d9568f"
            },
            "signature": "af35edc72a1372f85738c13c728c64395c6b6db5ae4c3558de2fe19bde84f2f24dc38ba654be0db3623759e28836318131b559fd0928c887c0f2e4829f968b501b"
        }
    ],
    "upgrade_intent_messages": [
        {
            "identifier": "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",
            "nonce": 1692307513,
            "graph_account": "0xe9a1cabd57700b17945fd81feefba82340d9568f",
            "payload": {
                "subgraph_id": "CnJMdCkW3pr619gsJVtUPAWxspALPdCMw6o7obzYBNp3",
                "new_hash": "QmVVfLWowm1xkqc41vcygKNwFUvpsDSMbHdHghxmDVmH9x",
                "nonce": 1692307513,
                "graph_account": "0xe9a1cabd57700b17945fd81feefba82340d9568f"
            },
            "signature": "2e859ec99d2093efa79d90800512c9cda3fb882a95b6cb64cb5850f3cfc47c7b6ad23ebeeeb6d658b9689c99cb1d9e8fd4595d4ac5c38363dbb80d78b178f4181c"
        }
    ],
    "comparison_results": {
        "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB": {
            "deployment": "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",
            "block_number": 9539860,
            "result_type": "Match",
            "local_attestation": {
                "ppoi": "0xb2fec4fab8efa64b67d378cc6a925012ca7b5fb76943dc5de833efa63fab842d",
                "stake_weight": 0,
                "senders": [],
                "sender_group_hash": "a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a",
                "timestamp": [
                    1692374023
                ]
            },
            "attestations": [
                {
                    "ppoi": "0xb2fec4fab8efa64b67d378cc6a925012ca7b5fb76943dc5de833efa63fab842d",
                    "stake_weight": 11227201,
                    "senders": [
                        "0xe9a1cabd57700b17945fd81feefba82340d9568f"
                    ],
                    "sender_group_hash": "e56c14a4e561fce225ea76c2379dfcc33c7082cdcbb07bd605be1328f397eb84",
                    "timestamp": [
                        1692373744
                    ]
                }
            ]
        }
    }
}

We would need a separate table for each structure. This requires setting up the database (sqlx database create) and creating migration files (sqlx migrate add -r migration_name; we could use one file for the initial setup of all current structs) in which the schema of each struct follows something similar to

-- For local_attestations --
CREATE TABLE local_attestations (
    id SERIAL PRIMARY KEY,
    identifier VARCHAR(255),
    block_number BIGINT,
    ppoi VARCHAR(255),
    stake_weight BIGINT,
    sender_group_hash VARCHAR(255),
    timestamp BIGINT
);

-- For remote_ppoi_messages --
CREATE TABLE remote_ppoi_messages (
    id SERIAL PRIMARY KEY,
    identifier VARCHAR(255),
    nonce BIGINT,
    graph_account VARCHAR(255),
    content VARCHAR(255),
    network VARCHAR(255),
    block_number BIGINT,
    block_hash VARCHAR(255),
    signature VARCHAR(255)
);

-- For upgrade_intent_messages --
CREATE TABLE upgrade_intent_messages (
    id SERIAL PRIMARY KEY,
    identifier VARCHAR(255),
    nonce BIGINT,
    graph_account VARCHAR(255),
    subgraph_id VARCHAR(255),
    new_hash VARCHAR(255),
    signature VARCHAR(255)
);

-- For comparison_results --
CREATE TABLE comparison_results (
    id SERIAL PRIMARY KEY,
    identifier VARCHAR(255),
    deployment VARCHAR(255),
    block_number BIGINT,
    result_type VARCHAR(255),
    local_attestation_id INTEGER REFERENCES local_attestations(id)
);

and rollback with

DROP TABLE IF EXISTS comparison_results;
-- repeat for tables --

Embed migration

Enforce that the database schema is up to date by running the embedded migration runner on startup:

// Create db connection with database_url. Preferably we construct database_url from segregated config fields
let url = format!(
    "postgres://{}:{}@{}:{}/{}",
    config.postgres_username,
    config.postgres_password,
    config.postgres_host,
    config.postgres_port,
    config.postgres_database
);
// Establish connection
let db = PgPoolOptions::new()
    .max_connections(50)
    .acquire_timeout(Duration::from_secs(3))
    .connect(&url)
    .await
    .expect("Could not connect to the database");

// Macro to check for database migration
sqlx::migrate!()
    .run(&db)
    .await
    .expect("Could not run migration");

sqlx schema

Utilize sqlx::FromRow to automatically derive row mappings for publicPOIMessage, attestations, comparisonResult, upgradeIntentMessage, ...
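
For example, a row type for local_attestations could look roughly like this (field names follow the schema sketched above):

use sqlx::FromRow;

/// Row shape matching the local_attestations table above.
#[derive(Debug, FromRow)]
pub struct LocalAttestation {
    pub identifier: String,
    pub block_number: i64,
    pub ppoi: String,
    pub stake_weight: i64,
    pub sender_group_hash: String,
    pub timestamp: i64,
}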

query/mutation resolvers

Create query and mutation resolvers to replace the existing functions on the PersistedState struct and keep the database updated.

Query example

pub async fn local_attestation(
    pool: &PgPool,
    identifier: &str,
    block_number: i64,
) -> Result<Option<LocalAttestation>, IndexerError> {
    let attestation = sqlx::query_as!(
        LocalAttestation,
        r#"
        SELECT identifier, block_number, ppoi, stake_weight, sender_group_hash, timestamp
        FROM local_attestations
        WHERE identifier = $1 AND block_number = $2
    "#,
        identifier,
        block_number
    )
    .fetch_optional(pool)
    .await
    .map_err(|e| IndexerError::from(e))?;
    
    Ok(attestation)
}

Insert example for when a new local attestation or remote message is found

pub async fn create_local_attestation(
    pool: &PgPool,
    new_attestation: NewLocalAttestation,
) -> Result<NewLocalAttestation, IndexerError> {
    sqlx::query!(
        r#"
        INSERT INTO local_attestations (identifier, block_number, ppoi, stake_weight, sender_group_hash, timestamp)
        VALUES ($1, $2, $3, $4, $5, $6)
        "#,
        new_attestation.identifier,
        new_attestation.block_number,
        new_attestation.ppoi,
        new_attestation.stake_weight,
        new_attestation.sender_group_hash,
        new_attestation.timestamp,
    )
    .execute(pool)
    .await
    .map_err(|e| IndexerError::from(e))?;
    
    Ok(new_attestation)
}

Delete example for pruning old data

pub async fn delete_old_remote_messages(
    pool: &PgPool,
    timestamp_threshold: i64, // prune messages with nonce (message timestamp) older than this
) -> Result<u64, IndexerError> { 
    let deleted_rows = sqlx::query!(
        r#"
        DELETE FROM remote_ppoi_messages
        WHERE nonce < $1
        "#,
        timestamp_threshold
    )
    .execute(pool)
    .await
    .map_err(|e| IndexerError::from(e))?
    .rows_affected(); // returns the number of rows deleted
    
    Ok(deleted_rows)
}

Swap out existing PersistedState functions with database resolvers
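
As a sketch of what that swap could look like (the actual PersistedState fields and call sites will differ), the struct can keep its current method signatures and delegate to the resolvers above:

/// Sketch: PersistedState keeps its public API but holds a DB pool instead of
/// the in-memory maps backed by the JSON file.
pub struct PersistedState {
    pub db: sqlx::PgPool,
}

impl PersistedState {
    /// Same shape callers use today, now resolved from the database.
    pub async fn local_attestation(
        &self,
        identifier: &str,
        block_number: i64,
    ) -> Result<Option<LocalAttestation>, IndexerError> {
        // Delegate to the query resolver defined above
        local_attestation(&self.db, identifier, block_number).await
    }
}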

@petkodes petkodes added the meta:triaged This issue has been triaged (has a good description, as well as labels for priority, size and type) label Sep 21, 2023