Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[auditbeat] - Migration of system/package module to flatbuffers #34817

Merged
merged 19 commits into from Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
dcf6e76
initial working commit
ShourieG Mar 10, 2023
870305f
flatbuffers migration now working with tests
ShourieG Mar 13, 2023
cc8592b
updated changelog
ShourieG Mar 13, 2023
c38ae22
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 13, 2023
1cffa01
removed comments
ShourieG Mar 13, 2023
aaf3fc2
resolved PR suggetions and updated license
ShourieG Mar 14, 2023
5d0f6e4
updated notice
ShourieG Mar 14, 2023
4ac7ef8
updated tests, added benchmarks, renamed files
ShourieG Mar 15, 2023
2d55de8
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 20, 2023
7525656
updated tests & added error storage in schema
ShourieG Mar 20, 2023
7166fb3
license issue fixed
ShourieG Mar 20, 2023
4d7bee9
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 23, 2023
d6f553a
updated according to PR suggestions
ShourieG Mar 23, 2023
2aa8112
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 28, 2023
892e37e
refactored package migration based on bucket version and is now done …
ShourieG Mar 29, 2023
15e1fd1
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Mar 29, 2023
f8369d9
Merge remote-tracking branch 'elastic/main' into auditbeat/flat_buffers
andrewkroh Mar 31, 2023
5096686
Perform migration inside of a bolt transaction, fix linter warnings
andrewkroh Mar 31, 2023
c7feb02
Merge remote-tracking branch 'upstream/main' into auditbeat/flat_buffers
ShourieG Apr 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -234,7 +234,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add unix socket log parsing for nginx ingress_controller {pull}34732[34732]

*Auditbeat*

- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]

*Filebeat*

Expand Down
191 changes: 191 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuffers.go
@@ -0,0 +1,191 @@
// Licensed to Elasticsearch B.V. under one or more contributor
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

ShourieG marked this conversation as resolved.
Show resolved Hide resolved
package pkg

import (
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/package/schema"
"github.com/elastic/elastic-agent-libs/logp"
flatbuffers "github.com/google/flatbuffers/go"
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
)

// Requires the Google flatbuffer compiler and Elastic go-licenser.
//go:generate flatc --go schema.fbs
//go:generate go-licenser schema

// bufferPool holds reusable flatbuffer Builders to avoid re-allocating the
// builder's backing array on every encode. Builders taken from the pool must
// be Reset before use (fbGetBuilder does this).
//
// The pool is initialized in the declaration rather than in an init()
// function; there is no ordering requirement that would justify init.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return flatbuffers.NewBuilder(1024)
	},
}

// fbGetBuilder returns a Builder that can be used for encoding data. The builder
// should be released by invoking the release function after the encoded bytes
// are no longer in used (i.e. a copy of b.FinishedBytes() has been made).
func fbGetBuilder() (b *flatbuffers.Builder, release func()) {
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
b = bufferPool.Get().(*flatbuffers.Builder)
b.Reset()
return b, func() { bufferPool.Put(b) }
}

// encodePackages, encodes an array of packages by creating a vector of packages and tracking offsets. It uses the
// func fbEncodePackage to encode individual packages, and returns a []byte containing the encoded data
func encodePackages(builder *flatbuffers.Builder, packages []*Package) []byte {
offsets := make([]flatbuffers.UOffsetT, len(packages))

for i, p := range packages {
offsets[i] = fbEncodePackage(builder, p)
}
schema.PackageContainerStartPackagesVector(builder, len(offsets))
for _, offset := range offsets {
builder.PrependUOffsetT(offset)
}
packageContainerVector := builder.EndVector(len(offsets))
schema.PackageContainerStart(builder)
schema.PackageContainerAddPackages(builder, packageContainerVector)
root := schema.PackageContainerEnd(builder)
builder.Finish(root)
return builder.Bytes[builder.Head():]
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}

// fbEncodePackage encodes the given Package to a flatbuffer. The returned bytes
// are a pointer into the Builder's memory.
func fbEncodePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

offset := fbWritePackage(b, p)
// b.Finish(offset)
// return b.FinishedBytes()
return offset
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}

