Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,9 @@ func run() {
x.RemoveCidFile()
}()

st.zero.closer.AddRunning(1)
st.zero.closer.AddRunning(2)
go x.MonitorMemoryMetrics(st.zero.closer)
go x.MonitorDiskMetrics("wal_fs", opts.w, st.zero.closer)

glog.Infoln("Running Dgraph Zero...")
st.zero.closer.Wait()
Expand Down
20 changes: 19 additions & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
ostats "go.opencensus.io/stats"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -112,6 +113,16 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull
backupLock.Lock()
defer backupLock.Unlock()

backupSuccessful := false
ostats.Record(ctx, x.NumBackups.M(1), x.PendingBackups.M(1))
defer func() {
if backupSuccessful {
ostats.Record(ctx, x.NumBackupsSuccess.M(1), x.PendingBackups.M(-1))
} else {
ostats.Record(ctx, x.NumBackupsFailed.M(1), x.PendingBackups.M(-1))
}
}()

ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true})
if err != nil {
glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err)
Expand Down Expand Up @@ -211,7 +222,14 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull
m.Encrypted = (x.WorkerConfig.EncryptionKey != nil)

bp := NewBackupProcessor(nil, req)
return bp.CompleteBackup(ctx, &m)
err = bp.CompleteBackup(ctx, &m)

if err != nil {
return err
}

backupSuccessful = true
return nil
}

func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) (
Expand Down
3 changes: 2 additions & 1 deletion worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ func (s *ServerState) initStorage() {
// Temp directory
x.Check(os.MkdirAll(x.WorkerConfig.TmpDir, 0700))

s.gcCloser = z.NewCloser(2)
s.gcCloser = z.NewCloser(3)
go x.RunVlogGC(s.Pstore, s.gcCloser)
// Commenting this out because Badger is doing its own cache checks.
go x.MonitorCacheHealth(s.Pstore, s.gcCloser)
go x.MonitorDiskMetrics("postings_fs", Config.PostingDir, s.gcCloser)
}

// Dispose stops and closes all the resources inside the server state.
Expand Down
47 changes: 47 additions & 0 deletions x/disk_metrics_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// +build linux

package x

// Only setting linux because some of the darwin/BSDs have a different struct for syscall.statfs_t

import (
"context"
"syscall"
"time"

"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

// MonitorDiskMetrics records disk usage stats (free/used/total bytes) for dir
// every 10 seconds until lc is closed. Each sample is tagged with dirTag via
// KeyDirType, so several directories can be monitored concurrently and still
// be told apart in the exported metrics.
func MonitorDiskMetrics(dirTag string, dir string, lc *z.Closer) {
	defer lc.Done()

	ctx, err := tag.New(context.Background(), tag.Upsert(KeyDirType, dirTag))
	if err != nil {
		// Without a valid tag context the samples would be unattributable,
		// so bail out before allocating the ticker.
		glog.Errorln("Invalid Tag", err)
		return
	}

	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-lc.HasBeenClosed():
			return
		case <-ticker.C:
			var s syscall.Statfs_t
			if err := syscall.Statfs(dir, &s); err != nil {
				// Best-effort sampling: a transient statfs failure (e.g. the
				// directory briefly unavailable) just skips this tick.
				continue
			}
			// Exclude blocks reserved for root so "total" reflects the space
			// actually usable by this process (df-style accounting).
			reservedBlocks := s.Bfree - s.Bavail
			total := int64(s.Frsize) * int64(s.Blocks-reservedBlocks)
			free := int64(s.Frsize) * int64(s.Bavail)
			stats.Record(ctx, DiskFree.M(free), DiskUsed.M(total-free), DiskTotal.M(total))
		}
	}
}
13 changes: 13 additions & 0 deletions x/disk_metrics_others.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// +build !linux

package x

import (
"github.com/dgraph-io/ristretto/z"
"github.com/golang/glog"
)

