Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auditbeat] - Migration of system/package module to flatbuffers #34817

Merged
merged 19 commits into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
dcf6e76
initial working commit
ShourieG Mar 10, 2023
870305f
flatbuffers migration now working with tests
ShourieG Mar 13, 2023
cc8592b
updated changelog
ShourieG Mar 13, 2023
c38ae22
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 13, 2023
1cffa01
removed comments
ShourieG Mar 13, 2023
aaf3fc2
resolved PR suggestions and updated license
ShourieG Mar 14, 2023
5d0f6e4
updated notice
ShourieG Mar 14, 2023
4ac7ef8
updated tests, added benchmarks, renamed files
ShourieG Mar 15, 2023
2d55de8
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 20, 2023
7525656
updated tests & added error storage in schema
ShourieG Mar 20, 2023
7166fb3
license issue fixed
ShourieG Mar 20, 2023
4d7bee9
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 23, 2023
d6f553a
updated according to PR suggestions
ShourieG Mar 23, 2023
2aa8112
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 28, 2023
892e37e
refactored package migration based on bucket version and is now done …
ShourieG Mar 29, 2023
15e1fd1
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 29, 2023
f8369d9
Merge remote-tracking branch 'elastic/main' into auditbeat/flat_buffers
andrewkroh Mar 31, 2023
5096686
Perform migration inside of a bolt transaction, fix linter warnings
andrewkroh Mar 31, 2023
c7feb02
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Apr 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -239,7 +239,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add metrics documentation for CEL and AWS CloudWatch inputs. {issue}34887[34887] {pull}34889[34889]

*Auditbeat*

- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

Expand Down
193 changes: 193 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuff.go
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
@@ -0,0 +1,193 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows
// +build !windows

package pkg

import (
"errors"
"fmt"
"sync"
"time"

flatbuffers "github.com/google/flatbuffers/go"

"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/package/schema"
)

// Requires the Google flatbuffer compiler and Elastic go-licenser.
//go:generate flatc --go schema.fbs
//go:generate go-licenser schema

// bufferPool recycles flatbuffer builders between encode calls. Each newly
// created builder starts with a 1 KiB scratch buffer.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return flatbuffers.NewBuilder(1024)
	},
}

// fbGetBuilder returns a reset Builder taken from the pool, together with a
// put function that returns it. Invoke put only once the encoded bytes are no
// longer needed (i.e. after a copy of b.FinishedBytes() has been made).
func fbGetBuilder() (b *flatbuffers.Builder, put func()) {
	builder := bufferPool.Get().(*flatbuffers.Builder)
	builder.Reset()
	return builder, func() { bufferPool.Put(builder) }
}

// encodePackages serializes a slice of packages into a single flatbuffer
// PackageContainer, encoding each entry with fbEncodePackage, and returns the
// finished bytes (which point into the builder's buffer). Note that entries
// are prepended into the vector, so they are stored in reverse order.
func encodePackages(builder *flatbuffers.Builder, packages []*Package) []byte {
	n := len(packages)
	offs := make([]flatbuffers.UOffsetT, n)
	for i := range packages {
		offs[i] = fbEncodePackage(builder, packages[i])
	}

	schema.PackageContainerStartPackagesVector(builder, n)
	for i := range offs {
		builder.PrependUOffsetT(offs[i])
	}
	vec := builder.EndVector(n)

	schema.PackageContainerStart(builder)
	schema.PackageContainerAddPackages(builder, vec)
	builder.Finish(schema.PackageContainerEnd(builder))
	return builder.FinishedBytes()
}

// fbEncodePackage encodes the given Package to a flatbuffer table and returns
// its offset within the Builder's memory. A nil package yields a zero offset.
func fbEncodePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
	if p != nil {
		return fbWritePackage(b, p)
	}
	return 0
}

// fbWritePackage writes a single Package into the flatbuffer being built by b
// and returns the offset of the finished table. A nil package yields a zero
// offset.
func fbWritePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
	if p == nil {
		return 0
	}

	// Flatbuffer strings must be created before the table is started, so
	// build every non-empty string first. A zero offset marks an absent
	// field.
	str := func(s string) flatbuffers.UOffsetT {
		if s == "" {
			return 0
		}
		return b.CreateString(s)
	}

	nameOff := str(p.Name)
	versionOff := str(p.Version)
	releaseOff := str(p.Release)
	archOff := str(p.Arch)
	licenseOff := str(p.License)
	summaryOff := str(p.Summary)
	urlOff := str(p.URL)
	typeOff := str(p.Type)
	var errOff flatbuffers.UOffsetT
	if p.error != nil {
		errOff = b.CreateString(p.error.Error())
	}

	schema.PackageStart(b)
	schema.PackageAddInstalltime(b, uint64(p.InstallTime.Unix()))
	schema.PackageAddSize(b, p.Size)

	// Attach only the fields whose strings were actually created.
	add := func(fn func(*flatbuffers.Builder, flatbuffers.UOffsetT), off flatbuffers.UOffsetT) {
		if off > 0 {
			fn(b, off)
		}
	}
	add(schema.PackageAddName, nameOff)
	add(schema.PackageAddVersion, versionOff)
	add(schema.PackageAddRelease, releaseOff)
	add(schema.PackageAddArch, archOff)
	add(schema.PackageAddLicense, licenseOff)
	add(schema.PackageAddSummary, summaryOff)
	add(schema.PackageAddUrl, urlOff)
	add(schema.PackageAddType, typeOff)
	add(schema.PackageAddError, errOff)

	return schema.PackageEnd(b)
}

