Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auditbeat] - Migration of system/package module to flatbuffers #34817

Merged
merged 19 commits into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
dcf6e76
initial working commit
ShourieG Mar 10, 2023
870305f
flatbuffers migration now working with tests
ShourieG Mar 13, 2023
cc8592b
updated changelog
ShourieG Mar 13, 2023
c38ae22
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 13, 2023
1cffa01
removed comments
ShourieG Mar 13, 2023
aaf3fc2
resolved PR suggetions and updated license
ShourieG Mar 14, 2023
5d0f6e4
updated notice
ShourieG Mar 14, 2023
4ac7ef8
updated tests, added benchmarks, renamed files
ShourieG Mar 15, 2023
2d55de8
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 20, 2023
7525656
updated tests & added error storage in schema
ShourieG Mar 20, 2023
7166fb3
license issue fixed
ShourieG Mar 20, 2023
4d7bee9
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 23, 2023
d6f553a
updated according to PR suggestions
ShourieG Mar 23, 2023
2aa8112
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 28, 2023
892e37e
refactored package migration based on bucket version and is now done …
ShourieG Mar 29, 2023
15e1fd1
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 29, 2023
f8369d9
Merge remote-tracking branch 'elastic/main' into auditbeat/flat_buffers
andrewkroh Mar 31, 2023
5096686
Perform migration inside of a bolt transaction, fix linter warnings
andrewkroh Mar 31, 2023
c7feb02
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Apr 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -253,7 +253,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add nginx ingress_controller parsing if one of upstreams fails to return response {pull}34787[34787]

*Auditbeat*

- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

Expand Down
51 changes: 43 additions & 8 deletions auditbeat/datastore/datastore.go
Expand Up @@ -46,10 +46,25 @@ func OpenBucket(name string) (Bucket, error) {
return ds.OpenBucket(name)
}

// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed. If an
// error is returned then the entire transaction is rolled back.
func Update(fn func(tx *bolt.Tx) error) error {
initDatastoreOnce.Do(func() {
ds = &boltDatastore{
path: paths.Resolve(paths.Data, "beat.db"),
mode: 0o600,
}
})

return ds.Update(fn)
}

// Datastore

type Datastore interface {
OpenBucket(name string) (Bucket, error)
Update(fn func(tx *bolt.Tx) error) error
}

