Skip to content

Commit

Permalink
Store metric collection timestamp and timegrain
Browse files Browse the repository at this point in the history
We keep track of the timestamp/timegrain when we collected a metric
value so we know when to collect it again.
  • Loading branch information
zmoog committed Oct 30, 2023
1 parent ceb3909 commit 432999d
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 12 deletions.
153 changes: 142 additions & 11 deletions x-pack/metricbeat/module/azure/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,56 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

func NewMetricRegistry() *MetricRegistry {
return &MetricRegistry{
collectionsInfo: make(map[string]MetricCollectionInfo),
}
}

type MetricRegistry struct {
collectionsInfo map[string]MetricCollectionInfo
}

func (m *MetricRegistry) Register(metric Metric, info MetricCollectionInfo) {
m.collectionsInfo[m.buildMetricKey(metric)] = info
}

func (m *MetricRegistry) NeedsUpdate(metric Metric) bool {
metricKey := m.buildMetricKey(metric)

if info, exists := m.collectionsInfo[metricKey]; exists {
duration := convertTimegrainToDuration(info.timeGrain)
if info.timestamp.After(time.Now().Add(duration * (-1))) {
return false
}
}

return true
}

func (m *MetricRegistry) buildMetricKey(metric Metric) string {
keyComponents := []string{
metric.Namespace,
metric.ResourceId,
}
keyComponents = append(keyComponents, metric.Names...)

return strings.Join(keyComponents, ",")
}

type MetricCollectionInfo struct {
timestamp time.Time
timeGrain string
}

// Client represents the azure client which will make use of the azure sdk go metrics related clients
type Client struct {
AzureMonitorService Service
Config Config
ResourceConfigurations ResourceConfiguration
Log *logp.Logger
Resources []Resource
MetricRegistry *MetricRegistry
}

// mapResourceMetrics function type will map the configuration options to client metrics (depending on the metricset)
Expand All @@ -39,6 +82,7 @@ func NewClient(config Config) (*Client, error) {
AzureMonitorService: azureMonitorService,
Config: config,
Log: logp.NewLogger("azure monitor client"),
MetricRegistry: NewMetricRegistry(),
}

client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval
Expand Down Expand Up @@ -97,26 +141,95 @@ func (client *Client) InitResources(fn mapResourceMetrics) error {
client.Log.Debug("no resources were found based on all the configurations options entered")
}
client.ResourceConfigurations.Metrics = metrics

return nil
}

// GetMetricValues returns the specified metric data points for the specified resource ID/namespace.
func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []Metric {
var resultedMetrics []Metric

// Same end time for all metrics in the same batch
referenceTime := time.Now().UTC()

// loop over the set of metrics
for _, metric := range metrics {
// select period to collect metrics, will double the interval value in order to retrieve any missing values
//if timegrain is larger than intervalx2 then interval will be assigned the timegrain value
interval := client.Config.Period
if t := convertTimegrainToDuration(metric.TimeGrain); t > interval*2 {
interval = t
}
//duration := convertTimegrainToDuration(metric.TimeGrain)
//if t := convertTimegrainToDuration(metric.TimeGrain); t > interval*2 {
// interval = t
//}

// copy the reference time
endTime := referenceTime

// Fetch in the range [{-2xINTERVAL},{-INTERVAL}) with a delay of {INTERVAL}
// It results in one data point {-2xINTERVAL} per call
endTime := time.Now().UTC().Add(interval * (-1))
endTime = endTime.Add(interval * (-1))
startTime := endTime.Add(interval * (-1))
timespan := fmt.Sprintf("%s/%s", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))

if !client.MetricRegistry.NeedsUpdate(metric) {
continue
}

//metricKey := strings.Join(metric.Names, ",")
//metricKey += metric.ResourceId
//
//if metricInfo, exists := client.MetricRegistry[metricKey]; exists {
// duration := convertTimegrainToDuration(metricInfo.timeGrain)
// if metricInfo.timestamp.After(referenceTime.Add(duration * (-1))) {
// continue
// }
//}

// Interval math for timegrain > period
//if duration > client.Config.Period {
// inTimespan := false
//
// var diffSec = int64(endTime.Second() - startTime.Second())
//
// var diffMin = int64(endTime.Minute() - startTime.Minute())
// var diffMinDuration = time.Duration(diffMin) * time.Minute
//
// var diffHour = int64(endTime.Hour() - startTime.Hour())
// var diffHourDuration = time.Duration(diffHour) * time.Hour
//
// // If timegrain is unit 1 day, 1 hour or 1 min
// if duration == 24*time.Hour {
// startOfDay := endTime.Truncate(24 * time.Hour)
// if (startOfDay.Equal(startTime) || startOfDay.After(startTime)) && startOfDay.Before(endTime) {
// inTimespan = true
// }
//
// } else if duration >= time.Hour {
// if diffMin < 0 && diffHourDuration > 0 && diffHourDuration%duration == 0 {
// inTimespan = true
// }
// } else {
// if diffSec < 0 && diffMinDuration%duration == 0 {
// inTimespan = true
// }
// }
//
// // if the time grain mark is not within the sampling timespan,
// // remove that metric from the list in this batch and skip to the next one
// if !inTimespan {
// // Remove metric from list
// ind := 0
// for i, currentMetric := range client.ResourceConfigurations.Metrics {
// if matchMetrics(currentMetric, metric) {
// ind = i
// break
// }
// }
// client.ResourceConfigurations.Metrics = append(client.ResourceConfigurations.Metrics[:ind], client.ResourceConfigurations.Metrics[ind+1:]...)
// continue
// }
//}

// build the 'filter' parameter which will contain any dimensions configured
var filter string
if len(metric.Dimensions) > 0 {
Expand All @@ -126,7 +239,8 @@ func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []
}
filter = strings.Join(filterList, " AND ")
}
resp, timegrain, err := client.AzureMonitorService.GetMetricValues(

resp, timeGrain, err := client.AzureMonitorService.GetMetricValues(
metric.ResourceSubId,
metric.Namespace,
metric.TimeGrain,
Expand All @@ -140,23 +254,37 @@ func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []
client.Log.Error(err)
report.Error(err)
} else {

// Update the metric registry with the latest timestamp and time grain.
client.MetricRegistry.Register(metric, MetricCollectionInfo{
timeGrain: timeGrain,
timestamp: referenceTime,
})

//client.MetricRegistry[metricKey] = MetricCollectionInfo{
// timeGrain: timeGrain,
// timestamp: referenceTime,
//}

for i, currentMetric := range client.ResourceConfigurations.Metrics {
if matchMetrics(currentMetric, metric) {
current := mapMetricValues(resp, currentMetric.Values, endTime.Truncate(time.Minute).Add(interval*(-1)), endTime.Truncate(time.Minute))
//current := mapMetricValues(resp, currentMetric.Values, endTime.Truncate(time.Minute).Add(interval*(-1)), endTime.Truncate(time.Minute))
current := mapMetricValues(resp, currentMetric.Values)
client.ResourceConfigurations.Metrics[i].Values = current
if client.ResourceConfigurations.Metrics[i].TimeGrain == "" {
client.ResourceConfigurations.Metrics[i].TimeGrain = timegrain
client.ResourceConfigurations.Metrics[i].TimeGrain = timeGrain
}
resultedMetrics = append(resultedMetrics, client.ResourceConfigurations.Metrics[i])
}
}
}
}