// decodePackagesFromContainer decodes a flatbuffer-encoded byte slice holding
// a PackageContainer, decoding each entry of the container vector with
// fbDecodePackage. It returns the decoded packages, or an error if an entry
// cannot be loaded.
func decodePackagesFromContainer(data []byte) ([]*Package, error) {
	container := schema.GetRootAsPackageContainer(data, 0)
	n := container.PackagesLength()
	if n == 0 {
		// Preserve the nil return for an empty container.
		return nil, nil
	}
	packages := make([]*Package, 0, n)
	for i := 0; i < n; i++ {
		var sPkg schema.Package
		if !container.Packages(&sPkg, i) {
			return nil, fmt.Errorf("failed to load package at container vector position: %d", i)
		}
		packages = append(packages, fbDecodePackage(&sPkg))
	}
	return packages, nil
}

// fbDecodePackage decodes flatbuffer package data and copies it into a Package
// object that is returned.
func fbDecodePackage(p *schema.Package) *Package {
var err error
if string(p.Error()) != "" {
err = errors.New(string(p.Error()))
}

return &Package{
Name: string(p.Name()),
Version: string(p.Version()),
Release: string(p.Release()),
Arch: string(p.Arch()),
License: string(p.License()),
InstallTime: time.Unix(int64(p.Installtime()), 0).UTC(),
Size: p.Size(),
Summary: string(p.Summary()),
URL: string(p.Url()),
Type: string(p.Type()),
error: err,
}

ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}
148 changes: 148 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuff_test.go
@@ -0,0 +1,148 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows
// +build !windows

package pkg

import (
"errors"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// testPackage returns a fixed fixture set for the encode/decode tests: two
// fully populated packages plus one mostly zero-valued entry carrying an
// error.
func testPackage() []*Package {
	installed := time.Unix(1591021924, 0).UTC()
	return []*Package{
		{
			Name:        "foo",
			Version:     "1.2.3",
			Release:     "1",
			Arch:        "amd64",
			License:     "bar",
			InstallTime: installed,
			Size:        1234,
			Summary:     "Foo stuff",
			URL:         "http://foo.example.com",
			Type:        "rpm",
		},
		{
			Name:        "csv",
			Version:     "1.5.7",
			Release:     "2",
			Arch:        "amd64",
			License:     "bar",
			InstallTime: installed,
			Size:        2456,
			Summary:     "Csv stuff",
			URL:         "http://csv.example.com",
			Type:        "rpm",
		},
		{
			// Remaining fields are deliberately left at their zero values.
			Name:    "vscode",
			Version: "1.5.7",
			Release: "2",
			error:   errors.New("error unmarshalling JSON in /homebrew/Cellar: invalid JSON"),
		},
	}
}

// TestFBEncodeDecode round-trips the test fixtures through the flatbuffer
// encoder and decoder and verifies the result matches the input.
func TestFBEncodeDecode(t *testing.T) {
	p := testPackage()
	builder, release := fbGetBuilder()
	defer release()
	data := encodePackages(builder, p)
	t.Log("encoded length:", len(data))

	out, err := decodePackagesFromContainer(data)
	if err != nil {
		// Nothing meaningful can be compared if decoding failed; the
		// comparisons below would dereference a nil slice misleadingly.
		t.Fatal(err)
	}

	// The container vector is built by prepending, so decoding yields the
	// packages in reverse order; restore the input order before comparing.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}

	assert.Equal(t, len(p), len(out))
	for i := range p {
		assert.Equal(t, p[i], out[i])
	}
}

// TestPoolPoison runs concurrent encode/decode round trips over the shared
// bufferPool to detect builder state leaking between goroutines.
func TestPoolPoison(t *testing.T) {
	pkgs := testPackage()
	inputs := [][]*Package{pkgs[:2], pkgs[2:]}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go poolPoison(inputs[i%2], t, &wg)
	}
	wg.Wait()
}

// poolPoison repeatedly encodes and decodes the given packages using pooled
// builders, verifying each round trip, to surface cross-goroutine reuse of
// dirty builder state. It must be run via `go` with wg incremented by the
// caller.
func poolPoison(p []*Package, t *testing.T, wg *sync.WaitGroup) {
	defer wg.Done()
	for k := 0; k < 1000; k++ {
		// Each round trip runs in its own closure so the builder is returned
		// to the pool at the end of the iteration. The original deferred
		// release inside the loop accumulated 1000 pending defers and never
		// recycled a builder until the function returned, defeating the
		// purpose of a pool-poisoning test.
		func() {
			builder, release := fbGetBuilder()
			defer release()
			data := encodePackages(builder, p)

			out, err := decodePackagesFromContainer(data)
			if err != nil {
				// t.Fatal must not be called from a non-test goroutine, so
				// record the error and skip the comparisons for this round.
				t.Error(err)
				return
			}

			// Decoded slice is in reversed order; restore the input order.
			for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
				out[i], out[j] = out[j], out[i]
			}

			assert.Equal(t, len(p), len(out))
			for i := range p {
				assert.Equal(t, p[i], out[i])
			}
		}()
	}
}

// BenchmarkFBEncodePackages measures flatbuffer encoding throughput over the
// fixture set, reusing a single pooled builder.
func BenchmarkFBEncodePackages(b *testing.B) {
	builder, release := fbGetBuilder()
	defer release()
	pkgs := testPackage()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// The builder must be reset each round since encodePackages
		// finishes it.
		builder.Reset()
		encodePackages(builder, pkgs)
	}
}

// BenchmarkFBDecodePackages measures flatbuffer decoding throughput against a
// container encoded once up front.
func BenchmarkFBDecodePackages(b *testing.B) {
	builder, release := fbGetBuilder()
	defer release()
	data := encodePackages(builder, testPackage())
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		packages, err := decodePackagesFromContainer(data)
		if err != nil || len(packages) == 0 {
			b.Fatal("failed to decode")
		}
	}
}