Skip to content

Commit

Permalink
Merge branch 'alpha' into Diagnose-al2-premerge
Browse files Browse the repository at this point in the history
  • Loading branch information
rdlrt committed Jan 10, 2022
2 parents 8002d36 + fe254ca commit e381325
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 122 deletions.
37 changes: 37 additions & 0 deletions files/grest/cron/jobs/active-stake-cache-update.sh
@@ -0,0 +1,37 @@
#!/bin/bash
DB_NAME=cexplorer

echo "$(date +%F_%H:%M:%S) Running active stake cache update..."

# High level check in db to see if update needed at all (should be updated only once on epoch transition)
[[ $(psql ${DB_NAME} -qbt -c "SELECT grest.active_stake_cache_update_check();" | tail -2 | tr -cd '[:alnum:]') != 't' ]] &&
echo "No update needed, exiting..." &&
exit 0;

# This could break due to upstream changes on db-sync (based on log format)
last_epoch_stakes_log=$(grep -r 'Handling.*.stakes for epoch ' "$(dirname "$0")"/../../logs/dbsync-*.json "$(dirname "$0")"/../../logs/archive/dbsync-*.json 2>/dev/null | sed -e 's#.*.Handling ##' -e 's#stakes for epoch##' -e 's# slot .*.$##' | sort -k2 -n | tail -1)
[[ -z ${last_epoch_stakes_log} ]] &&
echo "Could not find any 'Handling stakes' log entries, exiting..." &&
exit 1;

logs_last_epoch_stakes_count=$(echo "${last_epoch_stakes_log}" | cut -d\ -f1)
logs_last_epoch_no=$(echo "${last_epoch_stakes_log}" | cut -d\ -f3)

db_last_epoch_no=$(psql ${DB_NAME} -qbt -c "SELECT grest.get_current_epoch();" | tr -cd '[:alnum:]')
[[ "${db_last_epoch_no}" != "${logs_last_epoch_no}" ]] &&
echo "Mismatch between last epoch in logs and database, exiting..." &&
exit 1;

# Count current epoch entries processed by db-sync
db_epoch_stakes_count=$(psql ${DB_NAME} -qbt -c "SELECT grest.get_epoch_stakes_count(${db_last_epoch_no});" | tr -cd '[:alnum:]')

# Check if db-sync completed handling stakes
[[ "${db_epoch_stakes_count}" != "${logs_last_epoch_stakes_count}" ]] &&
echo "Logs last epoch stakes count: ${logs_last_epoch_stakes_count}" &&
echo "DB last epoch stakes count: ${db_epoch_stakes_count}" &&
echo "db-sync stakes handling still incomplete, exiting..." &&
exit 0;

# Stakes have been validated, run the cache update
psql ${DB_NAME} -qbt -c "SELECT GREST.active_stake_cache_update(${db_last_epoch_no});" 2>&1 1>/dev/null
echo "$(date +%F_%H:%M:%S) Job done!"
265 changes: 151 additions & 114 deletions files/grest/rpc/01_cached_tables/active_stake_cache.sql
Expand Up @@ -8,129 +8,166 @@ CREATE TABLE IF NOT EXISTS GREST.POOL_ACTIVE_STAKE_CACHE (
PRIMARY KEY (POOL_ID, EPOCH_NO)
);

INSERT INTO GREST.POOL_ACTIVE_STAKE_CACHE
SELECT
POOL_HASH.VIEW,
EPOCH_STAKE.EPOCH_NO,
SUM(EPOCH_STAKE.AMOUNT) AS AMOUNT
FROM
EPOCH_STAKE
INNER JOIN POOL_HASH ON POOL_HASH.ID = EPOCH_STAKE.POOL_ID
GROUP BY
POOL_HASH.VIEW,
EPOCH_STAKE.EPOCH_NO
ON CONFLICT (POOL_ID,
EPOCH_NO)
DO UPDATE SET
AMOUNT = EXCLUDED.AMOUNT;

