Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions docs/resources/elasticsearch_ml_datafeed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@

---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "elasticstack_elasticsearch_ml_datafeed Resource - terraform-provider-elasticstack"
subcategory: "Ml"
description: |-
Creates and manages Machine Learning datafeeds. Datafeeds retrieve data from Elasticsearch for analysis by an anomaly detection job. Each anomaly detection job can have only one associated datafeed. See the ML Datafeed API documentation https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html for more details.
---

# elasticstack_elasticsearch_ml_datafeed (Resource)

Creates and manages Machine Learning datafeeds. Datafeeds retrieve data from Elasticsearch for analysis by an anomaly detection job. Each anomaly detection job can have only one associated datafeed. See the [ML Datafeed API documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html) for more details.

## Example Usage

```terraform
# Basic ML Datafeed
resource "elasticstack_elasticsearch_ml_datafeed" "basic" {
datafeed_id = "my-basic-datafeed"
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
indices = ["log-data-*"]

query = jsonencode({
match_all = {}
})
}

# Comprehensive ML Datafeed with all options
resource "elasticstack_elasticsearch_ml_datafeed" "comprehensive" {
datafeed_id = "my-comprehensive-datafeed"
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
indices = ["app-logs-*", "system-logs-*"]

query = jsonencode({
bool = {
must = [
{
range = {
"@timestamp" = {
gte = "now-1h"
}
}
},
{
term = {
"status" = "error"
}
}
]
}
})

scroll_size = 1000
frequency = "30s"
query_delay = "60s"
max_empty_searches = 10

chunking_config {
mode = "manual"
time_span = "30m"
}

delayed_data_check_config {
enabled = true
check_window = "2h"
}

indices_options {
ignore_unavailable = true
allow_no_indices = false
expand_wildcards = ["open", "closed"]
}

runtime_mappings = jsonencode({
"hour_of_day" = {
"type" = "long"
"script" = {
"source" = "emit(doc['@timestamp'].value.getHour())"
}
}
})

script_fields = jsonencode({
"my_script_field" = {
"script" = {
"source" = "_score * doc['my_field'].value"
}
}
})
}

# Required ML Job for the datafeed
resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
job_id = "example-anomaly-job"
description = "Example anomaly detection job"

analysis_config {
bucket_span = "15m"
detectors {
function = "count"
}
}

data_description {
time_field = "@timestamp"
time_format = "epoch_ms"
}
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `datafeed_id` (String) A numerical character string that uniquely identifies the datafeed. This identifier can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and underscores. It must start and end with alphanumeric characters.
- `indices` (List of String) An array of index names. Wildcards are supported. If any of the indices are in remote clusters, the machine learning nodes must have the `remote_cluster_client` role.
- `job_id` (String) Identifier for the anomaly detection job. The job must exist before creating the datafeed.

### Optional

- `aggregations` (String) If set, the datafeed performs aggregation searches. Support for aggregations is limited and should be used only with low cardinality data. This should be a JSON object representing the aggregations to be performed.
- `chunking_config` (Attributes) Datafeeds might search over long time periods, for several months or years. This search is split into time chunks in order to ensure the load on Elasticsearch is managed. Chunking configuration controls how the size of these time chunks are calculated; it is an advanced configuration option. (see [below for nested schema](#nestedatt--chunking_config))
- `delayed_data_check_config` (Attributes) Specifies whether the datafeed checks for missing data and the size of the window. The datafeed can optionally search over indices that have already been read in an effort to determine whether any data has subsequently been added to the index. If missing data is found, it is a good indication that the `query_delay` is set too low and the data is being indexed after the datafeed has passed that moment in time. This check runs only on real-time datafeeds. (see [below for nested schema](#nestedatt--delayed_data_check_config))
- `elasticsearch_connection` (Block List, Deprecated) Elasticsearch connection configuration block. (see [below for nested schema](#nestedblock--elasticsearch_connection))
- `frequency` (String) The interval at which scheduled queries are made while the datafeed runs in real time. The default value is either the bucket span for short bucket spans, or, for longer bucket spans, a sensible fraction of the bucket span. When `frequency` is shorter than the bucket span, interim results for the last (partial) bucket are written then eventually overwritten by the full bucket results. If the datafeed uses aggregations, this value must be divisible by the interval of the date histogram aggregation.
- `indices_options` (Attributes) Specifies index expansion options that are used during search. (see [below for nested schema](#nestedatt--indices_options))
- `max_empty_searches` (Number) If a real-time datafeed has never seen any data (including during any initial training period), it automatically stops and closes the associated job after this many real-time searches return no documents. In other words, it stops after `frequency` times `max_empty_searches` of real-time operation. If not set, a datafeed with no end time that sees no data remains started until it is explicitly stopped.
- `query` (String) The Elasticsearch query domain-specific language (DSL). This value corresponds to the query object in an Elasticsearch search POST body. All the options that are supported by Elasticsearch can be used, as this object is passed verbatim to Elasticsearch. By default uses `{"match_all": {"boost": 1}}`.
- `query_delay` (String) The number of seconds behind real time that data is queried. For example, if data from 10:04 a.m. might not be searchable in Elasticsearch until 10:06 a.m., set this property to 120 seconds. The default value is randomly selected between `60s` and `120s`. This randomness improves the query performance when there are multiple jobs running on the same node.
- `runtime_mappings` (String) Specifies runtime fields for the datafeed search. This should be a JSON object representing the runtime field mappings.
- `script_fields` (String) Specifies scripts that evaluate custom expressions and returns script fields to the datafeed. The detector configuration objects in a job can contain functions that use these script fields. This should be a JSON object representing the script fields.
- `scroll_size` (Number) The size parameter that is used in Elasticsearch searches when the datafeed does not use aggregations. The maximum value is the value of `index.max_result_window`, which is 10,000 by default.

### Read-Only

- `id` (String) Internal identifier of the resource

<a id="nestedatt--chunking_config"></a>
### Nested Schema for `chunking_config`

Required:

- `mode` (String) The chunking mode. Can be `auto`, `manual`, or `off`. In `auto` mode, the chunk size is dynamically calculated. In `manual` mode, chunking is applied according to the specified `time_span`. In `off` mode, no chunking is applied.

Optional:

- `time_span` (String) The time span for each chunk. Only applicable and required when mode is `manual`. Must be a valid duration.


<a id="nestedatt--delayed_data_check_config"></a>
### Nested Schema for `delayed_data_check_config`

Optional:

- `check_window` (String) The window of time that is searched for late data. This window of time ends with the latest finalized bucket. It defaults to null, which causes an appropriate `check_window` to be calculated when the real-time datafeed runs.
- `enabled` (Boolean) Specifies whether the datafeed periodically checks for delayed data.


<a id="nestedblock--elasticsearch_connection"></a>
### Nested Schema for `elasticsearch_connection`

Optional:

- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch
- `bearer_token` (String, Sensitive) Bearer Token to use for authentication to Elasticsearch
- `ca_data` (String) PEM-encoded custom Certificate Authority certificate
- `ca_file` (String) Path to a custom Certificate Authority certificate
- `cert_data` (String) PEM encoded certificate for client auth
- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth
- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) schema and port number.
- `es_client_authentication` (String, Sensitive) ES Client Authentication field to be used with the JWT token
- `headers` (Map of String, Sensitive) A list of headers to be sent with each request to Elasticsearch.
- `insecure` (Boolean) Disable TLS certificate validation
- `key_data` (String, Sensitive) PEM encoded private key for client auth
- `key_file` (String) Path to a file containing the PEM encoded private key for client auth
- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch.
- `username` (String) Username to use for API authentication to Elasticsearch.


<a id="nestedatt--indices_options"></a>
### Nested Schema for `indices_options`

Optional:

- `allow_no_indices` (Boolean) If true, wildcard indices expressions that resolve into no concrete indices are ignored. This includes the `_all` string or when no indices are specified.
- `expand_wildcards` (List of String) Type of index that wildcard patterns can match. If the request can target data streams, this argument determines whether wildcard expressions match hidden data streams. Supports comma-separated values.
- `ignore_throttled` (Boolean, Deprecated) If true, concrete, expanded, or aliased indices are ignored when frozen. This setting is deprecated.
- `ignore_unavailable` (Boolean) If true, unavailable indices (missing or closed) are ignored.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Basic ML Datafeed
resource "elasticstack_elasticsearch_ml_datafeed" "basic" {
datafeed_id = "my-basic-datafeed"
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
indices = ["log-data-*"]

query = jsonencode({
match_all = {}
})
}

# Comprehensive ML Datafeed with all options
resource "elasticstack_elasticsearch_ml_datafeed" "comprehensive" {
datafeed_id = "my-comprehensive-datafeed"
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
indices = ["app-logs-*", "system-logs-*"]

query = jsonencode({
bool = {
must = [
{
range = {
"@timestamp" = {
gte = "now-1h"
}
}
},
{
term = {
"status" = "error"
}
}
]
}
})

scroll_size = 1000
frequency = "30s"
query_delay = "60s"
max_empty_searches = 10

chunking_config {
mode = "manual"
time_span = "30m"
}

delayed_data_check_config {
enabled = true
check_window = "2h"
}

indices_options {
ignore_unavailable = true
allow_no_indices = false
expand_wildcards = ["open", "closed"]
}

runtime_mappings = jsonencode({
"hour_of_day" = {
"type" = "long"
"script" = {
"source" = "emit(doc['@timestamp'].value.getHour())"
}
}
})

script_fields = jsonencode({
"my_script_field" = {
"script" = {
"source" = "_score * doc['my_field'].value"
}
}
})
}

# Required ML Job for the datafeed
resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
job_id = "example-anomaly-job"
description = "Example anomaly detection job"

analysis_config {
bucket_span = "15m"
detectors {
function = "count"
}
}

data_description {
time_field = "@timestamp"
time_format = "epoch_ms"
}
}
38 changes: 38 additions & 0 deletions internal/asyncutils/state_waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package asyncutils

import (
"context"
"fmt"
"time"

"github.com/hashicorp/terraform-plugin-log/tflog"
)

// StateChecker is a function that checks if a resource is in the desired state.
// It should return true if the resource is in the desired state, false otherwise, and any error that occurred during the check.
type StateChecker func(ctx context.Context) (isDesiredState bool, err error)

// WaitForStateTransition waits for a resource to reach the desired state by polling its current state.
// It uses exponential backoff with a maximum interval to avoid overwhelming the API.
func WaitForStateTransition(ctx context.Context, resourceType, resourceId string, stateChecker StateChecker) error {
const pollInterval = 2 * time.Second
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
isInDesiredState, err := stateChecker(ctx)
if err != nil {
return fmt.Errorf("failed to check state during wait: %w", err)
}
if isInDesiredState {
return nil
}

tflog.Debug(ctx, fmt.Sprintf("Waiting for %s %s to reach desired state...", resourceType, resourceId))
}
}
}
Loading
Loading