Skip to content

Commit

Permalink
Merge pull request #676 from exaring/feature/netmask-preparation
Browse files Browse the repository at this point in the history
feat: prepare data structure to fetch prefix len/net mask from BMP
  • Loading branch information
vincentbernat committed May 27, 2023
2 parents a5a9752 + d15cdf2 commit 694a40a
Show file tree
Hide file tree
Showing 16 changed files with 212 additions and 53 deletions.
2 changes: 2 additions & 0 deletions common/schema/protobuf.go
Expand Up @@ -102,6 +102,8 @@ func (schema *Schema) ProtobufMarshal(bf *FlowMessage) []byte {
schema.ProtobufAppendIP(bf, ColumnExporterAddress, bf.ExporterAddress)
schema.ProtobufAppendVarint(bf, ColumnSrcAS, uint64(bf.SrcAS))
schema.ProtobufAppendVarint(bf, ColumnDstAS, uint64(bf.DstAS))
schema.ProtobufAppendVarint(bf, ColumnSrcNetMask, uint64(bf.SrcNetMask))
schema.ProtobufAppendVarint(bf, ColumnDstNetMask, uint64(bf.DstNetMask))
schema.ProtobufAppendIP(bf, ColumnSrcAddr, bf.SrcAddr)
schema.ProtobufAppendIP(bf, ColumnDstAddr, bf.DstAddr)
if !schema.IsDisabled(ColumnGroupL2) {
Expand Down
3 changes: 3 additions & 0 deletions common/schema/types.go
Expand Up @@ -91,6 +91,9 @@ type FlowMessage struct {
DstAS uint32
GotASPath bool

SrcNetMask uint8
DstNetMask uint8

// protobuf is the protobuf representation for the information not contained above.
protobuf []byte
protobufSet bitset.BitSet
Expand Down
6 changes: 6 additions & 0 deletions console/data/docs/02-configuration.md
Expand Up @@ -199,6 +199,12 @@ The following configuration keys are accepted:
from flow except if the ASN is private), `geoip`, `bmp`, and
`bmp-except-private`. The default value is `flow`, `bmp`, and
`geoip`.
- `net-providers` defines the sources for NetMasks/Prefixes.
`flow` uses the value provided by the flow message (if any),
while `bmp` looks it up using the BMP component (currently work in progress).
If multiple sources are provided, the value of the first source
providing a non-default route is taken.
The default value is `flow` and `bmp`.

Classifier rules are written using [expr][].

Expand Down
12 changes: 6 additions & 6 deletions demoexporter/flows/nfdata_test.go
Expand Up @@ -113,6 +113,8 @@ func TestGetNetflowData(t *testing.T) {
OutIf: 20,
SrcAS: 65201,
DstAS: 65202,
SrcNetMask: 24,
DstNetMask: 23,
ProtobufDebug: map[schema.ColumnKey]interface{}{
schema.ColumnBytes: 1500,
schema.ColumnPackets: 1,
Expand All @@ -121,8 +123,6 @@ func TestGetNetflowData(t *testing.T) {
schema.ColumnSrcPort: 443,
schema.ColumnDstPort: 34974,
schema.ColumnForwardingStatus: 64,
schema.ColumnSrcNetMask: 24,
schema.ColumnDstNetMask: 23,
},
},
&schema.FlowMessage{
Expand All @@ -134,6 +134,8 @@ func TestGetNetflowData(t *testing.T) {
OutIf: 20,
SrcAS: 65201,
DstAS: 65202,
SrcNetMask: 24,
DstNetMask: 24,
ProtobufDebug: map[schema.ColumnKey]interface{}{
schema.ColumnBytes: 1339,
schema.ColumnPackets: 1,
Expand All @@ -142,8 +144,6 @@ func TestGetNetflowData(t *testing.T) {
schema.ColumnSrcPort: 443,
schema.ColumnDstPort: 33199,
schema.ColumnForwardingStatus: 64,
schema.ColumnSrcNetMask: 24,
schema.ColumnDstNetMask: 24,
},
},
},
Expand All @@ -157,6 +157,8 @@ func TestGetNetflowData(t *testing.T) {
OutIf: 10,
SrcAS: 65201,
DstAS: 65202,
SrcNetMask: 48,
DstNetMask: 48,
ProtobufDebug: map[schema.ColumnKey]interface{}{
schema.ColumnBytes: 1300,
schema.ColumnPackets: 1,
Expand All @@ -165,8 +167,6 @@ func TestGetNetflowData(t *testing.T) {
schema.ColumnSrcPort: 33179,
schema.ColumnDstPort: 443,
schema.ColumnForwardingStatus: 64,
schema.ColumnSrcNetMask: 48,
schema.ColumnDstNetMask: 48,
},
},
},
Expand Down
2 changes: 2 additions & 0 deletions inlet/bmp/events.go
Expand Up @@ -300,6 +300,7 @@ func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonito
plen += 96
}
p, _ := netip.AddrFromSlice(prefix)
rta.plen = uint8(plen)
added += c.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{
Expand Down Expand Up @@ -393,6 +394,7 @@ func (c *Component) handleRouteMonitoring(pkey peerKey, body *bmp.BMPRouteMonito
}
switch attr.(type) {
case *bgp.PathAttributeMpReachNLRI:
rta.plen = uint8(plen)
added += c.rib.addPrefix(p, plen, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{
Expand Down
7 changes: 7 additions & 0 deletions inlet/bmp/lookup.go
Expand Up @@ -16,6 +16,7 @@ type LookupResult struct {
ASPath []uint32
Communities []uint32
LargeCommunities []bgp.LargeCommunity
NetMask uint8
}

// Lookup lookups a route for the provided IP address. It favors the
Expand Down Expand Up @@ -55,10 +56,16 @@ func (c *Component) Lookup(ip netip.Addr, nh netip.Addr) LookupResult {
return LookupResult{}
}
attributes := c.rib.rtas.Get(routes[len(routes)-1].attributes)
// prefix len is v6 coded in the bmp rib. We need to substract 96 if it's a v4 prefix
plen := attributes.plen
if ip.Is4() || ip.Is4In6() {
plen = plen - 96
}
return LookupResult{
ASN: attributes.asn,
ASPath: attributes.asPath,
Communities: attributes.communities,
LargeCommunities: attributes.largeCommunities,
NetMask: plen,
}
}
1 change: 1 addition & 0 deletions inlet/bmp/rib.go
Expand Up @@ -77,6 +77,7 @@ type routeAttributes struct {
asn uint32
asPath []uint32
communities []uint32
plen uint8
// extendedCommunities []uint64
largeCommunities []bgp.LargeCommunity
}
Expand Down
6 changes: 5 additions & 1 deletion inlet/bmp/tests.go
Expand Up @@ -52,6 +52,7 @@ func (c *Component) PopulateRIB(t *testing.T) {
asPath: []uint32{64200, 1299, 174},
communities: []uint32{100, 200, 400},
largeCommunities: []bgp.LargeCommunity{{ASN: 64200, LocalData1: 2, LocalData2: 3}},
plen: 96 + 27,
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.0"), 96+27, route{
Expand All @@ -62,6 +63,7 @@ func (c *Component) PopulateRIB(t *testing.T) {
asn: 174,
asPath: []uint32{64200, 174, 174, 174},
communities: []uint32{100},
plen: 96 + 27,
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:192.0.2.128"), 96+27, route{
Expand All @@ -72,14 +74,16 @@ func (c *Component) PopulateRIB(t *testing.T) {
asn: 1299,
asPath: []uint32{64200, 1299},
communities: []uint32{500},
plen: 96 + 27,
}),
})
c.rib.addPrefix(netip.MustParseAddr("::ffff:1.0.0.0"), 96+24, route{
peer: pinfo.reference,
nlri: c.rib.nlris.Put(nlri{family: bgp.RF_FS_IPv4_UC}),
nextHop: c.rib.nextHops.Put(nextHop(netip.MustParseAddr("::ffff:198.51.100.8"))),
attributes: c.rib.rtas.Put(routeAttributes{
asn: 65300,
asn: 65300,
plen: 96 + 24,
}),
})
}
Expand Down
49 changes: 46 additions & 3 deletions inlet/core/config.go
Expand Up @@ -31,7 +31,8 @@ type Configuration struct {
OverrideSamplingRate helpers.SubnetMap[uint]
// ASNProviders defines the source used to get AS numbers
ASNProviders []ASNProvider `validate:"dive"`

// NetProviders defines the source used to get Prefix/Network Information
NetProviders []NetProvider `validate:"dive"`
// Old configuration settings
classifierCacheSize uint
}
Expand All @@ -44,11 +45,16 @@ func DefaultConfiguration() Configuration {
InterfaceClassifiers: []InterfaceClassifierRule{},
ClassifierCacheDuration: 5 * time.Minute,
ASNProviders: []ASNProvider{ASNProviderFlow, ASNProviderBMP, ASNProviderGeoIP},
NetProviders: []NetProvider{NetProviderFlow, NetProviderBMP},
}
}

// ASNProvider describes one AS number provider.
type ASNProvider int
type (
// ASNProvider describes one AS number provider.
ASNProvider int
// NetProvider describes one network mask provider.
NetProvider int
)

const (
// ASNProviderFlow uses the AS number embedded in flows.
Expand Down Expand Up @@ -96,6 +102,43 @@ func (ap *ASNProvider) UnmarshalText(input []byte) error {
return errors.New("unknown provider")
}

const (
// NetProviderFlow uses the network mask embedded in flows, if any
NetProviderFlow NetProvider = iota
// NetProviderBMP uses looks the netmask up with BMP
NetProviderBMP
)

var netProviderMap = bimap.New(map[NetProvider]string{
NetProviderFlow: "flow",
NetProviderBMP: "bmp",
})

// MarshalText turns an AS provider to text.
func (np NetProvider) MarshalText() ([]byte, error) {
got, ok := netProviderMap.LoadValue(np)
if ok {
return []byte(got), nil
}
return nil, errors.New("unknown field")
}

// String turns an AS provider to string.
func (np NetProvider) String() string {
got, _ := netProviderMap.LoadValue(np)
return got
}

// UnmarshalText provides an AS provider from a string.
func (np *NetProvider) UnmarshalText(input []byte) error {
got, ok := netProviderMap.LoadKey(string(input))
if ok {
*np = got
return nil
}
return errors.New("unknown provider")
}

// ConfigurationUnmarshallerHook normalize core configuration:
// - replace ignore-asn-from-flow by asn-providers
func ConfigurationUnmarshallerHook() mapstructure.DecodeHookFunc {
Expand Down
11 changes: 11 additions & 0 deletions inlet/core/config_test.go
Expand Up @@ -70,6 +70,17 @@ func TestConfigurationUnmarshallerHook(t *testing.T) {
Expected: Configuration{
ASNProviders: []ASNProvider{ASNProviderFlowExceptPrivate, ASNProviderGeoIP, ASNProviderFlow},
},
}, {
Description: "net-providers",
Initial: func() interface{} { return Configuration{} },
Configuration: func() interface{} {
return gin.H{
"net-providers": []string{"flow", "bmp"},
}
},
Expected: Configuration{
NetProviders: []NetProvider{NetProviderFlow, NetProviderBMP},
},
},
})
}
21 changes: 21 additions & 0 deletions inlet/core/enricher.go
Expand Up @@ -96,6 +96,11 @@ func (c *Component) enrichFlow(exporterIP netip.Addr, exporterStr string, flow *

sourceBMP := c.d.BMP.Lookup(flow.SrcAddr, netip.Addr{})
destBMP := c.d.BMP.Lookup(flow.DstAddr, flow.NextHop)
// set prefix len according to user config
flow.SrcNetMask = c.getNetMask(flow.SrcNetMask, sourceBMP.NetMask)
flow.DstNetMask = c.getNetMask(flow.DstNetMask, destBMP.NetMask)

// set asns according to user config
flow.SrcAS = c.getASNumber(flow.SrcAddr, flow.SrcAS, sourceBMP.ASN)
flow.DstAS = c.getASNumber(flow.DstAddr, flow.DstAS, destBMP.ASN)
c.d.Schema.ProtobufAppendBytes(flow, schema.ColumnSrcCountry, []byte(c.d.GeoIP.LookupCountry(flow.SrcAddr)))
Expand Down Expand Up @@ -152,6 +157,22 @@ func (c *Component) getASNumber(flowAddr netip.Addr, flowAS, bmpAS uint32) (asn
return asn
}

// getPfxLen retrieves the prefix length for a flow, depending on user preferences.
func (c *Component) getNetMask(flowMask, bmpMask uint8) (mask uint8) {
for _, provider := range c.config.NetProviders {
if mask != 0 {
break
}
switch provider {
case NetProviderFlow:
mask = flowMask
case NetProviderBMP:
mask = bmpMask
}
}
return mask
}

func (c *Component) writeExporter(flow *schema.FlowMessage, classification exporterClassification) bool {
if classification.Reject {
return false
Expand Down
60 changes: 60 additions & 0 deletions inlet/core/enricher_test.go
Expand Up @@ -485,6 +485,8 @@ ClassifyProviderRegex(Interface.Description, "^Transit: ([^ ]+)", "$1")`,
schema.ColumnDstLargeCommunitiesASN: []int32{64200},
schema.ColumnDstLargeCommunitiesLocalData1: []int32{2},
schema.ColumnDstLargeCommunitiesLocalData2: []int32{3},
schema.ColumnSrcNetMask: 27,
schema.ColumnDstNetMask: 27,
},
},
},
Expand Down Expand Up @@ -633,3 +635,61 @@ func TestGetASNumber(t *testing.T) {
})
}
}

func TestGetNetMask(t *testing.T) {
cases := []struct {
// Addr string
FlowNetMask uint8
BMPNetMask uint8
Providers []NetProvider
Expected uint8
}{
// Flow
{0, 0, []NetProvider{NetProviderFlow}, 0},
{32, 0, []NetProvider{NetProviderFlow}, 32},
{0, 16, []NetProvider{NetProviderFlow}, 0},
// BMP
{0, 0, []NetProvider{NetProviderBMP}, 0},
{32, 12, []NetProvider{NetProviderBMP}, 12},
{0, 16, []NetProvider{NetProviderBMP}, 16},
{24, 0, []NetProvider{NetProviderBMP}, 0},
// Both, the first provider with a non-default route is taken
{0, 0, []NetProvider{NetProviderBMP, NetProviderFlow}, 0},
{12, 0, []NetProvider{NetProviderBMP, NetProviderFlow}, 12},
{0, 13, []NetProvider{NetProviderBMP, NetProviderFlow}, 13},
{12, 0, []NetProvider{NetProviderBMP, NetProviderFlow}, 12},
{12, 24, []NetProvider{NetProviderBMP, NetProviderFlow}, 24},

{0, 0, []NetProvider{NetProviderFlow, NetProviderBMP}, 0},
{12, 0, []NetProvider{NetProviderFlow, NetProviderBMP}, 12},
{0, 13, []NetProvider{NetProviderFlow, NetProviderBMP}, 13},
{12, 0, []NetProvider{NetProviderFlow, NetProviderBMP}, 12},
{12, 24, []NetProvider{NetProviderFlow, NetProviderBMP}, 12},
}
for i, tc := range cases {
i++
t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) {
r := reporter.NewMock(t)

// We don't need all components as we won't start the component.
configuration := DefaultConfiguration()
configuration.NetProviders = tc.Providers
bmpComponent, _ := bmp.NewMock(t, r, bmp.DefaultConfiguration())
bmpComponent.PopulateRIB(t)

c, err := New(r, configuration, Dependencies{
Daemon: daemon.NewMock(t),
GeoIP: geoip.NewMock(t, r),
BMP: bmpComponent,
Schema: schema.NewMock(t),
})
if err != nil {
t.Fatalf("New() error:\n%+v", err)
}
got := c.getNetMask(tc.FlowNetMask, uint8(tc.BMPNetMask))
if diff := helpers.Diff(got, tc.Expected); diff != "" {
t.Fatalf("getNetMask() (-got, +want):\n%s", diff)
}
})
}
}
4 changes: 2 additions & 2 deletions inlet/flow/decoder/netflow/decode.go
Expand Up @@ -88,9 +88,9 @@ func (nd *Decoder) decodeRecord(fields []netflow.DataField) *schema.FlowMessage
etype = helpers.ETypeIPv6
bf.DstAddr = decodeIP(v)
case netflow.NFV9_FIELD_SRC_MASK, netflow.NFV9_FIELD_IPV6_SRC_MASK:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcNetMask, decodeUNumber(v))
bf.SrcNetMask = uint8(decodeUNumber(v))
case netflow.NFV9_FIELD_DST_MASK, netflow.NFV9_FIELD_IPV6_DST_MASK:
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstNetMask, decodeUNumber(v))
bf.DstNetMask = uint8(decodeUNumber(v))
case netflow.NFV9_FIELD_IPV4_NEXT_HOP, netflow.NFV9_FIELD_BGP_IPV4_NEXT_HOP, netflow.NFV9_FIELD_IPV6_NEXT_HOP, netflow.NFV9_FIELD_BGP_IPV6_NEXT_HOP:
bf.NextHop = decodeIP(v)

Expand Down

0 comments on commit 694a40a

Please sign in to comment.