-- Trigger for inserting new pool active stake values on epoch transition
DROP FUNCTION IF EXISTS GREST.POOL_ACTIVE_STAKE_EPOCH_UPDATE CASCADE;

CREATE FUNCTION GREST.POOL_ACTIVE_STAKE_EPOCH_UPDATE ()
RETURNS TRIGGER
AS $pool_active_stake_epoch_update$
DECLARE
_pool_id_bech32 varchar;
BEGIN
SELECT
ph.view
FROM
pool_hash ph
WHERE
ph.id = NEW.pool_id INTO _pool_id_bech32;
-- Insert or update cache table
<< insert_update >> LOOP
UPDATE
grest.POOL_ACTIVE_STAKE_CACHE
SET
amount = amount + NEW.amount
WHERE
pool_id = _pool_id_bech32
AND epoch_no = NEW.epoch_no;
EXIT insert_update
WHEN found;
BEGIN
INSERT INTO grest.POOL_ACTIVE_STAKE_CACHE (pool_id, epoch_no, amount)
VALUES (_pool_id_bech32, NEW.epoch_no, NEW.amount);
EXIT insert_update;
EXCEPTION
WHEN UNIQUE_VIOLATION THEN
RAISE NOTICE 'Unique violation for pool: %, epoch: %', _pool_id_bech32, NEW.epoch_no;
END;
END LOOP
insert_update;
RETURN NULL;
END;

$pool_active_stake_epoch_update$
LANGUAGE PLPGSQL;

DROP TRIGGER IF EXISTS POOL_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER ON PUBLIC.EPOCH_STAKE;

CREATE TRIGGER POOL_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER
AFTER INSERT ON PUBLIC.EPOCH_STAKE
FOR EACH ROW
EXECUTE FUNCTION GREST.POOL_ACTIVE_STAKE_EPOCH_UPDATE ();

--------------------------------------------------------------------------------
-- Epoch total active stake cache setup
--------------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS GREST.EPOCH_ACTIVE_STAKE_CACHE (
EPOCH_NO bigint NOT NULL,
AMOUNT LOVELACE NOT NULL,
PRIMARY KEY (EPOCH_NO)
);

INSERT INTO GREST.EPOCH_ACTIVE_STAKE_CACHE
SELECT
EPOCH_STAKE.EPOCH_NO,
SUM(EPOCH_STAKE.AMOUNT) AS AMOUNT
FROM
EPOCH_STAKE
GROUP BY
EPOCH_STAKE.EPOCH_NO
ON CONFLICT (EPOCH_NO)
DO UPDATE SET
AMOUNT = EXCLUDED.AMOUNT;

-- Trigger for inserting new epoch active stake totals on epoch transition
DROP FUNCTION IF EXISTS GREST.EPOCH_ACTIVE_STAKE_EPOCH_UPDATE CASCADE;

CREATE FUNCTION GREST.EPOCH_ACTIVE_STAKE_EPOCH_UPDATE ()
RETURNS TRIGGER
AS $epoch_active_stake_epoch_update$
BEGIN
-- Insert or update cache table
<< insert_update >> LOOP
UPDATE
grest.EPOCH_ACTIVE_STAKE_CACHE
SET
amount = amount + NEW.amount
WHERE
epoch_no = NEW.epoch_no;
EXIT insert_update
WHEN found;
-- For easier updates only:
DROP TRIGGER IF EXISTS POOL_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER ON PUBLIC.EPOCH_STAKE;
DROP TRIGGER IF EXISTS EPOCH_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER ON PUBLIC.EPOCH_STAKE;

DROP FUNCTION IF EXISTS GREST.POOL_ACTIVE_STAKE_EPOCH_UPDATE;
DROP FUNCTION IF EXISTS GREST.EPOCH_ACTIVE_STAKE_EPOCH_UPDATE;

/* HELPER FUNCTIONS */
DROP FUNCTION IF EXISTS grest.get_last_active_stake_validated_epoch ();

