Skip to content

Commit

Permalink
inlet/core: fix caching when setting interface name or description
Browse files Browse the repository at this point in the history
Also, move writing names and descriptions to flow schema in
interface/exporter classification.

Fix #586
  • Loading branch information
vincentbernat committed Mar 22, 2023
1 parent a8222e7 commit 59c0e25
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
4 changes: 4 additions & 0 deletions console/data/docs/99-changelog.md
Expand Up @@ -11,6 +11,10 @@ identified with a specific icon:
- 🩹: bug fix
- 🌱: miscellaneous change

## Unreleased

- 🩹 *inlet*: fix caching when setting interface name or description

## 1.8.1 - 2023-03-04

- 🩹 *console*: fix subnet aggregation when IPv4 or IPv6 is set to its default value
Expand Down
42 changes: 22 additions & 20 deletions inlet/core/enricher.go
Expand Up @@ -85,10 +85,10 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *
// Classification
if !c.classifyExporter(t, exporterStr, flowExporterName, flow) ||
!c.classifyInterface(t, exporterStr, flowExporterName, flow,
flowOutIfIndex, &flowOutIfName, &flowOutIfDescription, flowOutIfSpeed, flowOutIfVlan,
flowOutIfIndex, flowOutIfName, flowOutIfDescription, flowOutIfSpeed, flowOutIfVlan,
false) ||
!c.classifyInterface(t, exporterStr, flowExporterName, flow,
flowInIfIndex, &flowInIfName, &flowInIfDescription, flowInIfSpeed, flowInIfVlan,
flowInIfIndex, flowInIfName, flowInIfDescription, flowInIfSpeed, flowInIfVlan,
true) {
// Flow is rejected
return true
Expand Down Expand Up @@ -118,10 +118,6 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *
}

c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnExporterName, []byte(flowExporterName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfName, []byte(flowInIfName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfDescription, []byte(flowInIfDescription))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfName, []byte(flowOutIfName))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfDescription, []byte(flowOutIfDescription))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnInIfSpeed, uint64(flowInIfSpeed))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnOutIfSpeed, uint64(flowOutIfSpeed))

Expand Down Expand Up @@ -186,8 +182,7 @@ func (c *Component) classifyExporter(t time.Time, ip string, name string, flow *
Str("exporter", name).
Msg("error executing classifier")
c.metrics.classifierErrors.WithLabelValues("exporter", strconv.Itoa(idx)).Inc()
c.classifierExporterCache.Put(t, si, classification)
return true // on error, we don't drop the flow
break
}
if classification.Group == "" || classification.Role == "" || classification.Site == "" || classification.Region == "" || classification.Tenant == "" {
continue
Expand All @@ -203,26 +198,34 @@ func (c *Component) writeInterface(flow *schema.FlowMessage, classification inte
return false
}
if directionIn {
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfName, []byte(classification.Name))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfDescription, []byte(classification.Description))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfConnectivity, []byte(classification.Connectivity))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnInIfProvider, []byte(classification.Provider))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnInIfBoundary, uint64(classification.Boundary))
} else {
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfName, []byte(classification.Name))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfDescription, []byte(classification.Description))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfConnectivity, []byte(classification.Connectivity))
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnOutIfProvider, []byte(classification.Provider))
c.d.Schema.ProtobufAppendVarint(flow, schema.ColumnOutIfBoundary, uint64(classification.Boundary))
}
return true
}

func (c *Component) classifyInterface(t time.Time, ip string, exporterName string, fl *schema.FlowMessage, ifIndex uint32, ifName, ifDescription *string, ifSpeed uint32, ifVlan uint16, directionIn bool) bool {
func (c *Component) classifyInterface(t time.Time, ip string, exporterName string, fl *schema.FlowMessage, ifIndex uint32, ifName, ifDescription string, ifSpeed uint32, ifVlan uint16, directionIn bool) bool {
if len(c.config.InterfaceClassifiers) == 0 {
c.writeInterface(fl, interfaceClassification{
Name: ifName,
Description: ifDescription,
}, directionIn)
return true
}
si := exporterInfo{IP: ip, Name: exporterName}
ii := interfaceInfo{
Index: ifIndex,
Name: *ifName,
Description: *ifDescription,
Name: ifName,
Description: ifDescription,
Speed: ifSpeed,
VLAN: ifVlan,
}
Expand All @@ -242,17 +245,10 @@ func (c *Component) classifyInterface(t time.Time, ip string, exporterName strin
Str("type", "interface").
Int("index", idx).
Str("exporter", exporterName).
Str("interface", *ifName).
Str("interface", ifName).
Msg("error executing classifier")
c.metrics.classifierErrors.WithLabelValues("interface", strconv.Itoa(idx)).Inc()
c.classifierInterfaceCache.Put(t, key, classification)
return true // on error, we don't drop the flow
}
if classification.Name != "" {
*ifName = classification.Name
}
if classification.Description != "" {
*ifDescription = classification.Description
break
}
if classification.Connectivity == "" || classification.Provider == "" {
continue
Expand All @@ -262,6 +258,12 @@ func (c *Component) classifyInterface(t time.Time, ip string, exporterName strin
}
break
}
if classification.Name == "" {
classification.Name = ifName
}
if classification.Description == "" {
classification.Description = ifDescription
}
c.classifierInterfaceCache.Put(t, key, classification)
return c.writeInterface(fl, classification, directionIn)
}
Expand Down

0 comments on commit 59c0e25

Please sign in to comment.