Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auditbeat] - Migration of system/package module to flatbuffers #34817

Merged
merged 19 commits into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
dcf6e76
initial working commit
ShourieG Mar 10, 2023
870305f
flatbuffers migration now working with tests
ShourieG Mar 13, 2023
cc8592b
updated changelog
ShourieG Mar 13, 2023
c38ae22
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 13, 2023
1cffa01
removed comments
ShourieG Mar 13, 2023
aaf3fc2
resolved PR suggetions and updated license
ShourieG Mar 14, 2023
5d0f6e4
updated notice
ShourieG Mar 14, 2023
4ac7ef8
updated tests, added benchmarks, renamed files
ShourieG Mar 15, 2023
2d55de8
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 20, 2023
7525656
updated tests & added error storage in schema
ShourieG Mar 20, 2023
7166fb3
license issue fixed
ShourieG Mar 20, 2023
4d7bee9
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 23, 2023
d6f553a
updated according to PR suggestions
ShourieG Mar 23, 2023
2aa8112
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 28, 2023
892e37e
refactored package migration based on bucket version and is now done …
ShourieG Mar 29, 2023
15e1fd1
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 29, 2023
f8369d9
Merge remote-tracking branch 'elastic/main' into auditbeat/flat_buffers
andrewkroh Mar 31, 2023
5096686
Perform migration inside of a bolt transaction, fix linter warnings
andrewkroh Mar 31, 2023
c7feb02
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Apr 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -245,7 +245,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Mention `mito` CEL tool in CEL input docs. {pull}34959[34959]

*Auditbeat*

- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

Expand Down
93 changes: 93 additions & 0 deletions auditbeat/datastore/datastore.go
Expand Up @@ -46,10 +46,38 @@ func OpenBucket(name string) (Bucket, error) {
return ds.OpenBucket(name)
}

// BucketExists returns a new Bucket if it already exists but does not create a new one.
// The returned Bucket must be closed when finished to ensure all resources
// are released.
func BucketExists(name string) (Bucket, error) {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
mode: 0o600,
}
})

return ds.BucketExists(name)
}

// DeleteBucket deletes an existing bucket
func DeleteBucket(name string) error {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
mode: 0o600,
}
})

return ds.DeleteBucket(name)
}

// Datastore

type Datastore interface {
OpenBucket(name string) (Bucket, error)
BucketExists(name string) (Bucket, error)
DeleteBucket(name string) error
}

type boltDatastore struct {
Expand Down Expand Up @@ -89,6 +117,71 @@ func (ds *boltDatastore) OpenBucket(bucket string) (Bucket, error) {
return &boltBucket{ds, bucket}, nil
}

// BucketExists, checks if a bucket exists in the given database
// and if true returns a handle to it.
func (ds *boltDatastore) BucketExists(bucket string) (Bucket, error) {
ds.mutex.Lock()
defer ds.mutex.Unlock()

// Initialize the Bolt DB.
if ds.db == nil {
var err error
ds.db, err = bolt.Open(ds.path, ds.mode, nil)
if err != nil {
return nil, err
}
}

var bucketExists bool
err := ds.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucket))
if bucket != nil {
bucketExists = true
}

return nil
})

if err != nil {
return nil, err
}

if !bucketExists {
return nil, nil
}

return &boltBucket{ds, bucket}, nil
}

// DeleteBucket, deletes the given bucket
func (ds *boltDatastore) DeleteBucket(name string) error {
ds.mutex.Lock()
defer ds.mutex.Unlock()

// Initialize the Bolt DB.
if ds.db == nil {
var err error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is repeated a lot. How about adding a boltDatastore.init() error method.

ds.db, err = bolt.Open(ds.path, ds.mode, nil)
if err != nil {
return err
}
}

err := ds.db.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket([]byte(name))
if err != nil {
return err
}
return nil
})

if err != nil {
return err
}

return nil
}