func fbWritePackage(b *flatbuffers.Builder, p *Package) flatbuffers.UOffsetT {
if p == nil {
return 0
}

var packageNameOffset flatbuffers.UOffsetT
var packageVersionOffset flatbuffers.UOffsetT
var packageReleaseOffset flatbuffers.UOffsetT
var packageArchOffset flatbuffers.UOffsetT
var packageLicenseOffset flatbuffers.UOffsetT
var packageSummaryOffset flatbuffers.UOffsetT
var packageURLOffset flatbuffers.UOffsetT
var packageTypeOffset flatbuffers.UOffsetT
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

if p.Name != "" {
packageNameOffset = b.CreateString(p.Name)
}
if p.Version != "" {
packageVersionOffset = b.CreateString(p.Version)
}
if p.Release != "" {
packageReleaseOffset = b.CreateString(p.Release)
}
if p.Arch != "" {
packageArchOffset = b.CreateString(p.Arch)
}
if p.License != "" {
packageLicenseOffset = b.CreateString(p.License)
}
if p.Summary != "" {
packageSummaryOffset = b.CreateString(p.Summary)
}
if p.URL != "" {
packageURLOffset = b.CreateString(p.URL)
}
if p.Type != "" {
packageTypeOffset = b.CreateString(p.Type)
}

schema.PackageStart(b)
schema.PackageAddInstalltime(b, uint64(p.InstallTime.Unix()))
schema.PackageAddSize(b, p.Size)

if packageNameOffset > 0 {
schema.PackageAddName(b, packageNameOffset)
}
if packageVersionOffset > 0 {
schema.PackageAddVersion(b, packageVersionOffset)
}
if packageReleaseOffset > 0 {
schema.PackageAddRelease(b, packageReleaseOffset)
}
if packageArchOffset > 0 {
schema.PackageAddArch(b, packageArchOffset)
}
if packageLicenseOffset > 0 {
schema.PackageAddLicense(b, packageLicenseOffset)
}
if packageSummaryOffset > 0 {
schema.PackageAddSummary(b, packageSummaryOffset)
}
if packageURLOffset > 0 {
schema.PackageAddUrl(b, packageURLOffset)
}
if packageTypeOffset > 0 {
schema.PackageAddType(b, packageTypeOffset)
}

return schema.PackageEnd(b)
}

// decodePackagesFromContainer, accepts a flatbuffer encoded byte slice, and decodes
// each package from the container vector with the help of he func fbDecodePackage.
// It returns an array of package objects.
func decodePackagesFromContainer(data []byte, log *logp.Logger) (packages []*Package) {
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
container := schema.GetRootAsPackageContainer(data, 0)
for i := 0; i < container.PackagesLength(); i++ {
sPkg := schema.Package{}
done := container.Packages(&sPkg, i)
// query: if a single package fails to load, should we abandon the entire loading proces ?
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
if !done && log != nil {
log.Warnf("Failed to load package at container vector position: %d", i)
} else {
p := fbDecodePackage(&sPkg)
packages = append(packages, p)
}
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}
return packages
}

// fbDecodePackage decodes flatbuffer package data and copies it into a Package
// object that is returned.
func fbDecodePackage(p *schema.Package) *Package {

rtnPkg := &Package{
Name: string(p.Name()),
Version: string(p.Version()),
Release: string(p.Release()),
Arch: string(p.Arch()),
License: string(p.License()),
InstallTime: time.Unix(int64(p.Installtime()), 0).UTC(),
Size: p.Size(),
Summary: string(p.Summary()),
URL: string(p.Url()),
Type: string(p.Type()),
}

return rtnPkg
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
}
43 changes: 43 additions & 0 deletions x-pack/auditbeat/module/system/package/flatbuffers_test.go
@@ -0,0 +1,43 @@
package pkg
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// testPackage returns a slice holding one fully populated Package, suitable
// for exercising the flatbuffer encode/decode round trip.
func testPackage() []*Package {
	pkg := &Package{
		Name:        "foo",
		Version:     "1.2.3",
		Release:     "1",
		Arch:        "amd64",
		License:     "bar",
		InstallTime: time.Unix(1591021924, 0).UTC(),
		Size:        1234,
		Summary:     "Foo stuff",
		URL:         "http://foo.example.com",
		Type:        "rpm",
	}
	return []*Package{pkg}
}

