[ingest] Enrich documents prior to indexing #32789

Closed
45 of 55 tasks
jakelandis opened this issue Aug 10, 2018 · 6 comments · Fixed by #48039
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement Meta

Comments

@jakelandis
Contributor

jakelandis commented Aug 10, 2018

Enrichment at ingest

This issue describes a project that will leverage the ingest node to allow for enrichment of documents before they are indexed.

Below is a diagram that highlights the workflow. The red parts are new components.

[image: workflow diagram]

  • .enrich-* - index(es) managed by Elasticsearch that contain a highly optimized subset of the source data used for enrichment.
  • source index - a normal index managed externally (i.e. not by Elasticsearch) that contains the data used for enrichment
  • enrich policy - a policy that describes how to synchronize the source index with the .enrich-* index. The policy will describe which fields to copy and how often to copy the fields.
  • decorate processor - an ingest node processor that reads from a .enrich-* index to mutate the raw data before it is indexed. The .enrich-* will be data local to the decorate processor.
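A rough way to picture how these pieces fit together, as a minimal in-memory sketch. This is not Elasticsearch code: the function names, the dict-based "index" and the sample documents are all hypothetical and only illustrate the flow described above (policy execution copies a subset of fields, the processor reads from that copy).

```python
# Hypothetical in-memory model of the workflow; not Elasticsearch code.

def execute_policy(source_docs, match_field, enrich_fields):
    """Copy only the configured fields into a lookup keyed by match_field."""
    enrich_index = {}
    for doc in source_docs:
        key = doc.get(match_field)
        if key is None:
            continue  # nothing to match on, so the document is skipped
        enrich_index[key] = {f: doc[f] for f in enrich_fields if f in doc}
    return enrich_index


def decorate(doc, enrich_index, field, target_field):
    """Look up doc[field] in the local enrich data and attach the match."""
    match = enrich_index.get(doc.get(field))
    if match is not None:
        doc[target_field] = dict(match)
    return doc


# "source index": externally managed data (made-up sample values).
source_index = [
    {"id": "p1", "first": "Ada", "last": "Lovelace", "badge": 7},
    {"id": "p2", "first": "Alan", "last": "Turing", "badge": 9},
]

# ".enrich-*": the optimized subset produced by running the policy.
enrich_index = execute_policy(source_index, "id", ["first", "last"])

# The processor mutates the raw document before it is indexed.
enriched = decorate({"user": "p2", "msg": "login"}, enrich_index, "user", "person")
```

The point of the split is that the processor never touches the source index directly; it only reads the derived copy.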

There are many moving parts so this issue will serve as a central place to track them.

Tasks

Enrich policy definition

{
  "exact_match": {
    "match_field": "prsnl.id",
    "enrich_fields": [
      "prsnl.name.first",
      "prsnl.name.last"
    ],
    "indices": [
      "bar*",
      "foo"
    ],
    "query": {}
  }
}

instead of:

{
  "type": "exact_match",
  "indices": [
    "bar*",
    "foo"
  ],
  "match_field": "prsnl.id",
  "enrich_fields": [
    "prsnl.name.first",
    "prsnl.name.last"
  ],
  "query": {}
}
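Both shapes carry the same information; only the position of the policy type differs. A small sketch of converting between them, assuming a definition always has exactly one type. The helper names `to_flat` and `to_nested` are made up for illustration.

```python
def to_flat(policy):
    """Convert {"<type>": {...}} into {"type": "<type>", ...}."""
    if len(policy) != 1:
        raise ValueError("expected exactly one policy type key")
    policy_type, body = next(iter(policy.items()))
    return {"type": policy_type, **body}


def to_nested(policy):
    """Convert {"type": "<type>", ...} into {"<type>": {...}}."""
    body = dict(policy)            # copy so the caller's dict is untouched
    policy_type = body.pop("type")
    return {policy_type: body}


nested = {
    "exact_match": {
        "match_field": "prsnl.id",
        "enrich_fields": ["prsnl.name.first", "prsnl.name.last"],
        "indices": ["bar*", "foo"],
        "query": {},
    }
}
flat = to_flat(nested)
```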

Enrich processor

  • Write rally track for exact match processor.
  • Add an enrich processor that uses the search api via node client in order to do the enrichment.
  • Optimize the way msearch is executed for enrich processor lookups. Enrich indices always have a single shard, which allows us to easily optimize the execution of multiple search requests bundled together in a bulk. Added a custom api to perform the msearch more efficiently for enrich processor #43965
  • Ensure that EnrichProcessorFactory always has access to the latest enrich policies.
    (Currently if multiple CS updates are combined then enrich policy changes may not be visible)
  • Allows IngestService to register components that are updated before the processor factories.
  • Register EnrichProcessorFactory as component that keeps track of the policies.
  • Rename the enrich_key option to field in enrich processor configuration. Enrich processor configuration changes #45466
  • Remove set_from and targets options and introduce target_field option that is in line with what the geoip processor is doing. The entire looked up document is placed as a json object under the target_field. Enrich processor configuration changes #45466
  • Change the enrich processor to not depend on the actual EnrichPolicy instance, just on the policy name. From the policy name, the enrich index alias can be resolved, and from the alias the currently active enrich index. The enrich index should have the match_field of the policy stored in its _meta mapping; this is the only piece of information required to do the enrichment at ingest time. Decouple enrich processor factory from enrich policy #45826
  • Add overwrite parameter to enrich processor. Add support for overwrite parameter in the enrich processor. #45029
  • Add template support to field and target_field parameters.
  • Include match count into document being enriched to see whether there were no matches or multiple matches.
  • Add a LRU cache that is only used when enrich processor needs to make a remote call to do the lookup.
  • Add support for match policy type.
  • Add support for geo_shape_match policy type. Add support for geo_shape_match enrich policy type #42639
  • Add support for ip_range_match policy type.
  • Explore warming the LRU cache based on entries from the previous enrich index.
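To make the target_field and overwrite tasks above concrete, here is a minimal sketch. It assumes that overwrite=false means an existing value under target_field is kept; that reading is an assumption, and the function is hypothetical rather than the actual processor code.

```python
def enrich(doc, looked_up, target_field, overwrite=True):
    """Place the entire looked-up document as an object under target_field."""
    if looked_up is None:
        return doc  # no match: leave the document untouched
    if target_field in doc and not overwrite:
        return doc  # assumption: keep the value that is already there
    doc[target_field] = dict(looked_up)
    return doc


looked_up = {"first": "Ada", "last": "Lovelace"}
fresh = enrich({"id": "p1"}, looked_up, "user")
kept = enrich({"id": "p1", "user": "existing"}, looked_up, "user", overwrite=False)
replaced = enrich({"id": "p1", "user": "existing"}, looked_up, "user", overwrite=True)
```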

Policy management

  • Think about bwc around enrich policy types.
    (add created version to EnrichPolicy?) (@jbaiera) Add the cluster version to enrich policies #45021
  • Execute force merge when running policy. (@jbaiera) Add force merge step to Enrich Policy execution #41969
  • Introduce background process that removes enrich indices that are not referenced by an alias or no policy exists for an enrich index. The background process should mark indices for deletion first, and remove them in the next execution (To avoid deleting indices that have been freshly retired from the enrich alias and still potentially in use). Also the background process should not delete any indices that are tied to policies currently being executed - We don't want to throw out new indices that are currently being populated by a policy execution. (@jbaiera) Add Enrich index background task to cleanup old indices #43746
  • Add validation that enrich key fields / enrich value fields are not inside an array of objects (nested). (@jbaiera) Enrich validate nested mappings #42452
  • De-normalize nested data inside source index when executing policy.
  • Stats (in memory)
  • Error Handling
  • Add description to .enrich index as _meta mapping to indicate that this index is managed by ES and shouldn't be modified in any way. (@jbaiera)
  • Always drop the _id and _routing field from documents originating from source indices. This is to ensure the uniqueness of documents. (@jbaiera)
  • Overwrite specific index settings on enrich index: disable field data, global ordinals loading, shard allocation filtering, automatic refresh.
  • If the force merge run as part of policy execution results in more than one segment, should we retry the force merge or fail the execute policy request?
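The mark-then-delete behaviour described for the background cleanup process can be sketched as a pure function over sets. This is only an illustration under assumed data structures (a dict of index name to policy name, plus sets for aliases, policies and running executions); it is not the actual maintenance task.

```python
def cleanup_pass(indices, aliased, policies, executing, marked):
    """One run of the cleanup. Returns (deleted, newly_marked).

    indices:   dict of enrich index name -> owning policy name
    aliased:   set of index names currently referenced by an alias
    policies:  set of existing policy names
    executing: set of policy names currently being executed
    marked:    set of index names marked by the previous run
    """
    deleted, newly_marked = set(), set()
    for name, policy in indices.items():
        if policy in executing:
            continue  # may be a new index that is still being populated
        orphaned = name not in aliased or policy not in policies
        if not orphaned:
            continue
        if name in marked:
            deleted.add(name)       # second sighting: remove it now
        else:
            newly_marked.add(name)  # first sighting: only mark it
    return deleted, newly_marked


indices = {
    ".enrich-users-1": "users",   # retired from the alias
    ".enrich-users-2": "users",   # currently behind the alias
    ".enrich-hosts-1": "hosts",   # being populated right now
}
first = cleanup_pass(indices, {".enrich-users-2"}, {"users", "hosts"}, {"hosts"}, set())
second = cleanup_pass(indices, {".enrich-users-2"}, {"users", "hosts"}, {"hosts"}, first[1])
```

Deleting only on the second sighting gives in-flight lookups against a freshly retired index one full interval to finish.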

APIs

  • Get policy API
  • Execute policy API.
  • Add manage_enrich privilege.
  • Make policies immutable. The PUT policy api should fail when a policy already exists, so effectively this api can only return a 200 response code. If a policy needs to be changed then it first needs to be removed, or alternatively, a new policy under a different name should be added. (@hub-cap) Ensure enrich policy is immutable #43604
  • A policy should not be removed if a pipeline is still referencing it. (@hub-cap) Fail delete policy if pipeline exists #44438
  • The delete policy api should first remove all enrich indices of a policy, before removing the policy from the cluster state. (@hub-cap) Remove enrich indices on delete policy #45870
  • Use has_privilege api as part of put policy api to check whether the user has sufficient privileges in source index. (@hub-cap) Validate read priv of enrich source indices #43595
  • Policy name validation. The validation should be similar to index name validation, because the policy name is used to create an index. (same validation as in MetaDataCreateIndexService#validateIndexOrAliasName) (@martijnvg)
  • Replace current get and list APIs with another API that returns both a single policy and all policies. In both cases a list should be returned. For example
    GET _enrich/policy/users-policy (specific policy) and GET _enrich/policy (all policies). Both variants should always return a list of objects. And later also support:
    GET _enrich/policy/users-* and GET _enrich/policy/users-policy,users2-policy. (@hub-cap) Consolidate enrich list all and get by name APIs #45705
  • CRUD for enrich policy (@hub-cap) _enrich/policy/name
  • Store enrich policy in an index (.enrich-policies ?) instead of in the cluster state. (@hub-cap) Use an index to store enrich policies #47475
  • Stats API
  • Integrate stats api with monitoring
  • Telemetry support
  • task api for execute ?wait_for_completion=false (@hub-cap)
  • GET wildcard and comma separated policy names (@hub-cap)
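The consolidated get API described above (single name, all policies, wildcards, comma-separated names, always returning a list) could be modelled like this. It is a sketch only: it uses Python's `fnmatch` for wildcard matching, which also treats `?` and `[...]` as special, and the response shape with `name` and `policy` keys is an assumption.

```python
from fnmatch import fnmatchcase


def resolve_policies(expression, policies):
    """Return matching policies as a list, whatever the expression is.

    expression: None or "" (all), a name, a wildcard, or a comma-separated mix
    policies:   dict of policy name -> policy body
    """
    if not expression:
        patterns = ["*"]
    else:
        patterns = [p.strip() for p in expression.split(",") if p.strip()]
    return [
        {"name": name, "policy": body}
        for name, body in sorted(policies.items())
        if any(fnmatchcase(name, p) for p in patterns)
    ]


policies = {"users-policy": {}, "users2-policy": {}, "hosts-policy": {}}
single = resolve_policies("users-policy", policies)
everything = resolve_policies(None, policies)
wildcard = resolve_policies("users-*", policies)
several = resolve_policies("users-policy,users2-policy", policies)
```

Note that `users-*` matches `users-policy` but not `users2-policy`, since the hyphen is part of the pattern.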

Misc


EDITS:

  • 2019-04-08: Changed the original description of this issue to reflect the current direction
  • 2019-05-07: Updated after planning meeting.
@jakelandis jakelandis added >enhancement :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP labels Aug 10, 2018
@jakelandis
Contributor Author

Closing as better alternatives for these use cases have been discussed.

@jakelandis
Contributor Author

Re-opening per further discussion.

@jakelandis jakelandis reopened this Jan 28, 2019
@jakelandis jakelandis changed the title [ingest] reference data backed enriching processor(s) [ingest] lookup processor / data backed enriching processor(s) Jan 28, 2019
@jakelandis jakelandis added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP and removed :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP labels Jan 28, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-core-features

@dadoonet
Member

dadoonet commented Mar 6, 2019

@jakelandis When I closed #20340 I started to work on a JDBC ingest plugin which basically did lookups against a 3rd party database. I designed it to rely heavily on caching so that lookups run as fast as possible against local data.

Two strategies at that time:

  • cache hit by hit. The more you call ingest-jdbc the more you are caching data, the faster it runs
  • cache on ingest startup. It starts an embedded in-memory database, creates a schema identical to the source one, and imports the table data into memory.

Of course with cache eviction, memory usage protection (ie. don't load more than x kb/mb of data...).
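The "cache hit by hit" strategy with eviction can be sketched as a small LRU cache in front of a slow lookup. This is an illustration only, not the plugin's code: `LookupCache`, the loader callback and the entry limit are hypothetical, and the limit here counts entries rather than kb/mb.

```python
from collections import OrderedDict


class LookupCache:
    """Least-recently-used cache in front of an expensive lookup."""

    def __init__(self, loader, max_entries):
        self._loader = loader          # e.g. a query against the remote database
        self._max_entries = max_entries
        self._entries = OrderedDict()  # oldest entry first

    def get(self, key):
        if key in self._entries:
            self._entries.move_to_end(key)     # mark as most recently used
            return self._entries[key]
        value = self._loader(key)              # miss: do the slow lookup
        self._entries[key] = value
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the least recently used
        return value


remote_calls = []

def slow_lookup(key):
    remote_calls.append(key)  # record each remote round trip
    return key.upper()

cache = LookupCache(slow_lookup, max_entries=2)
for key in ["a", "b", "a", "c", "b"]:
    cache.get(key)
```

In this run the second lookup of "a" is served from the cache, while "b" is evicted when "c" arrives and has to be loaded again.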

Is that one of the things you have in mind?

@gmoskovicz
Contributor

This would be beneficial to do real-time lookups within Elasticsearch.

@jakelandis jakelandis changed the title [ingest] lookup processor / data backed enriching processor(s) [ingest] Enrich documents prior to indexing Apr 8, 2019
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Apr 9, 2019
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Apr 9, 2019
martijnvg added a commit that referenced this issue Apr 12, 2019
martijnvg added a commit that referenced this issue Apr 12, 2019
@jakelandis jakelandis added the 7x label Apr 15, 2019
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Apr 25, 2019
The enrich processor takes a field value from the document being
enriched and uses it to do a lookup in the locally allocated
enrich index shard. If there is a match, it retrieves the source
of the enrich document from the enrich source field. This is a special
binary doc values field. The document being enriched then gets values
from the enrich document based on the configured decorate fields.

The policy specifies which field in the enrich index
to query and which fields are available to decorate a document being
enriched with.

The enrich processor has the following configuration options:
* `policy_name` - the name of the policy this processor should use
* `enrich_key_field` - the field in the document being enriched that holds the lookup value
* `enrich_key_field_ignore_missing` - Whether to allow the key field to be missing
* `enrich_values` - a list of fields to decorate the document being enriched with.
                    Each entry holds a source field and a target field.
                    The source field indicates what decorate field to use that is available in the policy.
                    The target field controls the field name to use in the document being enriched.
                    The source and target fields can be the same.

Example pipeline config:

```
{
   "processors": [
      {
         "enrich": {
            "policy_name": "my_policy",
            "enrich_key_field": "host_name",
            "enrich_values": [
               {
                 "source": "globalRank",
                 "target": "global_rank"
               }
            ]
         }
      }
   ]
}
```

In the above example documents are being enriched with a global rank value.
For each document that has a match in the enrich index based on its host_name field,
the document gets a global rank field value, which is fetched from the `globalRank`
field in the enrich index and saved as `global_rank` in the document being enriched.
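The source/target mapping described above can be sketched in a few lines. This is a hypothetical in-memory model that reuses the option names from the list above; the dict standing in for the enrich index and the sample rank value are made up for illustration.

```python
def enrich_processor(doc, enrich_index, enrich_key_field, enrich_values,
                     ignore_missing=False):
    """Copy configured source fields of the matching enrich doc into doc."""
    if enrich_key_field not in doc:
        if ignore_missing:
            return doc  # key field may be absent: nothing to do
        raise KeyError(f"field [{enrich_key_field}] is missing")
    enrich_doc = enrich_index.get(doc[enrich_key_field])
    if enrich_doc is None:
        return doc  # no match: nothing to decorate with
    for entry in enrich_values:
        if entry["source"] in enrich_doc:
            doc[entry["target"]] = enrich_doc[entry["source"]]
    return doc


# Stand-in for the enrich index, keyed by the policy's lookup field.
enrich_index = {"elastic.co": {"globalRank": 25, "tld": "co"}}
values = [{"source": "globalRank", "target": "global_rank"}]

hit = enrich_processor({"host_name": "elastic.co"}, enrich_index, "host_name", values)
miss = enrich_processor({"host_name": "example.org"}, enrich_index, "host_name", values)
```

Only fields listed in `enrich_values` are copied, so `tld` stays out of the enriched document.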

The enrich source field mapper is an internal field mapper meant to be
used by enrich exclusively.

Relates to elastic#32789
martijnvg added a commit that referenced this issue Sep 11, 2019
Changed the signature of AbstractResponseTestCase#createServerTestInstance(...)
to include the randomly selected xcontent type. This is needed for
creating a server response instance with a query which is represented as BytesReference.
Maybe this should go into a different change?

This PR also includes HLRC docs for the get policy api.

Relates to #32789
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 12, 2019
In the case that an ingest processor factory relies on other configuration
in the cluster state in order to construct a processor instance, then
it is currently undetermined whether a processor factory is notified about
a change if multiple cluster state updates are bundled together and
the processor implements the `ClusterStateApplier` interface.
(IngestService implements this interface too)

The idea with ingest cluster state listener is that it is guaranteed to
update the processor factory first before the ingest service creates
a pipeline with their respective processor instances.

Currently this concept is used in the enrich branch:
https://github.com/elastic/elasticsearch/blob/enrich/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java#L21

In this case a processor factory is interested in enrich indices' _meta
mapping fields.
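The ordering guarantee can be illustrated with a small sketch: registered listeners see the new cluster state before any pipeline is rebuilt, so a factory never builds a processor from stale configuration. All class names and the dict-based cluster state are hypothetical; this is not the IngestService implementation.

```python
class EnrichFactorySketch:
    """Processor factory that depends on configuration in the cluster state."""

    def __init__(self):
        self.policies = {}

    def on_cluster_state(self, state):
        self.policies = dict(state.get("policies", {}))

    def create(self, config):
        name = config["policy_name"]
        if name not in self.policies:
            raise ValueError(f"unknown policy [{name}]")
        return ("enrich", name, self.policies[name]["match_field"])


class IngestServiceSketch:
    def __init__(self, factory):
        self.factory = factory
        self.listeners = []   # components that must be updated first
        self.pipelines = {}

    def apply_cluster_state(self, state):
        # Step 1: update the registered components.
        for listener in self.listeners:
            listener(state)
        # Step 2: only then (re)build the pipelines from their configs.
        self.pipelines = {
            pipeline_id: [self.factory.create(c) for c in configs]
            for pipeline_id, configs in state.get("pipelines", {}).items()
        }


factory = EnrichFactorySketch()
service = IngestServiceSketch(factory)
service.listeners.append(factory.on_cluster_state)

# A bundled update: the policy and a pipeline using it arrive together.
service.apply_cluster_state({
    "policies": {"users": {"match_field": "email"}},
    "pipelines": {"p1": [{"policy_name": "users"}]},
})
```

Without step 1 running first, the bundled update above would fail with an unknown policy error when the pipeline is built.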

This is the third PR that merges changes made to server module from
the enrich branch (see elastic#32789) into the master branch.

Changes to the server module are merged separately from the pr that will
merge enrich into master, so that these changes can be reviewed in isolation.
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 13, 2019
This change also slightly modifies the stats response,
so that it can be consumed more easily by monitoring and other
users. (Coordinator stats are now in a list instead of
a map and have an additional field for the node id)
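As an illustration of that shape change, a map keyed by node id becomes a list whose entries carry the node id as a regular field. The field names used here (`node_id`, `queue_size`) are assumptions made for the sketch, not the actual response format.

```python
def coordinator_stats_as_list(stats_by_node):
    """Turn {node_id: stats} into [{"node_id": node_id, **stats}, ...]."""
    return [
        {"node_id": node_id, **stats}
        for node_id, stats in sorted(stats_by_node.items())
    ]


as_map = {"node-b": {"queue_size": 0}, "node-a": {"queue_size": 3}}
as_list = coordinator_stats_as_list(as_map)
```

A list of uniform objects is easier to index and aggregate than a map whose keys are data.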

Relates to elastic#32789
martijnvg added a commit that referenced this issue Sep 18, 2019
martijnvg added a commit that referenced this issue Sep 18, 2019
jkakavas pushed a commit to jkakavas/elasticsearch that referenced this issue Sep 25, 2019
martijnvg added a commit that referenced this issue Sep 25, 2019
This PR changes ingest execution to be non-blocking
by adding an additional method to the Processor interface
that accepts a BiConsumer as handler and changing
IngestService#executeBulkRequest(...) to ingest documents
in a non-blocking fashion iff a processor executes
in a non-blocking fashion.
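The handler-based execution can be sketched in callback style. This is a Python analogy, not the Java interface: `execute_async` stands in for the additional method, the `handler(result, error)` callable stands in for the BiConsumer, and the `pending` list simulates work that completes later.

```python
class Processor:
    def execute(self, doc):
        raise NotImplementedError

    def execute_async(self, doc, handler):
        """Default: run the blocking execute and report via handler."""
        try:
            result = self.execute(doc)
        except Exception as error:
            handler(None, error)
            return
        handler(result, None)


class Uppercase(Processor):
    """Ordinary blocking processor; relies on the default execute_async."""

    def __init__(self, field):
        self.field = field

    def execute(self, doc):
        doc[self.field] = doc[self.field].upper()
        return doc


class DeferredLookup(Processor):
    """Non-blocking processor: queues the lookup and returns immediately."""

    def __init__(self, pending, table, field, target):
        self.pending, self.table = pending, table
        self.field, self.target = field, target

    def execute_async(self, doc, handler):
        def complete():
            doc[self.target] = self.table.get(doc[self.field])
            handler(doc, None)
        self.pending.append(complete)  # completed later by someone else


def run_pipeline(processors, doc, handler, index=0):
    """Run processors in order; each one continues the chain via callback."""
    if index == len(processors):
        handler(doc, None)
        return

    def on_done(result, error):
        if error is not None:
            handler(None, error)
            return
        run_pipeline(processors, result, handler, index + 1)

    processors[index].execute_async(doc, on_done)


pending, results = [], []
pipeline = [Uppercase("host"), DeferredLookup(pending, {"A.COM": 1}, "host", "rank")]
run_pipeline(pipeline, {"host": "a.com"}, lambda d, e: results.append((d, e)))
results_before = list(results)  # the calling thread was not blocked
pending.pop()()                 # the deferred lookup completes
```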

This is the second PR that merges changes made to server module from
the enrich branch (see #32789) into the master branch.

The plan is to merge changes made to the server module separately from
the pr that will merge enrich into master, so that these changes can
be reviewed in isolation.

This change originates from the enrich branch and was introduced there
in #43361.
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 25, 2019
…#46241)

martijnvg added a commit that referenced this issue Sep 26, 2019
Backport of #46241

martijnvg added a commit that referenced this issue Sep 26, 2019
martijnvg added a commit that referenced this issue Sep 26, 2019
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 30, 2019
This PR also includes HLRC docs for the enrich stats api.

Relates to elastic#32789
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Sep 30, 2019
martijnvg added a commit that referenced this issue Oct 10, 2019
martijnvg added a commit that referenced this issue Oct 10, 2019
martijnvg added a commit to martijnvg/elasticsearch that referenced this issue Oct 14, 2019
martijnvg added a commit that referenced this issue Oct 14, 2019
martijnvg added a commit that referenced this issue Oct 14, 2019
martijnvg added a commit that referenced this issue Oct 15, 2019
which adds a new ingest processor, named enrich processor,
that allows documents being ingested to be enriched with data from other indices.

Besides a new enrich processor, this PR adds several APIs to manage an enrich policy.
An enrich policy is in charge of making the data from other indices available to the enrich processor in an efficient manner.

Closes #32789
martijnvg added a commit that referenced this issue Oct 15, 2019
which is backport merge and adds a new ingest processor, named enrich processor,
that allows documents being ingested to be enriched with data from other indices.

Besides a new enrich processor, this PR adds several APIs to manage an enrich policy.
An enrich policy is in charge of making the data from other indices available to the enrich processor in an efficient manner.

Related to #32789
7 participants