CREATE FUNCTION grest.get_last_active_stake_validated_epoch ()
RETURNS INTEGER
LANGUAGE plpgsql
AS
$$
BEGIN
RETURN (
SELECT
last_value --coalesce doesn't work if empty set
FROM
grest.control_table
WHERE
key = 'last_active_stake_validated_epoch'
);
END;
$$;

/* POSSIBLE VALIDATION FOR CACHE (COUNTING ENTRIES) INSTEAD OF JUST DB-SYNC PART (EPOCH_STAKE)
DROP FUNCTION IF EXISTS grest.get_last_active_stake_cache_address_count ();
CREATE FUNCTION grest.get_last_active_stake_cache_address_count ()
RETURNS INTEGER
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO grest.EPOCH_ACTIVE_STAKE_CACHE (epoch_no, amount)
VALUES (NEW.epoch_no, NEW.amount);
EXIT insert_update;
EXCEPTION
WHEN UNIQUE_VIOLATION THEN
RAISE NOTICE 'Unique violation for epoch: %', NEW.epoch_no;
RETURN (
SELECT count(*) from cache...
)
END;
END LOOP
insert_update;
RETURN NULL;
END;
$$;
*/

$epoch_active_stake_epoch_update$
LANGUAGE PLPGSQL;
DROP FUNCTION IF EXISTS grest.active_stake_cache_update_check ();

DROP TRIGGER IF EXISTS EPOCH_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER ON PUBLIC.EPOCH_STAKE;
CREATE FUNCTION grest.active_stake_cache_update_check ()
RETURNS BOOLEAN
LANGUAGE plpgsql
AS
$$
DECLARE
_current_epoch_no integer;
_last_active_stake_validated_epoch text;
BEGIN
SELECT
grest.get_last_active_stake_validated_epoch()
INTO
_last_active_stake_validated_epoch;

SELECT
grest.get_current_epoch()
INTO
_current_epoch_no;

RAISE NOTICE 'Current epoch: %',
_current_epoch_no;
RAISE NOTICE 'Last active stake validated epoch: %',
_last_active_stake_validated_epoch;

IF
_current_epoch_no > COALESCE(_last_active_stake_validated_epoch::integer, 0)
THEN
RETURN TRUE;
END IF;

RETURN FALSE;
END;
$$;

COMMENT ON FUNCTION grest.active_stake_cache_update_check
IS 'Internal function to determine whether active stake cache should be updated';

DROP FUNCTION IF EXISTS grest.active_stake_cache_update (integer);

/* UPDATE FUNCTION */
CREATE FUNCTION grest.active_stake_cache_update (_epoch_no integer)
RETURNS VOID
LANGUAGE plpgsql
AS
$$
DECLARE
_last_pool_active_stake_cache_epoch_no integer;
_last_epoch_active_stake_cache_epoch_no integer;
BEGIN
/* POOL ACTIVE STAKE CACHE */
SELECT
COALESCE(MAX(epoch_no), 0)
FROM
GREST.POOL_ACTIVE_STAKE_CACHE
INTO _last_pool_active_stake_cache_epoch_no;

-- no way to reinsert previous epoch data if it gets corrupted ATM
-- but that should be a manual task anyway
INSERT INTO GREST.POOL_ACTIVE_STAKE_CACHE
SELECT
POOL_HASH.VIEW AS POOL_ID,
EPOCH_STAKE.EPOCH_NO,
SUM(EPOCH_STAKE.AMOUNT) AS AMOUNT
FROM
EPOCH_STAKE
INNER JOIN POOL_HASH ON POOL_HASH.ID = EPOCH_STAKE.POOL_ID
WHERE
EPOCH_STAKE.EPOCH_NO > _last_pool_active_stake_cache_epoch_no -- no need to worry about epoch 0 as no stake then
AND
EPOCH_STAKE.EPOCH_NO <= _epoch_no
GROUP BY
POOL_HASH.VIEW,
EPOCH_STAKE.EPOCH_NO
ON CONFLICT (
POOL_ID,
EPOCH_NO
) DO UPDATE
SET AMOUNT = EXCLUDED.AMOUNT;

