Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions internal/packages/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,39 @@ type DataStreamManifest struct {
} `config:"streams" json:"streams" yaml:"streams"`
}

// Transform contains information about a transform included in a package.
type Transform struct {
	// Name is the transform name, taken from the name of the directory
	// containing the definition file.
	Name string
	// Path is the path to the transform definition file.
	Path string
	// Definition is the parsed transform definition.
	Definition TransformDefinition
}

// TransformDefinition is the definition of an Elasticsearch transform
type TransformDefinition struct {
	Source struct {
		// Index is the list of index patterns the transform reads from.
		Index []string `config:"index" yaml:"index"`
	} `config:"source" yaml:"source"`
	Meta struct {
		// FleetTransformVersion is the version of the transform as managed by Fleet.
		FleetTransformVersion string `config:"fleet_transform_version" yaml:"fleet_transform_version"`
	} `config:"_meta" yaml:"_meta"`
}

// HasSource checks if a given index or data stream name matches the transform sources.
// It returns an error if any of the source index patterns is malformed.
func (t *Transform) HasSource(name string) (bool, error) {
	for _, indexPattern := range t.Definition.Source.Index {
		// Using filepath.Match to match index patterns because the syntax
		// is basically the same.
		found, err := filepath.Match(indexPattern, name)
		if err != nil {
			return false, fmt.Errorf("matching pattern %q with %q: %w", indexPattern, name, err)
		}
		if found {
			return true, nil
		}
	}
	return false, nil
}