func (ds *boltDatastore) done() {
ds.mutex.Lock()
defer ds.mutex.Unlock()
Expand Down
193 changes: 193 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuffers.go
@@ -0,0 +1,193 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows
// +build !windows

package pkg

import (
"errors"
"fmt"
"sync"
"time"

flatbuffers "github.com/google/flatbuffers/go"

"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/package/schema"
)

// Requires the Google flatbuffer compiler and Elastic go-licenser.
//go:generate flatc --go schema.fbs
//go:generate go-licenser schema
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

var bufferPool sync.Pool

func init() {
bufferPool.New = func() interface{} {
return flatbuffers.NewBuilder(1024)
}
}

// fbGetBuilder returns a Builder that can be used for encoding data. The builder
// should be put back into the pool by invoking the put function after the encoded bytes
// are no longer in used (i.e. a copy of b.FinishedBytes() has been made).
func fbGetBuilder() (b *flatbuffers.Builder, put func()) {
b = bufferPool.Get().(*flatbuffers.Builder)
b.Reset()
return b, func() { bufferPool.Put(b) }
}

// encodePackages, encodes an array of packages by creating a vector of packages and tracking offsets. It uses the
// func fbEncodePackage to encode individual packages, and returns a []byte containing the encoded data
func encodePackages(builder *flatbuffers.Builder, packages []*Package) []byte {
offsets := make([]flatbuffers.UOffsetT, len(packages))

for i, p := range packages {
offsets[i] = fbEncodePackage(builder, p)
}
schema.PackageContainerStartPackagesVector(builder, len(offsets))
for _, offset := range offsets {
builder.PrependUOffsetT(offset)
}
packageContainerVector := builder.EndVector(len(offsets))
schema.PackageContainerStart(builder)
schema.PackageContainerAddPackages(builder, packageContainerVector)
root := schema.PackageContainerEnd(builder)
builder.Finish(root)
return builder.FinishedBytes()
}

// fbEncodePackage encodes the given Package to a flatbuffer. The returned bytes
// are a pointer into the Builder's memory.
func fbEncodePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

return fbWritePackage(b, p)
}

func fbWritePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

var (
packageNameOffset,
packageVersionOffset,
packageReleaseOffset,
packageArchOffset,
packageLicenseOffset,
packageSummaryOffset,
packageURLOffset,
packageTypeOffset,
packageErrorOffset flatbuffers.UOffsetT
)

if p.Name != "" {
packageNameOffset = b.CreateString(p.Name)
}
if p.Version != "" {
packageVersionOffset = b.CreateString(p.Version)
}
if p.Release != "" {
packageReleaseOffset = b.CreateString(p.Release)
}
if p.Arch != "" {
packageArchOffset = b.CreateString(p.Arch)
}
if p.License != "" {
packageLicenseOffset = b.CreateString(p.License)
}
if p.Summary != "" {
packageSummaryOffset = b.CreateString(p.Summary)
}
if p.URL != "" {
packageURLOffset = b.CreateString(p.URL)
}
if p.Type != "" {
packageTypeOffset = b.CreateString(p.Type)
}
if p.error != nil {
packageErrorOffset = b.CreateString(p.error.Error())
}

schema.PackageStart(b)
schema.PackageAddInstalltime(b, uint64(p.InstallTime.Unix()))
schema.PackageAddSize(b, p.Size)

if packageNameOffset > 0 {
schema.PackageAddName(b, packageNameOffset)
}
if packageVersionOffset > 0 {
schema.PackageAddVersion(b, packageVersionOffset)
}
if packageReleaseOffset > 0 {
schema.PackageAddRelease(b, packageReleaseOffset)
}
if packageArchOffset > 0 {
schema.PackageAddArch(b, packageArchOffset)
}
if packageLicenseOffset > 0 {
schema.PackageAddLicense(b, packageLicenseOffset)
}
if packageSummaryOffset > 0 {
schema.PackageAddSummary(b, packageSummaryOffset)
}
if packageURLOffset > 0 {
schema.PackageAddUrl(b, packageURLOffset)
}
if packageTypeOffset > 0 {
schema.PackageAddType(b, packageTypeOffset)
}
if packageErrorOffset > 0 {
schema.PackageAddError(b, packageErrorOffset)
}

return schema.PackageEnd(b)
}

// decodePackagesFromContainer, accepts a flatbuffer encoded byte slice, and decodes
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
// each package from the container vector with the help of he func fbDecodePackage.
// It returns an array of package objects.
func decodePackagesFromContainer(data []byte) ([]*Package, error) {
var packages []*Package
container := schema.GetRootAsPackageContainer(data, 0)
for i := 0; i < container.PackagesLength(); i++ {
sPkg := schema.Package{}
done := container.Packages(&sPkg, i)
if !done {
return nil, fmt.Errorf("failed to load package at container vector position: %d", i)
} else {
p := fbDecodePackage(&sPkg)
packages = append(packages, p)
}
}
return packages, nil
}

// fbDecodePackage decodes flatbuffer package data and copies it into a Package
// object that is returned.
func fbDecodePackage(p *schema.Package) *Package {
var err error
if string(p.Error()) != "" {
err = errors.New(string(p.Error()))
}

return &Package{
Name: string(p.Name()),
Version: string(p.Version()),
Release: string(p.Release()),
Arch: string(p.Arch()),
License: string(p.License()),
InstallTime: time.Unix(int64(p.Installtime()), 0).UTC(),
Size: p.Size(),
Summary: string(p.Summary()),
URL: string(p.Url()),
Type: string(p.Type()),
error: err,
}

ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}