diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 182c4e10ebb..bb0a896cca5 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -341,8 +341,9 @@ func run() { x.RemoveCidFile() }() - st.zero.closer.AddRunning(1) + st.zero.closer.AddRunning(2) go x.MonitorMemoryMetrics(st.zero.closer) + go x.MonitorDiskMetrics("wal_fs", opts.w, st.zero.closer) glog.Infoln("Running Dgraph Zero...") st.zero.closer.Wait() diff --git a/worker/backup_ee.go b/worker/backup_ee.go index 852e4b55052..e827f750d09 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -22,6 +22,7 @@ import ( "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" + ostats "go.opencensus.io/stats" "github.com/golang/glog" "github.com/golang/protobuf/proto" @@ -112,6 +113,16 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull backupLock.Lock() defer backupLock.Unlock() + backupSuccessful := false + ostats.Record(ctx, x.NumBackups.M(1), x.PendingBackups.M(1)) + defer func() { + if backupSuccessful { + ostats.Record(ctx, x.NumBackupsSuccess.M(1), x.PendingBackups.M(-1)) + } else { + ostats.Record(ctx, x.NumBackupsFailed.M(1), x.PendingBackups.M(-1)) + } + }() + ts, err := Timestamps(ctx, &pb.Num{ReadOnly: true}) if err != nil { glog.Errorf("Unable to retrieve readonly timestamp for backup: %s", err) @@ -211,7 +222,14 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull m.Encrypted = (x.WorkerConfig.EncryptionKey != nil) bp := NewBackupProcessor(nil, req) - return bp.CompleteBackup(ctx, &m) + err = bp.CompleteBackup(ctx, &m) + + if err != nil { + return err + } + + backupSuccessful = true + return nil } func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) ( diff --git a/worker/server_state.go b/worker/server_state.go index 923b63ccc4c..60b61a9f98b 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -129,10 +129,11 
@@ func (s *ServerState) initStorage() { // Temp directory x.Check(os.MkdirAll(x.WorkerConfig.TmpDir, 0700)) - s.gcCloser = z.NewCloser(2) + s.gcCloser = z.NewCloser(3) go x.RunVlogGC(s.Pstore, s.gcCloser) // Commenting this out because Badger is doing its own cache checks. go x.MonitorCacheHealth(s.Pstore, s.gcCloser) + go x.MonitorDiskMetrics("postings_fs", Config.PostingDir, s.gcCloser) } // Dispose stops and closes all the resources inside the server state. diff --git a/x/disk_metrics_linux.go b/x/disk_metrics_linux.go new file mode 100644 index 00000000000..fd9ead79d1a --- /dev/null +++ b/x/disk_metrics_linux.go @@ -0,0 +1,47 @@ +// +build linux + +package x + +// Only setting linux because some of the darwin/BSDs have a different struct for syscall.statfs_t + +import ( + "context" + "syscall" + "time" + + "github.com/dgraph-io/ristretto/z" + "github.com/golang/glog" + "go.opencensus.io/stats" + "go.opencensus.io/tag" +) + +func MonitorDiskMetrics(dirTag string, dir string, lc *z.Closer) { + defer lc.Done() + ctx, err := tag.New(context.Background(), tag.Upsert(KeyDirType, dirTag)) + + fastTicker := time.NewTicker(10 * time.Second) + defer fastTicker.Stop() + + if err != nil { + glog.Errorln("Invalid Tag", err) + return + } + + for { + select { + case <-lc.HasBeenClosed(): + return + case <-fastTicker.C: + s := syscall.Statfs_t{} + err = syscall.Statfs(dir, &s) + if err != nil { + continue + } + reservedBlocks := s.Bfree - s.Bavail + total := int64(s.Frsize) * int64(s.Blocks-reservedBlocks) + free := int64(s.Frsize) * int64(s.Bavail) + stats.Record(ctx, DiskFree.M(free), DiskUsed.M(total-free), DiskTotal.M(total)) + } + } + +} diff --git a/x/disk_metrics_others.go b/x/disk_metrics_others.go new file mode 100644 index 00000000000..04ab4ba8b40 --- /dev/null +++ b/x/disk_metrics_others.go @@ -0,0 +1,13 @@ +// +build !linux + +package x + +import ( + "github.com/dgraph-io/ristretto/z" + "github.com/golang/glog" +) + +func MonitorDiskMetrics(_ string, _ string, lc 
*z.Closer) { + defer lc.Done() + glog.Infoln("File system metrics are not currently supported on non-Linux platforms") +} diff --git a/x/metrics.go b/x/metrics.go index 91936530fa2..9e54867d5da 100644 --- a/x/metrics.go +++ b/x/metrics.go @@ -59,6 +59,15 @@ var ( // NumEdges is the total number of edges created so far. NumEdges = stats.Int64("num_edges_total", "Total number of edges created", stats.UnitDimensionless) + // NumBackups is the number of backups requested + NumBackups = stats.Int64("num_backups_total", + "Total number of backups requested", stats.UnitDimensionless) + // NumBackupsSuccess is the number of backups successfully completed + NumBackupsSuccess = stats.Int64("num_backups_success_total", + "Total number of backups completed", stats.UnitDimensionless) + // NumBackupsFailed is the number of backups failed + NumBackupsFailed = stats.Int64("num_backups_failed_total", + "Total number of backups failed", stats.UnitDimensionless) // LatencyMs is the latency of the various Dgraph operations. LatencyMs = stats.Float64("latency", "Latency of the various methods", stats.UnitMilliseconds) @@ -71,6 +80,9 @@ var ( // PendingProposals records the current number of pending RAFT proposals. PendingProposals = stats.Int64("pending_proposals_total", "Number of pending proposals", stats.UnitDimensionless) + // PendingBackups records if a backup is currently in progress + PendingBackups = stats.Int64("pending_backups_total", + "Number of backups", stats.UnitDimensionless) // MemoryAlloc records the amount of memory allocated via jemalloc MemoryAlloc = stats.Int64("memory_alloc_bytes", "Amount of memory allocated", stats.UnitBytes) @@ -83,6 +95,15 @@ var ( // MemoryProc records the amount of memory used in processes. 
MemoryProc = stats.Int64("memory_proc_bytes", "Amount of memory used in processes", stats.UnitBytes) + // DiskFree records the number of bytes free on the disk + DiskFree = stats.Int64("disk_free_bytes", + "Total number of bytes free on disk", stats.UnitBytes) + // DiskUsed records the number of bytes used on the disk + DiskUsed = stats.Int64("disk_used_bytes", + "Total number of bytes used on disk", stats.UnitBytes) + // DiskTotal records the total number of bytes on the disk + DiskTotal = stats.Int64("disk_total_bytes", + "Total number of bytes on disk", stats.UnitBytes) + // ActiveMutations is the current number of active mutations. ActiveMutations = stats.Int64("active_mutations_total", "Number of active mutations", stats.UnitDimensionless) @@ -141,6 +162,9 @@ var ( // KeyMethod is the tag key used to record the method (e.g read or mutate). KeyMethod, _ = tag.NewKey("method") + // KeyDirType is the tag key used to record the group for FileSystem metrics + KeyDirType, _ = tag.NewKey("dir") + // Tag values. // TagValueStatusOK is the tag value used to signal a successful operation. 
@@ -161,6 +185,8 @@ var ( allRaftKeys = []tag.Key{KeyGroup} + allFSKeys = []tag.Key{KeyDirType} + allViews = []*view.View{ { Name: LatencyMs.Name(), @@ -183,6 +209,20 @@ var ( Aggregation: view.Count(), TagKeys: allTagKeys, }, + { + Name: NumBackups.Name(), + Measure: NumBackups, + Description: NumBackups.Description(), + Aggregation: view.Count(), + TagKeys: nil, + }, + { + Name: NumBackupsSuccess.Name(), + Measure: NumBackupsSuccess, + Description: NumBackupsSuccess.Description(), + Aggregation: view.Count(), + TagKeys: nil, + }, { Name: TxnCommits.Name(), Measure: TxnCommits, @@ -227,6 +267,13 @@ var ( Aggregation: view.LastValue(), TagKeys: nil, }, + { + Name: PendingBackups.Name(), + Measure: PendingBackups, + Description: PendingBackups.Description(), + Aggregation: view.Sum(), + TagKeys: nil, + }, { Name: MemoryAlloc.Name(), Measure: MemoryAlloc, @@ -255,6 +302,27 @@ var ( Aggregation: view.LastValue(), TagKeys: allTagKeys, }, + { + Name: DiskFree.Name(), + Measure: DiskFree, + Description: DiskFree.Description(), + Aggregation: view.LastValue(), + TagKeys: allFSKeys, + }, + { + Name: DiskUsed.Name(), + Measure: DiskUsed, + Description: DiskUsed.Description(), + Aggregation: view.LastValue(), + TagKeys: allFSKeys, + }, + { + Name: DiskTotal.Name(), + Measure: DiskTotal, + Description: DiskTotal.Description(), + Aggregation: view.LastValue(), + TagKeys: allFSKeys, + }, { Name: AlphaHealth.Name(), Measure: AlphaHealth, @@ -370,6 +438,8 @@ func init() { Checkf(err, "Failed to create OpenCensus Prometheus exporter: %v", err) view.RegisterExporter(pe) + // Exposing metrics at /metrics, which is the usual standard, as well as at the old endpoint + http.Handle("/metrics", pe) http.Handle("/debug/prometheus_metrics", pe) }