// MonitorDiskMetrics is a stub on non-Linux platforms: the Linux build reads
// syscall.Statfs_t, whose layout differs on darwin/BSD, so here we only log
// once and release the closer.
func MonitorDiskMetrics(_ string, _ string, lc *z.Closer) {
	glog.Infoln("File system metrics are not currently supported on non-Linux platforms")
	lc.Done()
}
70 changes: 70 additions & 0 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ var (
// NumEdges is the total number of edges created so far.
NumEdges = stats.Int64("num_edges_total",
"Total number of edges created", stats.UnitDimensionless)
// NumBackups is the number of backups requested
NumBackups = stats.Int64("num_backups_total",
"Total number of backups requested", stats.UnitDimensionless)
// NumBackupsSuccess is the number of backups successfully completed
NumBackupsSuccess = stats.Int64("num_backups_success_total",
"Total number of backups completed", stats.UnitDimensionless)
// NumBackupsFailed is the number of failed backups
NumBackupsFailed = stats.Int64("num_backups_failed_total",
"Total number of backups failed", stats.UnitDimensionless)
// LatencyMs is the latency of the various Dgraph operations.
LatencyMs = stats.Float64("latency",
"Latency of the various methods", stats.UnitMilliseconds)
Expand All @@ -71,6 +80,9 @@ var (
// PendingProposals records the current number of pending RAFT proposals.
PendingProposals = stats.Int64("pending_proposals_total",
"Number of pending proposals", stats.UnitDimensionless)
// PendingBackups records the number of backups currently in progress
PendingBackups = stats.Int64("pending_backups_total",
"Number of backups", stats.UnitDimensionless)
// MemoryAlloc records the amount of memory allocated via jemalloc
MemoryAlloc = stats.Int64("memory_alloc_bytes",
"Amount of memory allocated", stats.UnitBytes)
Expand All @@ -83,6 +95,15 @@ var (
// MemoryProc records the amount of memory used in processes.
MemoryProc = stats.Int64("memory_proc_bytes",
"Amount of memory used in processes", stats.UnitBytes)
// DiskFree records the number of bytes free on the disk
DiskFree = stats.Int64("disk_free_bytes",
"Total number of bytes free on disk", stats.UnitBytes)
// DiskUsed records the number of bytes used on the disk
DiskUsed = stats.Int64("disk_used_bytes",
"Total number of bytes used on disk", stats.UnitBytes)
// DiskTotal records the total size of the disk in bytes
DiskTotal = stats.Int64("disk_total_bytes",
"Total number of bytes on disk", stats.UnitBytes)
// ActiveMutations is the current number of active mutations.
ActiveMutations = stats.Int64("active_mutations_total",
"Number of active mutations", stats.UnitDimensionless)
Expand Down Expand Up @@ -141,6 +162,9 @@ var (
// KeyMethod is the tag key used to record the method (e.g read or mutate).
KeyMethod, _ = tag.NewKey("method")

// KeyDirType is the tag key used to record the group for FileSystem metrics
KeyDirType, _ = tag.NewKey("dir")

// Tag values.

// TagValueStatusOK is the tag value used to signal a successful operation.
Expand All @@ -161,6 +185,8 @@ var (

allRaftKeys = []tag.Key{KeyGroup}

allFSKeys = []tag.Key{KeyDirType}

allViews = []*view.View{
{
Name: LatencyMs.Name(),
Expand All @@ -183,6 +209,20 @@ var (
Aggregation: view.Count(),
TagKeys: allTagKeys,
},
{
Name: NumBackups.Name(),
Measure: NumBackups,
Description: NumBackups.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: NumBackupsSuccess.Name(),
Measure: NumBackupsSuccess,
Description: NumBackupsSuccess.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: TxnCommits.Name(),
Measure: TxnCommits,
Expand Down Expand Up @@ -227,6 +267,13 @@ var (
Aggregation: view.LastValue(),
TagKeys: nil,
},
{
Name: PendingBackups.Name(),
Measure: PendingBackups,
Description: PendingBackups.Description(),
Aggregation: view.Sum(),
TagKeys: nil,
},
{
Name: MemoryAlloc.Name(),
Measure: MemoryAlloc,
Expand Down Expand Up @@ -255,6 +302,27 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: DiskFree.Name(),
Measure: DiskFree,
Description: DiskFree.Description(),
Aggregation: view.LastValue(),
TagKeys: allFSKeys,
},
{
Name: DiskUsed.Name(),
Measure: DiskUsed,
Description: DiskUsed.Description(),
Aggregation: view.LastValue(),
TagKeys: allFSKeys,
},
{
Name: DiskTotal.Name(),
Measure: DiskTotal,
Description: DiskTotal.Description(),
Aggregation: view.LastValue(),
TagKeys: allFSKeys,
},
{
Name: AlphaHealth.Name(),
Measure: AlphaHealth,
Expand Down Expand Up @@ -370,6 +438,8 @@ func init() {
Checkf(err, "Failed to create OpenCensus Prometheus exporter: %v", err)
view.RegisterExporter(pe)

// Exposing metrics at /metrics, which is the usual standard, as well as at the old endpoint
http.Handle("/metrics", pe)
http.Handle("/debug/prometheus_metrics", pe)
}

Expand Down