// MustFindPackageRoot finds and returns the path to the root folder of a package.
// It fails with an error if the package root can't be found.
func MustFindPackageRoot() (string, error) {
Expand Down Expand Up @@ -288,6 +321,36 @@ func ReadPackageManifest(path string) (*PackageManifest, error) {
return &m, nil
}

// ReadTransformsFromPackageRoot looks for transforms in the given package root.
// Transforms are expected to be defined in files matching
// elasticsearch/transform/<name>/transform.yml under the package root.
func ReadTransformsFromPackageRoot(packageRoot string) ([]Transform, error) {
	files, err := filepath.Glob(filepath.Join(packageRoot, "elasticsearch", "transform", "*", "transform.yml"))
	if err != nil {
		return nil, fmt.Errorf("failed matching files with transform definitions: %w", err)
	}

	var transforms []Transform
	for _, file := range files {
		cfg, err := yaml.NewConfigWithFile(file, ucfg.PathSep("."))
		if err != nil {
			return nil, fmt.Errorf("reading file failed (path: %s): %w", file, err)
		}

		var definition TransformDefinition
		if err := cfg.Unpack(&definition); err != nil {
			return nil, fmt.Errorf("failed to parse transform file %q: %w", file, err)
		}

		transforms = append(transforms, Transform{
			// The transform name is the name of the directory containing
			// the definition file.
			Name:       filepath.Base(filepath.Dir(file)),
			Path:       file,
			Definition: definition,
		})
	}

	return transforms, nil
}

func ReadPackageManifestBytes(contents []byte) (*PackageManifest, error) {
cfg, err := yaml.NewConfig(contents, ucfg.PathSep("."))
if err != nil {
Expand Down
122 changes: 122 additions & 0 deletions internal/testrunner/runners/system/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,11 @@ func (r *runner) runTest(config *testConfig, ctxt servicedeployer.ServiceContext
result.FailureMsg = message
}

// Check transforms if present
if err := r.checkTransforms(config, pkgManifest, ds, dataStream); err != nil {
return result.WithError(err)
}
Comment on lines +731 to +733
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably it's not needed. I was wondering if at the end of the system test execution we should delete the indices created by the transforms.
Something similar to what it's done here for the main data stream

r.wipeDataStreamHandler = func() error {
logger.Debugf("deleting data in data stream...")
if err := deleteDataStreamDocs(r.options.API, dataStream); err != nil {
return fmt.Errorf("error deleting data in data stream: %w", err)
}
return nil
}

Or maybe it's better to leave that management to Fleet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it wouldn't be needed because the source data is already deleted. The transform should be deleted by Fleet, but I think it is not doing it.


return result.WithSuccess()
}

Expand Down Expand Up @@ -1005,6 +1010,123 @@ func selectPolicyTemplateByName(policies []packages.PolicyTemplate, name string)
return packages.PolicyTemplate{}, fmt.Errorf("policy template %q not found", name)
}

// checkTransforms validates the documents generated by the transforms included
// in the package that use the given data stream as source. It uses the preview
// transform API, so validation does not depend on documents having been
// indexed in the transform destination index yet.
func (r *runner) checkTransforms(config *testConfig, pkgManifest *packages.PackageManifest, ds kibana.PackageDataStream, dataStream string) error {
	transforms, err := packages.ReadTransformsFromPackageRoot(r.options.PackageRootPath)
	if err != nil {
		return fmt.Errorf("loading transforms for package failed (root: %s): %w", r.options.PackageRootPath, err)
	}
	for _, transform := range transforms {
		hasSource, err := transform.HasSource(dataStream)
		if err != nil {
			return fmt.Errorf("failed to check if transform %q has %s as source: %w", transform.Name, dataStream, err)
		}
		if !hasSource {
			logger.Debugf("transform %q does not match %q as source (sources: %s)", transform.Name, dataStream, transform.Definition.Source.Index)
			continue
		}

		logger.Debugf("checking transform %q", transform.Name)

		// Guard against a policy without inputs or streams, the data stream
		// type is needed below to compose the transform ID pattern, and
		// indexing into empty slices would panic.
		if len(ds.Inputs) == 0 || len(ds.Inputs[0].Streams) == 0 {
			return fmt.Errorf("no inputs with streams found in policy data stream to check transform %q", transform.Name)
		}

		// IDs format is: "<type>-<package>.<transform>-<namespace>-<version>"
		// For instance: "logs-ti_anomali.latest_ioc-default-0.1.0"
		transformPattern := fmt.Sprintf("%s-%s.%s-*-%s",
			ds.Inputs[0].Streams[0].DataStream.Type,
			pkgManifest.Name,
			transform.Name,
			transform.Definition.Meta.FleetTransformVersion,
		)
		transformId, err := r.getTransformId(transformPattern)
		if err != nil {
			return fmt.Errorf("failed to determine transform ID: %w", err)
		}

		// Using the preview instead of checking the actual index because
		// transforms with retention policies may be deleting the documents based
		// on old fixtures as soon as they are indexed.
		transformDocs, err := r.previewTransform(transformId)
		if err != nil {
			return fmt.Errorf("failed to preview transform %q: %w", transformId, err)
		}
		if len(transformDocs) == 0 {
			return fmt.Errorf("no documents found in preview for transform %q", transformId)
		}

		// Field definitions for the transform destination live next to the
		// transform definition file.
		transformRootPath := filepath.Dir(transform.Path)
		fieldsValidator, err := fields.CreateValidatorForDirectory(transformRootPath,
			fields.WithSpecVersion(pkgManifest.SpecVersion),
			fields.WithNumericKeywordFields(config.NumericKeywordFields),
			fields.WithEnabledImportAllECSSChema(true),
		)
		if err != nil {
			return fmt.Errorf("creating fields validator for data stream failed (path: %s): %w", transformRootPath, err)
		}
		if err := validateFields(transformDocs, fieldsValidator, dataStream); err != nil {
			return err
		}
	}

	return nil
}

// getTransformId resolves a transform ID pattern to the ID of exactly one
// installed transform, using the get transform API. It fails if the pattern
// matches no transform, more than one, or one with an empty ID.
func (r *runner) getTransformId(transformPattern string) (string, error) {
	resp, err := r.options.API.TransformGetTransform(
		r.options.API.TransformGetTransform.WithTransformID(transformPattern),
	)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.IsError() {
		return "", fmt.Errorf("failed to get transforms: %s", resp.String())
	}

	var response struct {
		Transforms []struct {
			ID string `json:"id"`
		} `json:"transforms"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	if n := len(response.Transforms); n == 0 {
		return "", fmt.Errorf("no transform found with pattern %q", transformPattern)
	} else if n > 1 {
		return "", fmt.Errorf("multiple transforms (%d) found with pattern %q", n, transformPattern)
	}

	id := response.Transforms[0].ID
	if id == "" {
		return "", fmt.Errorf("empty ID found with pattern %q", transformPattern)
	}
	return id, nil
}

// previewTransform returns the documents that the transform with the given ID
// would generate, as reported by the preview transform API.
func (r *runner) previewTransform(transformId string) ([]common.MapStr, error) {
	resp, err := r.options.API.TransformPreviewTransform(
		r.options.API.TransformPreviewTransform.WithTransformID(transformId),
	)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.IsError() {
		return nil, fmt.Errorf("failed to preview transform %q: %s", transformId, resp.String())
	}

	var response struct {
		Documents []common.MapStr `json:"preview"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		return nil, fmt.Errorf("failed to decode response: %w", err)
	}

	return response.Documents, nil
}

func deleteDataStreamDocs(api *elasticsearch.API, dataStream string) error {
body := strings.NewReader(`{ "query": { "match_all": {} } }`)
_, err := api.DeleteByQuery([]string{dataStream}, body)
Expand Down
3 changes: 3 additions & 0 deletions test/packages/parallel/ti_anomali/_dev/build/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dependencies:
ecs:
reference: git@8.7
33 changes: 33 additions & 0 deletions test/packages/parallel/ti_anomali/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Anomali Integration

The Anomali integration supports the following datasets.

- `threatstream` dataset: Support for [Anomali ThreatStream](https://www.anomali.com/products/threatstream), a commercial Threat Intelligence service.

## Logs

### Anomali Threatstream

This integration requires additional software, the _Elastic_ _Extension,_
to connect the Anomali ThreatStream with this integration. It's available
at the [ThreatStream download page.](https://ui.threatstream.com/downloads)

Please refer to the documentation included with the Extension for a detailed
explanation on how to configure the Anomali ThreatStream to send indicators
to this integration.

### Expiration of Indicators of Compromise (IOCs)
The ingested IOCs expire after a certain duration. An [Elastic Transform](https://www.elastic.co/guide/en/elasticsearch/reference/current/transforms.html) is created to ensure that only active IOCs are available to the end users. This transform creates a destination index named `logs-ti_anomali_latest.threatstream` which only contains active and unexpired IOCs. When setting up indicator match rules, use this latest destination index to avoid false positives from expired IOCs. Please read [ILM Policy](#ilm-policy) below which is added to avoid unbounded growth on source `.ds-logs-ti_anomali.threatstream-*` indices.

#### Handling Orphaned IOCs
When an IOC expires, Anomali feed contains information about all IOCs that got `deleted`. However, some Anomali IOCs may never expire and will continue to stay in the latest destination index `logs-ti_anomali_latest.threatstream`. To avoid any false positives from such orphaned IOCs, users are allowed to configure `IOC Expiration Duration` parameter while setting up the integration. This parameter deletes all data inside the destination index `logs-ti_anomali_latest.threatstream` after this specified duration is reached. Users must pull entire feed instead of incremental feed when this expiration happens so that the IOCs get reset.

**NOTE:** `IOC Expiration Duration` parameter does not override the expiration provided by the Anomali for their IOCs. So, if Anomali IOC is expired and subsequently such `deleted` IOCs are sent into the feed, they are deleted immediately. `IOC Expiration Duration` parameter only exists to add a fail-safe default expiration in case Anomali IOCs never expire.

### ILM Policy
To facilitate IOC expiration, source datastream-backed indices `.ds-logs-ti_anomali.threat-*` are allowed to contain duplicates from each polling interval. An ILM policy is added to these source indices so they don't lead to unbounded growth. This means data in these source indices will be deleted `5 days` after the ingested date.


{{event "threatstream"}}

{{fields "threatstream"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
version: '2.3'
services:
limo-http:
image: docker.elastic.co/observability/stream:v0.6.1
ports:
- 8080
volumes:
- ./files:/files:ro
environment:
PORT: 8080
command:
- http-server
- --addr=:8080
- --config=/files/config.yml
threatstream-webhook-http:
image: docker.elastic.co/observability/stream:v0.6.1
volumes:
- ./sample_logs:/sample_logs:ro
environment:
- STREAM_PROTOCOL=webhook
- STREAM_ADDR=http://elastic-agent:9080/
command: log --webhook-content-type application/x-ndjson --start-signal=SIGHUP --delay=5s /sample_logs/test-threatstream-ndjson.log
threatstream-webhook-https:
image: docker.elastic.co/observability/stream:v0.6.1
volumes:
- ./sample_logs:/sample_logs:ro
environment:
- STREAM_PROTOCOL=webhook
- STREAM_INSECURE=true
- STREAM_ADDR=https://elastic-agent:7443/
command: log --webhook-content-type application/x-ndjson --start-signal=SIGHUP --delay=5s /sample_logs/test-threatstream-ndjson.log
threatstream-integrator-test:
image: docker.io/adrisr/filebeat-anomali-integrator-test:latest
volumes:
- ./files:/files:ro
command:
- /files/test-intel.ndjson
Loading