type boltDatastore struct {
Expand All @@ -67,14 +82,8 @@ func New(path string, mode os.FileMode) Datastore {
func (ds *boltDatastore) OpenBucket(bucket string) (Bucket, error) {
ds.mutex.Lock()
defer ds.mutex.Unlock()

// Initialize the Bolt DB.
if ds.db == nil {
var err error
ds.db, err = bolt.Open(ds.path, ds.mode, nil)
if err != nil {
return nil, err
}
if err := ds.init(); err != nil {
return nil, err
}

// Ensure the name exists.
Expand All @@ -89,6 +98,32 @@ func (ds *boltDatastore) OpenBucket(bucket string) (Bucket, error) {
return &boltBucket{ds, bucket}, nil
}

// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed. If an
// error is returned then the entire transaction is rolled back.
func (ds *boltDatastore) Update(fn func(tx *bolt.Tx) error) error {
ds.mutex.Lock()
defer ds.mutex.Unlock()
if err := ds.init(); err != nil {
return err
}

return ds.db.Update(fn)
}

// init initializes the backing data store if it is not already open.
// Callers should hold the datastore mutex.
func (ds *boltDatastore) init() error {
if ds.db == nil {
var err error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is repeated a lot. How about adding a boltDatastore.init() error method.

ds.db, err = bolt.Open(ds.path, ds.mode, nil)
if err != nil {
return err
}
}
return nil
}

func (ds *boltDatastore) done() {
ds.mutex.Lock()
defer ds.mutex.Unlock()
Expand Down
26 changes: 20 additions & 6 deletions x-pack/auditbeat/module/system/package/_meta/data.json
Expand Up @@ -2,24 +2,38 @@
"@timestamp": "2017-10-12T08:05:34.853Z",
"event": {
"action": "existing_package",
"category": [
"package"
],
"dataset": "package",
"id": "5cafbacd-9288-4022-be30-521db563f669",
"id": "6bed65c5-9797-4fb7-9ec7-2d1873c54371",
"kind": "state",
"module": "system"
"module": "system",
"type": [
"info"
]
},
"message": "Package zstd (1.5.4) is already installed",
"package": {
"description": "Zstandard is a real-time compression algorithm",
"installed": "2023-02-15T20:40:24.390086982-05:00",
"name": "zstd",
"reference": "https://facebook.github.io/zstd/",
"type": "brew",
"version": "1.5.4"
},
"message": "Package zstd (1.3.5) is already installed",
"service": {
"type": "system"
},
"system": {
"audit": {
"package": {
"entity_id": "wsRV2fmTddn+/GHa",
"installtime": "2018-08-30T18:41:23.85657356+01:00",
"entity_id": "SxYD3ZMh/Ym0lBIk",
"installtime": "2023-02-15T20:40:24.390086982-05:00",
"name": "zstd",
"summary": "Zstandard is a real-time compression algorithm",
"url": "https://facebook.github.io/zstd/",
"version": "1.3.5"
"version": "1.5.4"
}
}
}
Expand Down
190 changes: 190 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuffers.go
@@ -0,0 +1,190 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows
// +build !windows

package pkg

import (
"errors"
"fmt"
"sync"
"time"

flatbuffers "github.com/google/flatbuffers/go"

"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/package/schema"
)

// Requires the Google flatbuffer compiler and Elastic go-licenser.
//go:generate flatc --go schema.fbs
//go:generate go-licenser -license=Elastic schema

var bufferPool sync.Pool

func init() {
bufferPool.New = func() interface{} {
return flatbuffers.NewBuilder(1024)
}
}

// fbGetBuilder returns a Builder that can be used for encoding data. The builder
// should be put back into the pool by invoking the put function after the encoded bytes
// are no longer in used (i.e. a copy of b.FinishedBytes() has been made).
func fbGetBuilder() (b *flatbuffers.Builder, put func()) {
b = bufferPool.Get().(*flatbuffers.Builder)
b.Reset()
return b, func() { bufferPool.Put(b) }
}

// encodePackages encodes an array of packages by creating a vector of packages and tracking offsets. It uses the
// func fbEncodePackage to encode individual packages, and returns a []byte containing the encoded data.
func encodePackages(builder *flatbuffers.Builder, packages []*Package) []byte {
offsets := make([]flatbuffers.UOffsetT, len(packages))

for i, p := range packages {
offsets[i] = fbEncodePackage(builder, p)
}
schema.PackageContainerStartPackagesVector(builder, len(offsets))
for _, offset := range offsets {
builder.PrependUOffsetT(offset)
}
packageContainerVector := builder.EndVector(len(offsets))
schema.PackageContainerStart(builder)
schema.PackageContainerAddPackages(builder, packageContainerVector)
root := schema.PackageContainerEnd(builder)
builder.Finish(root)
return builder.FinishedBytes()
}

// fbEncodePackage encodes the given Package to a flatbuffer. The returned bytes
// are a pointer into the Builder's memory.
func fbEncodePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

return fbWritePackage(b, p)
}

func fbWritePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

var packageNameOffset,
packageVersionOffset,
packageReleaseOffset,
packageArchOffset,
packageLicenseOffset,
packageSummaryOffset,
packageURLOffset,
packageTypeOffset,
packageErrorOffset flatbuffers.UOffsetT

if p.Name != "" {
packageNameOffset = b.CreateString(p.Name)
}
if p.Version != "" {
packageVersionOffset = b.CreateString(p.Version)
}
if p.Release != "" {
packageReleaseOffset = b.CreateString(p.Release)
}
if p.Arch != "" {
packageArchOffset = b.CreateString(p.Arch)
}
if p.License != "" {
packageLicenseOffset = b.CreateString(p.License)
}
if p.Summary != "" {
packageSummaryOffset = b.CreateString(p.Summary)
}
if p.URL != "" {
packageURLOffset = b.CreateString(p.URL)
}
if p.Type != "" {
packageTypeOffset = b.CreateString(p.Type)
}
if p.error != nil {
packageErrorOffset = b.CreateString(p.error.Error())
}

schema.PackageStart(b)
schema.PackageAddInstalltime(b, uint64(p.InstallTime.Unix()))
schema.PackageAddSize(b, p.Size)

if packageNameOffset > 0 {
schema.PackageAddName(b, packageNameOffset)
}
if packageVersionOffset > 0 {
schema.PackageAddVersion(b, packageVersionOffset)
}
if packageReleaseOffset > 0 {
schema.PackageAddRelease(b, packageReleaseOffset)
}
if packageArchOffset > 0 {
schema.PackageAddArch(b, packageArchOffset)
}
if packageLicenseOffset > 0 {
schema.PackageAddLicense(b, packageLicenseOffset)
}
if packageSummaryOffset > 0 {
schema.PackageAddSummary(b, packageSummaryOffset)
}
if packageURLOffset > 0 {
schema.PackageAddUrl(b, packageURLOffset)
}
if packageTypeOffset > 0 {
schema.PackageAddType(b, packageTypeOffset)
}
if packageErrorOffset > 0 {
schema.PackageAddError(b, packageErrorOffset)
}

return schema.PackageEnd(b)
}

// decodePackagesFromContainer accepts a flatbuffer encoded byte slice, and decodes
// each package from the container vector with the help of fbDecodePackage.
// It returns an array of package objects.
func decodePackagesFromContainer(data []byte) ([]*Package, error) {
var packages []*Package
container := schema.GetRootAsPackageContainer(data, 0)
for i := 0; i < container.PackagesLength(); i++ {
sPkg := schema.Package{}
done := container.Packages(&sPkg, i)
if !done {
return nil, fmt.Errorf("failed to load package at container vector position: %d", i)
} else {
p := fbDecodePackage(&sPkg)
packages = append(packages, p)
}
}
return packages, nil
}

// fbDecodePackage decodes flatbuffer package data and copies it into a Package
// object that is returned.
func fbDecodePackage(p *schema.Package) *Package {
var err error
if string(p.Error()) != "" {
err = errors.New(string(p.Error()))
}

return &Package{
Name: string(p.Name()),
Version: string(p.Version()),
Release: string(p.Release()),
Arch: string(p.Arch()),
License: string(p.License()),
InstallTime: time.Unix(int64(p.Installtime()), 0).UTC(),
Size: p.Size(),
Summary: string(p.Summary()),
URL: string(p.Url()),
Type: string(p.Type()),
error: err,
}
}