/* EPOCH ACTIVE STAKE CACHE */
SELECT
COALESCE(MAX(epoch_no), 0)
FROM
GREST.EPOCH_ACTIVE_STAKE_CACHE
INTO _last_epoch_active_stake_cache_epoch_no;

INSERT INTO GREST.EPOCH_ACTIVE_STAKE_CACHE
SELECT
EPOCH_STAKE.EPOCH_NO,
SUM(EPOCH_STAKE.AMOUNT) AS AMOUNT
FROM
EPOCH_STAKE
WHERE
EPOCH_STAKE.EPOCH_NO > _last_epoch_active_stake_cache_epoch_no -- no need to worry about epoch 0 as no stake then
AND
EPOCH_STAKE.EPOCH_NO <= _epoch_no
GROUP BY
EPOCH_STAKE.EPOCH_NO
ON CONFLICT (
EPOCH_NO
) DO UPDATE
SET AMOUNT = EXCLUDED.AMOUNT;

CREATE TRIGGER EPOCH_ACTIVE_STAKE_EPOCH_UPDATE_TRIGGER
AFTER INSERT ON PUBLIC.EPOCH_STAKE
FOR EACH ROW
EXECUTE FUNCTION GREST.EPOCH_ACTIVE_STAKE_EPOCH_UPDATE ();
PERFORM grest.update_control_table(
'last_active_stake_validated_epoch',
_epoch_no::text
);
END;
$$;

COMMENT ON FUNCTION grest.active_stake_cache_update
IS 'Internal function to update active stake cache (both epoch and pool tables).';
18 changes: 14 additions & 4 deletions files/grest/rpc/01_cached_tables/pool_history_cache.sql
Expand Up @@ -205,7 +205,10 @@ begin
when 0 then
null
else
ROUND(actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) + COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))
case
when COALESCE(l.leadertotal, 0) < actf.pool_fee_fixed then COALESCE(l.leadertotal, 0)
else ROUND(actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) + COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))
end
end
end pool_fees,
case COALESCE(b.block_cnt, 0)
Expand All @@ -217,7 +220,10 @@ begin
when 0 then
null
else
ROUND(COALESCE(m.memtotal, 0) + (COALESCE(l.leadertotal, 0) - (actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) + COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))))
case
when COALESCE(l.leadertotal, 0) < actf.pool_fee_fixed then COALESCE(m.memtotal, 0)
else ROUND(COALESCE(m.memtotal, 0) + (COALESCE(l.leadertotal, 0) - (actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) + COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))))
end
end
end deleg_rewards,
case COALESCE(b.block_cnt, 0)
Expand All @@ -229,9 +235,13 @@ begin
when 0 then
null
else
case
when COALESCE(l.leadertotal, 0) < actf.pool_fee_fixed then
ROUND((((POW(( LEAST(( (COALESCE(m.memtotal, 0)) / (NULLIF(actf.active_stake,0)) ), 1000) + 1), 73) - 1)) * 100)::numeric, 9)
-- using LEAST as a way to prevent overflow, in case of dodgy database data (e.g. giant rewards / tiny active stake)
ROUND((((POW(( LEAST(( ((COALESCE(m.memtotal, 0) + (COALESCE(l.leadertotal, 0) - (actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) +
COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))))) / (NULLIF(actf.active_stake,0)) ), 1000) + 1), 73) - 1)) * 100)::numeric, 2)
else ROUND((((POW(( LEAST(( ((COALESCE(m.memtotal, 0) + (COALESCE(l.leadertotal, 0) - (actf.pool_fee_fixed + (((COALESCE(m.memtotal, 0) +
COALESCE(l.leadertotal, 0)) - actf.pool_fee_fixed) * actf.pool_fee_variable))))) / (NULLIF(actf.active_stake,0)) ), 1000) + 1), 73) - 1)) * 100)::numeric, 9)
end
end
end epoch_ros
from
Expand Down

0 comments on commit e381325

Please sign in to comment.