Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions internal/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -42,10 +43,11 @@ func precompute(reportCh chan report.Message, ingestorRepo *repository.Assets) (
}

// Compute generates OpenConfig data for each device.
func compute(reportCh chan<- report.Message, ingestorRepo *repository.Assets, devices map[string]*device.Device) error {
func compute(reportCh chan<- report.Message, ingestorRepo *repository.Assets, devices map[string]*device.Device) (uint32, error) {
wg := sync.WaitGroup{}

failed := false
var builtCount atomic.Uint32
var mutex sync.Mutex

for _, dev := range ingestorRepo.DeviceInventory {
Expand All @@ -70,25 +72,32 @@ func compute(reportCh chan<- report.Message, ingestorRepo *repository.Assets, de
mutex.Lock()
failed = true
mutex.Unlock()
} else {
mutex.Lock()
builtCount.Add(1)
mutex.Unlock()
}
}(devices[dev.Hostname])
}

wg.Wait()

successfullyBuilt := builtCount.Load()

if failed {
return errors.New("OpenConfig conversion failed")
return successfullyBuilt, errors.New("OpenConfig conversion failed")
}

return nil
return successfullyBuilt, nil
}

// RunBuild start the build pipeline to convert CMDB data to OpenConfig for each devices.
// One build is composed are three steps:
// - fetch data using ingestors (one ingestor = one data source API endpoint)
// - precompute data to make them usable
// - compute to OpenConfig
func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.PerformanceStats, error) {
stats := report.PerformanceStats{}
func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.Stats, error) {
stats := report.Stats{}
startTime := time.Now()

// Fetch data from CMDB
Expand All @@ -99,12 +108,12 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.P
ingestorRepo.PrintStats()
ingestorRepo.ReportStats(reportCh)
ingestorFetchFinishTime := time.Now()
stats.DataFetchingDuration = ingestorFetchFinishTime.Sub(startTime)
stats.Performance.DataFetchingDuration = ingestorFetchFinishTime.Sub(startTime)

// Precompute data per device
devices, precomputeError := precompute(reportCh, ingestorRepo)
precomputeFinishTime := time.Now()
stats.PrecomputeDuration = precomputeFinishTime.Sub(ingestorFetchFinishTime)
stats.Performance.PrecomputeDuration = precomputeFinishTime.Sub(ingestorFetchFinishTime)

// We stop here if the user decided all device configuration must have been built with success
if precomputeError != nil {
Expand All @@ -119,11 +128,12 @@ func RunBuild(reportCh chan report.Message) (map[string]*device.Device, report.P
}

// Generate openconfig for all devices
computeError := compute(reportCh, ingestorRepo, devices)
successfullyBuilt, computeError := compute(reportCh, ingestorRepo, devices)
computeTime := time.Now()
stats.ComputeDuration = computeTime.Sub(precomputeFinishTime)
stats.BuildDuration = computeTime.Sub(startTime)
stats.Performance.ComputeDuration = computeTime.Sub(precomputeFinishTime)
stats.Performance.BuildDuration = computeTime.Sub(startTime)

stats.BuiltDevicesCount = successfullyBuilt
stats.Log()

if computeError != nil {
Expand All @@ -150,16 +160,22 @@ func StartBuildLoop(deviceRepo router.DevicesRepository, reports *report.Reposit
reports.UpdateStatus(report.InProgress)
if devs, stats, err := RunBuild(reportCh); err != nil {
metricsRegistry.BuildFailed()

reports.UpdateStatus(report.Failed)
reports.UpdatePerformanceStats(stats)
reports.UpdateStats(stats)

log.Error().Err(err).Msg("build failed")
} else {
deviceRepo.Set(devs)
log.Info().Msg("build successful")

metricsRegistry.BuildSuccessful()
metricsRegistry.SetBuiltDevices(stats.BuiltDevicesCount)

reports.UpdateStatus(report.Success)
reports.UpdatePerformanceStats(stats)
reports.UpdateStats(stats)
reports.MarkAsSuccessful()

log.Info().Msg("build successful")
}

reports.MarkAsComplete()
Expand Down
18 changes: 16 additions & 2 deletions internal/metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import (
)

type Registry struct {
lastBuildStatus *prometheus.GaugeVec
buildTotal *prometheus.CounterVec
BuiltDevicesNumber *prometheus.GaugeVec
lastBuildStatus *prometheus.GaugeVec
buildTotal *prometheus.CounterVec
}

func NewRegistry() Registry {
return Registry{
BuiltDevicesNumber: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "built_devices_number",
Help: "Number of devices built during last successful build",
},
[]string{},
),

lastBuildStatus: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "build_status",
Expand Down Expand Up @@ -47,3 +56,8 @@ func (r *Registry) BuildFailed() {
r.lastBuildStatus.WithLabelValues().Set(0)
r.buildTotal.WithLabelValues("false").Inc()
}

// SetBuiltDevices updates the `built_devices` gauge.
func (r *Registry) SetBuiltDevices(count uint32) {
r.BuiltDevicesNumber.WithLabelValues().Set(float64(count))
}
9 changes: 5 additions & 4 deletions internal/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (
"github.com/rs/zerolog/log"
)

type Report struct {
type Report struct { //nolint:govet // for JSON order
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`

mutex *sync.Mutex

Logs map[MessageType][]Message `json:"logs"`
Status jobStatus `json:"status"`
Performance PerformanceStats `json:"performance"`
Status jobStatus `json:"status"`
Stats Stats `json:"stats"`

Logs map[MessageType][]Message `json:"logs"`
}

// NewReport creates and initializes a new Report.
Expand Down
4 changes: 2 additions & 2 deletions internal/report/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func (r *Repository) UpdateStatus(status jobStatus) {
r.last.Status = status
}

func (r *Repository) UpdatePerformanceStats(stats PerformanceStats) {
func (r *Repository) UpdateStats(stats Stats) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.last.Performance = stats
r.last.Stats = stats
}

func (r *Repository) MarkAsComplete() {
Expand Down
14 changes: 12 additions & 2 deletions internal/report/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,20 @@ import (
"github.com/rs/zerolog/log"
)

type Stats struct {
BuiltDevicesCount uint32 `json:"built_devices"`
Performance PerformanceStats `json:"performance"`
}

func (s Stats) Log() {
log.Info().Uint32("successfully_built", s.BuiltDevicesCount).Send()
s.Performance.Log()
}

// PerformanceStats contains durations of each step of the build pipeline.
type PerformanceStats struct {
DataFetchingDuration time.Duration `json:"data_fetching_duration"`
PrecomputeDuration time.Duration `json:"precomputeduration"`
PrecomputeDuration time.Duration `json:"precompute_duration"`
ComputeDuration time.Duration `json:"compute_duration"`
BuildDuration time.Duration `json:"build_duration"`
}
Expand All @@ -20,7 +30,7 @@ type PerformanceStats struct {
func (p *PerformanceStats) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
DataFetchingDuration string `json:"data_fetching_duration"`
PrecomputeDuration string `json:"precomputeduration"`
PrecomputeDuration string `json:"precompute_duration"`
ComputeDuration string `json:"compute_duration"`
BuildDuration string `json:"build_duration"`
}{
Expand Down