func TestFBEncodeDecode(t *testing.T) {
p := testPackage()
builder, release := fbGetBuilder()
defer release()
data := encodePackages(builder, p)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
t.Log("encoded length:", len(data))

out := decodePackagesFromContainer(data, nil)
if out == nil {
t.Fatal("decode returned nil")
}

assert.Equal(t, len(p), len(out))
for i := 0; i < len(p); i++ {
assert.Equal(t, p[i], out[i])
}
}
20 changes: 20 additions & 0 deletions x-pack/auditbeat/module/system/package/package.fbs
@@ -0,0 +1,20 @@

// Flatbuffer schema for the system/package module's persisted package state.
// Regenerate the Go bindings with: flatc --go schema.fbs
namespace schema;

// Package holds the metadata of one installed package. All string fields are
// optional; the encoder omits them when empty. installtime is seconds since
// the Unix epoch (the encoder writes InstallTime.Unix()).
table Package {
name:string;
version:string;
release:string;
arch:string;
license:string;
installtime:ulong;
size:ulong;
summary:string;
url:string;
type:string;
}

// PackageContainer is the root table: the complete set of known packages,
// stored as a single value in the bolt bucket.
table PackageContainer {
packages: [Package];
}
root_type PackageContainer;
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
53 changes: 36 additions & 17 deletions x-pack/auditbeat/module/system/package/package.go
Expand Up @@ -41,6 +41,7 @@ const (

bucketName = "package.v1"
bucketKeyPackages = "packages"
bucketKeyPackagesFb = "packages_fb"
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
bucketKeyStateTimestamp = "state_timestamp"

eventTypeState = "state"
Expand Down Expand Up @@ -109,6 +110,7 @@ type MetricSet struct {
lastState time.Time

suppressNoPackageWarnings bool
arePackagesGobEncoded bool
}

// Package represents information for a package.
Expand Down Expand Up @@ -425,22 +427,25 @@ func convertToCacheable(packages []*Package) []cache.Cacheable {

// restorePackagesFromDisk loads the packages from disk.
func (ms *MetricSet) restorePackagesFromDisk() (packages []*Package, err error) {
var decoder *gob.Decoder
var gobDecoder *gob.Decoder
var buf *bytes.Buffer
err = ms.bucket.Load(bucketKeyPackages, func(blob []byte) error {
if len(blob) > 0 {
buf := bytes.NewBuffer(blob)
decoder = gob.NewDecoder(buf)
buf = bytes.NewBuffer(blob)
gobDecoder = gob.NewDecoder(buf)
ms.arePackagesGobEncoded = true
}
return nil
})
if err != nil {
return nil, err
}

if decoder != nil {
// if existing packages are gob encoded
if gobDecoder != nil {
for {
pkg := new(Package)
err = decoder.Decode(pkg)
err = gobDecoder.Decode(pkg)
if err == nil {
packages = append(packages, pkg)
} else if err == io.EOF {
Expand All @@ -450,27 +455,41 @@ func (ms *MetricSet) restorePackagesFromDisk() (packages []*Package, err error)
return nil, fmt.Errorf("error decoding packages: %w", err)
}
}
} else {
var data []byte
err = ms.bucket.Load(bucketKeyPackagesFb, func(blob []byte) error {
if len(blob) > 0 {
data = blob
}
return nil
})
if err != nil {
return nil, err
}

// if existing packages are stored as flatbuffers
if len(data) > 0 {
packages = decodePackagesFromContainer(data, ms.log)
}
}

return packages, nil
}

// Save packages to disk.
func (ms *MetricSet) savePackagesToDisk(packages []*Package) error {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)

for _, pkg := range packages {
err := encoder.Encode(*pkg)
if err != nil {
return fmt.Errorf("error encoding packages: %w", err)
builder, release := fbGetBuilder()
defer release()
data := encodePackages(builder, packages)
if err := ms.bucket.Store(bucketKeyPackagesFb, data); err != nil {
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("error writing packages as flatbuffers to disk: %w", err)
}
// remove old storage key
if ms.arePackagesGobEncoded {
if err := ms.bucket.Delete(bucketKeyPackages); err != nil {
return fmt.Errorf("error removing old package key from disk: %w", err)
}
}

err := ms.bucket.Store(bucketKeyPackages, buf.Bytes())
if err != nil {
return fmt.Errorf("error writing packages to disk: %w", err)
}
return nil
}

Expand Down