Skip to content

Commit

Permalink
Added prometheus metrics
Browse files Browse the repository at this point in the history
Fix notifications to close response
  • Loading branch information
tiger5226 committed Oct 21, 2020
1 parent 88d9618 commit eb0f738
Show file tree
Hide file tree
Showing 17 changed files with 488 additions and 15 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/lbryio/chainquery/daemon/processing"
"github.com/lbryio/chainquery/global"
"github.com/lbryio/chainquery/lbrycrd"
server "github.com/lbryio/chainquery/swagger/apiserver/go"
"github.com/lbryio/chainquery/twilio"

"github.com/lbryio/lbry.go/extras/errors"
Expand Down Expand Up @@ -61,6 +62,8 @@ const ( // config setting keys
maxparalleltxprocessing = "maxparalleltxprocessing"
maxparallelvinprocessing = "maxparallelvinprocessing"
maxparallelvoutprocessing = "maxparallelvoutprocessing"
promuser = "promuser"
prompass = "prompass"
)

const (
Expand Down Expand Up @@ -202,6 +205,8 @@ func processConfiguration() {
jobs.ChainSyncDelay = viper.GetInt(chainsyncdelay)
jobs.ChainSyncRunDuration = viper.GetInt(chainsyncrunduration)
apiactions.MaxSQLAPITimeout = viper.GetInt(maxsqlapitimeout)
server.PromUser = viper.GetString(promuser)
server.PromPassword = viper.GetString(prompass)

//Flags last so they override everything before, even config
if viper.IsSet(debugmodeflag) {
Expand Down
10 changes: 9 additions & 1 deletion config/default/chainqueryconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,12 @@
# auth_token="mytoken"
#[[subscriber.newclaim]]
# url= "http://localhost:8080/event/claim"
# auth_token="mytoken"
# auth_token="mytoken"

#Prometheus User - user for basic authentication on metrics endpoint
#DEFAULT: <none>
#promuser=

#Prometheus Password - password for basic authentication on metrics endpoint
#DEFAULT: <none>
#prompass=
5 changes: 5 additions & 0 deletions daemon/jobs/certificatesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package jobs

import (
"context"
"time"

"github.com/lbryio/chainquery/global"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"

"github.com/lbryio/lbry.go/extras/errors"
Expand All @@ -25,6 +27,9 @@ const certsProcessedPerIteration = 1000
// signed by the channels certificate. This ensure that the channel owner actually published this claim.
func CertificateSync() {
if !certificateSyncRunning {
metrics.JobLoad.WithLabelValues("certificate_sync").Inc()
defer metrics.JobLoad.WithLabelValues("certificate_sync").Dec()
defer metrics.Job(time.Now(), "certificate_sync")
logrus.Debug("Running Certificate Sync...")
certificateSyncRunning = true
defer endCertificateSync()
Expand Down
4 changes: 4 additions & 0 deletions daemon/jobs/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lbryio/chainquery/daemon/processing"
"github.com/lbryio/chainquery/datastore"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"
"github.com/lbryio/lbry.go/extras/errors"

Expand Down Expand Up @@ -50,6 +51,9 @@ func endChainSync() {

// ChainSync synchronizes the chain data when it does not match lbrycrd. It runs for x duration before it stores state.
func ChainSync() {
metrics.JobLoad.WithLabelValues("chain_sync").Inc()
defer metrics.JobLoad.WithLabelValues("chain_sync").Dec()
defer metrics.Job(time.Now(), "chain_sync")
defer endChainSync()
if chainSync == nil {
chainSync = &chainSyncStatus{}
Expand Down
4 changes: 4 additions & 0 deletions daemon/jobs/chainvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"

"github.com/lbryio/lbry.go/extras/errors"
Expand All @@ -24,6 +25,9 @@ const chainValidationJob = "chainvalidationjob"
func ValidateChain() {
if !validatingChain {
go func() {
metrics.JobLoad.WithLabelValues("validate_chain").Inc()
defer metrics.JobLoad.WithLabelValues("validate_chain").Dec()
defer metrics.Job(time.Now(), "validate_chain")
var job *model.JobStatus
exists, err := model.JobStatuses(qm.Where(model.JobStatusColumns.JobName+"=?", chainValidationJob)).ExistsG()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions daemon/jobs/claimtriesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lbryio/chainquery/datastore"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"

"github.com/lbryio/lbry.go/extras/errors"
Expand Down Expand Up @@ -49,6 +50,9 @@ func ClaimTrieSyncAsync() {

// ClaimTrieSync syncs the claim trie bidstate, effective amount and effective height
func ClaimTrieSync() {
metrics.JobLoad.WithLabelValues("claimtrie_sync").Inc()
defer metrics.JobLoad.WithLabelValues("claimtrie_sync").Dec()
defer metrics.Job(time.Now(), "claimtrie_sync")
//defer util.TimeTrack(time.Now(), "ClaimTrieSync", "always")
printDebug("ClaimTrieSync: started... ")
if lastSync == nil {
Expand Down
4 changes: 4 additions & 0 deletions daemon/jobs/mempoolsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/lbryio/chainquery/daemon/processing"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"

"github.com/lbryio/lbry.go/extras/errors"
Expand All @@ -29,6 +30,9 @@ var mempoolBlock *model.Block
func MempoolSync() {
if !mempoolSyncIsRunning {
mempoolSyncIsRunning = true
metrics.JobLoad.WithLabelValues("mempool_sync").Inc()
defer metrics.JobLoad.WithLabelValues("mempool_sync").Dec()
defer metrics.Job(time.Now(), "mempool_sync")
// Need to lock block processing to avoid race condition where we are saving a mempool transaction after a block
// has already started processing transactions. The mempool transaction could overwrite the block transaction
// incorrectly.
Expand Down
12 changes: 12 additions & 0 deletions daemon/jobs/valuesync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package jobs

import (
"time"

"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/sirupsen/logrus"
Expand All @@ -16,6 +19,9 @@ const syncClaimsInChannel = "SyncClaimsInChannel: "
//SyncAddressBalancesJob runs the SyncAddressBalances as a background job.
func SyncAddressBalancesJob() {
go func() {
metrics.JobLoad.WithLabelValues("address_balance_sync").Inc()
defer metrics.JobLoad.WithLabelValues("address_balance_sync").Dec()
defer metrics.Job(time.Now(), "address_balance_sync")
_, err := SyncAddressBalances()
if err != nil {
logrus.Error(syncAddressBalances, err)
Expand All @@ -26,6 +32,9 @@ func SyncAddressBalancesJob() {
// SyncClaimsInChannelJob runs the SyncClaimsInChannel as a background job.
func SyncClaimsInChannelJob() {
go func() {
metrics.JobLoad.WithLabelValues("claims_in_channel_sync").Inc()
defer metrics.JobLoad.WithLabelValues("claims_in_channel_sync").Dec()
defer metrics.Job(time.Now(), "claims_in_channel_sync")
err := SyncClaimCntInChannel()
if err != nil {
logrus.Error(syncClaimsInChannel, err)
Expand All @@ -36,6 +45,9 @@ func SyncClaimsInChannelJob() {
//TransactionValueSync synchronizes the transaction value column due to a bug in mysql related to triggers.
//https://bugs.mysql.com/bug.php?id=11472
func TransactionValueSync() {
metrics.JobLoad.WithLabelValues("transaction_value_sync").Inc()
defer metrics.JobLoad.WithLabelValues("transaction_value_sync").Dec()
defer metrics.Job(time.Now(), "transaction_value_sync")
_, err := SyncTransactionValue()
if err != nil {
logrus.Error(syncTransactionValues, err)
Expand Down
3 changes: 3 additions & 0 deletions daemon/processing/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"
"github.com/lbryio/chainquery/twilio"
"github.com/lbryio/chainquery/util"
Expand All @@ -30,6 +31,7 @@ var ManualShutDownError = errors.Err("Daemon stopped manually!")
// important to note that if the previous block is not processed it will panic to prevent corruption because blocks
// must be processed in order.
func RunBlockProcessing(stopper *stop.Group, height uint64) uint64 {
defer metrics.Processing(time.Now(), "block")
defer util.TimeTrack(time.Now(), "runBlockProcessing", "daemonprofile")
if height == 0 {
err := processGenesisBlock()
Expand Down Expand Up @@ -61,6 +63,7 @@ func RunBlockProcessing(stopper *stop.Group, height uint64) uint64 {

block, err := ProcessBlock(height, stopper, jsonBlock)
if err != nil {
metrics.ProcessingFailures.WithLabelValues("block").Inc()
rollBackHeight := height - 1
blockRemovalError := block.DeleteG()
if blockRemovalError != nil {
Expand Down
6 changes: 4 additions & 2 deletions daemon/processing/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"

"github.com/lbryio/chainquery/notifications"
"time"

"github.com/lbryio/lbryschema.go/address/base58"
pb "github.com/lbryio/types/v2/go"
Expand All @@ -15,7 +14,9 @@ import (
"github.com/lbryio/chainquery/datastore"
"github.com/lbryio/chainquery/global"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"
"github.com/lbryio/chainquery/notifications"

"github.com/lbryio/lbry.go/extras/errors"
util "github.com/lbryio/lbry.go/lbrycrd"
Expand All @@ -26,6 +27,7 @@ import (
)

func processAsClaim(script []byte, vout model.Output, tx model.Transaction, blockHeight uint64) (address *string, claimID *string, err error) {
defer metrics.Processing(time.Now(), "claim")
var pubkeyscript []byte
var name string
var claimid string
Expand Down
16 changes: 13 additions & 3 deletions daemon/processing/outpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"runtime"
"strconv"
"strings"

"github.com/lbryio/chainquery/notifications"
"time"

ds "github.com/lbryio/chainquery/datastore"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
m "github.com/lbryio/chainquery/model"
"github.com/lbryio/chainquery/notifications"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/stop"

Expand Down Expand Up @@ -53,6 +54,9 @@ func vinProcessor(worker int, jobs <-chan vinToProcess, results chan<- error) {
for job := range jobs {
q(strconv.Itoa(worker) + " - WORKER VIN start new job " + strconv.Itoa(int(job.jsonVin.Sequence)))
result := ProcessVin(job.jsonVin, job.tx, job.txDC, job.vin)
if result != nil {
metrics.ProcessingFailures.WithLabelValues("vin").Inc()
}
q(strconv.Itoa(worker) + " - WORKER VIN passing result " + strconv.Itoa(int(job.jsonVin.Sequence)))
results <- result
q(strconv.Itoa(worker) + " - WORKER VIN passed result " + strconv.Itoa(int(job.jsonVin.Sequence)))
Expand All @@ -72,13 +76,18 @@ func initVoutWorkers(s *stop.Group, nrWorkers int, jobs <-chan voutToProcess, re

func voutProcessor(worker int, jobs <-chan voutToProcess, results chan<- error) {
for job := range jobs {
results <- processVout(job.jsonVout, job.tx, job.txDC, job.blockHeight)
err := processVout(job.jsonVout, job.tx, job.txDC, job.blockHeight)
if err != nil {
metrics.ProcessingFailures.WithLabelValues("vin").Inc()
}
results <- err
}
q(strconv.Itoa(worker) + " - WORKER VOUT finished all jobs")
}

//ProcessVin handles the processing of an input to a transaction.
func ProcessVin(jsonVin *lbrycrd.Vin, tx *m.Transaction, txDC *txDebitCredits, n uint64) error {
defer metrics.Processing(time.Now(), "vin")
vin := &m.Input{}
foundVin := ds.GetInput(tx.Hash, len(jsonVin.Coinbase) > 0, jsonVin.TxID, uint(jsonVin.Vout))
if foundVin != nil {
Expand Down Expand Up @@ -170,6 +179,7 @@ func processCoinBaseVin(jsonVin *lbrycrd.Vin, vin *m.Input) error {
}

func processVout(jsonVout *lbrycrd.Vout, tx *m.Transaction, txDC *txDebitCredits, blockHeight uint64) error {
defer metrics.Processing(time.Now(), "vout")
vout := &m.Output{}
foundVout := ds.GetOutput(tx.Hash, uint(jsonVout.N))
if foundVout != nil {
Expand Down
5 changes: 5 additions & 0 deletions daemon/processing/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/lbryio/chainquery/datastore"
"github.com/lbryio/chainquery/lbrycrd"
"github.com/lbryio/chainquery/metrics"
"github.com/lbryio/chainquery/model"
"github.com/lbryio/chainquery/util"

Expand Down Expand Up @@ -55,6 +56,9 @@ func txProcessor(s *stop.Group, jobs <-chan txToProcess, results chan<- txProces
case job := <-jobs:
q(strconv.Itoa(worker) + " - WORKER TX - Start new job " + job.tx.Txid)
err := ProcessTx(job.tx, job.blockTime, job.blockHeight)
if err != nil {
metrics.ProcessingFailures.WithLabelValues("transaction").Inc()
}
result := txProcessResult{
tx: job.tx,
blockTime: job.blockTime,
Expand Down Expand Up @@ -125,6 +129,7 @@ func (txDC *txDebitCredits) add(address string, value float64) {

// ProcessTx processes an individual transaction from a block.
func ProcessTx(jsonTx *lbrycrd.TxRawResult, blockTime uint64, blockHeight uint64) error {
defer metrics.Processing(time.Now(), "transaction")
defer util.TimeTrack(time.Now(), "processTx "+jsonTx.Txid+" -- ", "daemonprofile")

//Save transaction before the id is used any where else otherwise it will be 0
Expand Down
13 changes: 5 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
require (
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/fatih/color v1.7.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-ini/ini v1.48.0
Expand All @@ -16,7 +15,6 @@ require (
github.com/gorilla/mux v1.7.3
github.com/gorilla/schema v1.0.2 // indirect
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmoiron/sqlx v0.0.0-20170430194603-d9bd385d68c0
github.com/johntdyer/slack-go v0.0.0-20180213144715-95fac1160b22 // indirect
github.com/johntdyer/slackrus v0.0.0-20170926115001-3992f319fd0a
Expand All @@ -28,17 +26,16 @@ require (
github.com/lbryio/types v0.0.0-20191009145016-1bb8107e04f8
github.com/lib/pq v1.1.1 // indirect
github.com/magiconair/properties v1.8.0 // indirect
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.3 // indirect
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.3.0
github.com/prometheus/client_golang v1.8.0
github.com/rubenv/sql-migrate v0.0.0-20170330050058-38004e7a77f2
github.com/sfreiberg/gotwilio v0.0.0-20180612161623-8fb7259ba8bf
github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5
github.com/sirupsen/logrus v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/afero v1.1.1 // indirect
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3
Expand All @@ -49,8 +46,8 @@ require (
github.com/volatiletech/null v8.0.0+incompatible
github.com/volatiletech/sqlboiler v3.4.0+incompatible
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/net v0.0.0-20191009170851-d66e71096ffb
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
golang.org/x/net v0.0.0-20200625001655-4c5254603344
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/gorp.v1 v1.7.1 // indirect
)
Loading

0 comments on commit eb0f738

Please sign in to comment.