return resultedMetrics
}

// CreateMetric function will create a client metric based on the resource and metrics configured
func (client *Client) CreateMetric(resourceId string, subResourceId string, namespace string, metrics []string, aggregations string, dimensions []Dimension, timegrain string) Metric {
func (client *Client) CreateMetric(resourceId string, subResourceId string, namespace string, metrics []string, aggregations string, dimensions []Dimension, timeGrain string) Metric {
if subResourceId == "" {
subResourceId = resourceId
}
Expand All @@ -167,18 +295,20 @@ func (client *Client) CreateMetric(resourceId string, subResourceId string, name
Names: metrics,
Dimensions: dimensions,
Aggregations: aggregations,
TimeGrain: timegrain,
TimeGrain: timeGrain,
}

for _, prevMet := range client.ResourceConfigurations.Metrics {
if len(prevMet.Values) != 0 && matchMetrics(prevMet, met) {
met.Values = prevMet.Values
}
}

return met
}

// MapMetricByPrimaryAggregation will map the primary aggregation of the metric definition to the client metric
func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, subResourceId string, namespace string, dim []Dimension, timegrain string) []Metric {
func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, subResourceId string, namespace string, dim []Dimension, timeGrain string) []Metric {
var clientMetrics []Metric

Check failure on line 312 in x-pack/metricbeat/module/azure/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Consider pre-allocating `clientMetrics` (prealloc)

Check failure on line 312 in x-pack/metricbeat/module/azure/client.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Consider pre-allocating `clientMetrics` (prealloc)

metricGroups := make(map[string][]armmonitor.MetricDefinition)
Expand All @@ -192,8 +322,9 @@ func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricD
for _, metricName := range metricGroup {
metricNames = append(metricNames, *metricName.Name.Value)
}
clientMetrics = append(clientMetrics, client.CreateMetric(resourceId, subResourceId, namespace, metricNames, key, dim, timegrain))
clientMetrics = append(clientMetrics, client.CreateMetric(resourceId, subResourceId, namespace, metricNames, key, dim, timeGrain))
}

return clientMetrics
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/azure/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const DefaultTimeGrain = "PT5M"
var instanceIdRegex = regexp.MustCompile(`.*?(\d+)$`)

// mapMetricValues should map the metric values
func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue, startTime time.Time, endTime time.Time) []MetricValue {
func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue) []MetricValue {
var currentMetrics []MetricValue
// compare with the previously returned values and filter out any double records
for _, v := range metrics {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/metricbeat/module/azure/monitor/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE
if err != nil {
return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", *resource.ID, metric.Namespace, err)
}

if len(metricDefinitions.Value) == 0 {
if metric.IgnoreUnsupported {
client.Log.Infof(missingNamespace, *resource.ID, metric.Namespace)
Expand Down Expand Up @@ -63,6 +64,7 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE
}
}
}

return metrics, nil
}

Expand Down

0 comments on commit 432999d

Please sign in to comment.