
Kafka Connect 5.3.1 / Elasticsearch 7 problems #342

Closed
rmoff opened this issue Sep 26, 2019 · 12 comments
@rmoff rmoff commented Sep 26, 2019

Kafka Connect 5.3.1 with version 5.3.1 of io.confluent.connect.elasticsearch.ElasticsearchSinkTask, running against Elasticsearch 7.3.2.

So that fields (e.g. epoch timestamps, geopoints) are picked up correctly, I need to use a dynamic template.

Issue 1: dynamic templates in Elasticsearch 7

This used to work prior to Elasticsearch 7:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
        {
          "template": "*",
          "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
          },
          "mappings": {
            "_default_": {
              "dynamic_templates": [
                {
                  "dates": {
                    "match": "*_TS",
                    "mapping": {
                      "type": "date"
                    }
                  }
                }
              ]
            }
          }
        }'

In Elasticsearch 7 this now fails, by design, with:

{
  "error": {
    "root_cause": [
      {
        "type": "mapper_parsing_exception",
        "reason": "Root mapping definition has unsupported parameters:  [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}, {heights={mapping={type=float}, match=HEIGHT}}, {locations={mapping={type=geo_point}, match=LOCATION}}]}]"
      }
    ],
    "type": "mapper_parsing_exception",
    "reason": "Failed to parse mapping [_doc]: Root mapping definition has unsupported parameters:  [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}, {heights={mapping={type=float}, match=HEIGHT}}, {locations={mapping={type=geo_point}, match=LOCATION}}]}]",
    "caused_by": {
      "type": "mapper_parsing_exception",
      "reason": "Root mapping definition has unsupported parameters:  [_default_ : {dynamic_templates=[{dates={mapping={type=date}, match=*_TS}}, {heights={mapping={type=float}, match=HEIGHT}}, {locations={mapping={type=geo_point}, match=LOCATION}}]}]"
    }
  },
  "status": 400
}

To get this to work in Elasticsearch 7 you either need to pass ?include_type_name=true on the template PUT, or switch to a typeless mapping.
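For reference, a typeless version of the same template, which Elasticsearch 7 accepts, drops the "_default_" wrapper so that "dynamic_templates" sits directly under "mappings" (this is essentially the form that works at the end of this thread; shown here with just the dates rule for brevity):

```shell
# Typeless template for Elasticsearch 7: no "_default_" type wrapper,
# "dynamic_templates" goes directly under "mappings".
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" \
  -H 'Content-Type: application/json' -d'
{
  "template": "*",
  "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
  "mappings": {
    "dynamic_templates": [
      { "dates": { "match": "*_TS", "mapping": { "type": "date" } } }
    ]
  }
}'
```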

Issue 2: Kafka Connect support for Elasticsearch 7

BUT NOW we have another problem. With a dynamic template in place, Kafka Connect Elasticsearch sink fails.

{
  "type": "illegal_argument_exception",
  "reason": "Rejecting mapping update to [runner_status] as the final mapping would have more than 1 type: [_doc, type.name=kafkaconnect]"
}

Full error:

[2019-09-25 19:45:01,040] WARN Encountered an illegal document error when executing batch 120 of 1 records. Ignoring and will not index record. Error was [{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [runner_status] as the final mapping would have more than 1 type: [_doc, type.name=kafkaconnect]"}] (io.confluent.connect.elasticsearch.bulk.BulkProcessor:416)

Example Kafka Connect connector config:

curl -i -X PUT -H  "Content-Type:application/json" \
      http://localhost:8083/connectors/sink-elastic-cloud-runner_location-00/config \
      -d '{
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "connection.url": "http://localhost:9200",
            "type.name": "type.name=kafkaconnect",
            "behavior.on.malformed.documents": "warn",
            "topics": "RUNNER_LOCATION",
            "key.ignore": "true",
            "schema.ignore": "true"
            }'

If you change "type.name": "type.name=kafkaconnect", to "type.name": "type.name=_doc" you get the same error:

{
    "type":"illegal_argument_exception",
    "reason":"Rejecting mapping update to [runner_status] as the final mapping would have more than 1 type: [_doc, type.name=_doc]"
}

Only option is to remove the dynamic template, and then the connector will work—but now you don't get your field types set.

The exact same error occurs if you create a static mapping for a specific index.

Only option I've found so far is to move back to Elasticsearch 6, at which point everything works as it should :)

Related: #314

@strigona-worksight strigona-worksight commented Oct 1, 2019

I'm not 100% sure here, but when we tested this out (successfully) we set "type.name": "" in the Kafka Connect connector config.

@rmoff rmoff commented Oct 1, 2019

This is what I ran against ES7, including include_type_name:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/?include_type_name=true" \
        -H 'Content-Type: application/json' -d'
{
            "template": "*",
            "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
            "mappings": { "_default_" : {
                "dynamic_templates": [
                    { "dates": { "match": "*_TS", "mapping": { "type": "date" } } },
                    { "heights": { "match": "HEIGHT", "mapping": { "type": "float" } } },
                    { "locations": { "match": "LOCATION", "mapping": { "type": "geo_point" } } }
                ] } } }'
@Duchadian Duchadian commented Oct 1, 2019

curl -XPUT "http://localhost:9200/_template/kafkaconnect/?include_type_name=true" \
        -H 'Content-Type: application/json' -d'
{
    "template": "*",
    "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
    "mappings": { "_default_" : {
        "dynamic_templates": [
            { "dates": { "match": "*_TS", "mapping": { "type": "date" } } },
            { "heights": { "match": "HEIGHT", "mapping": { "type": "float" } } },
            { "locations": { "match": "LOCATION", "mapping": { "type": "geo_point" } } }
        ] } } }'

BUT NOW we have another problem. With a dynamic template in place, Kafka Connect Elasticsearch sink fails

What I think is happening here is that you are confusing the type of the document with the name of the template in Elasticsearch. The curl command above does not create a type "kafkaconnect", but a type _doc, i.e. the default. The kafkaconnect name in the curl command is the name you give to the template in question. Elasticsearch does not use it in any way to determine the type of a document.
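One quick way to see the distinction: fetching the template by name shows "kafkaconnect" only as the key of the response, never as a document type (a sketch, assuming a local cluster as in the thread):

```shell
# "kafkaconnect" here is the template's name (the key of the response),
# not the type of any document indexed under it.
curl -s "http://localhost:9200/_template/kafkaconnect"
```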

If you change "type.name": "type.name=kafkaconnect", to "type.name": "type.name=_doc" you get the same error

It seems to me that the response you are getting here is also correct. The configuration should be "type.name": "_doc" based on the index pattern you have defined; Elasticsearch is looking for the literal string "type.name=_doc" when trying to index the document.

If you want to define the document type as being "kafka-connect" (the default for Kafka Connect), then the following would work:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/?include_type_name=true" \
        -H 'Content-Type: application/json' -d'
{
    "template": "*",
    "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
    "mappings": { "kafka-connect" : {
        "dynamic_templates": [
            { "dates": { "match": "*_TS", "mapping": { "type": "date" } } },
            { "heights": { "match": "HEIGHT", "mapping": { "type": "float" } } },
            { "locations": { "match": "LOCATION", "mapping": { "type": "geo_point" } } }
        ] } } }'

Note: after editing the index template, the index needs to be removed and then created again. Otherwise the type change is not propagated.
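Since index templates are only applied at index creation time, the sequence would look roughly like this (a sketch, using the runner_location index name implied by the RUNNER_LOCATION topic):

```shell
# 1. Update the template (as above).
# 2. Drop the existing index so the template is re-applied on recreation:
curl -XDELETE "http://localhost:9200/runner_location"
# 3. The next document the connector indexes recreates the index
#    with the updated template applied.
```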

@purbon purbon self-assigned this Oct 4, 2019
@purbon purbon commented Oct 4, 2019

Hi @rmoff, I'm looking at this issue right now. I'll keep the issue updated with my thoughts.

@purbon purbon commented Oct 4, 2019

@rmoff speaking about issue 1 as described in your post, I can't see how this is related to the connector; do you mind helping me? This issue relates to the changes introduced by Elasticsearch 7, as you also say in your description, right?

@purbon purbon commented Oct 4, 2019

@rmoff, now talking about issue 2:

I think there is a typo in the type name of your config; you use something like:

   "type.name": "type.name=kafkaconnect",

why not

            "type.name": "_doc",

this value is passed directly into the createMapping function, as you can see:

if (Mapping.getMapping(client, index, type) == null) {
  Mapping.createMapping(client, index, type, sinkRecord.valueSchema());
}

and is passed directly to Elasticsearch during index creation. Could what we're seeing here be an issue with the config validation?
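Applying that suggestion to the connector config from the original report, only the type.name value changes (a sketch; everything else is copied verbatim from the issue description):

```shell
# Same connector config as in the issue, with type.name corrected
# from "type.name=kafkaconnect" to "_doc".
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/sink-elastic-cloud-runner_location-00/config \
  -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url": "http://localhost:9200",
        "type.name": "_doc",
        "behavior.on.malformed.documents": "warn",
        "topics": "RUNNER_LOCATION",
        "key.ignore": "true",
        "schema.ignore": "true"
      }'
```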

As noted by @Duchadian as well, I see a possible improvement here, namely proper template handling by the connector, but this might be a discussion for another issue. As per my understanding of the connector, now and in the past, if you decide to use a template and disable the schema inference, as a user you enter an unknown arena where you have to narrow down the Elasticsearch behaviour yourself.

@Duchadian Duchadian commented Oct 4, 2019

@purbon @rmoff

Could what we're seeing here be an issue with the config validation?

As far as my understanding of Elasticsearch goes, this is roughly what is happening. When a document is indexed into Elasticsearch, the type mapping of the document is tested against the type mapping present for the index. This is done by checking which fields in the document match the fields defined in the template.

In Elasticsearch 5 and lower, if the document matches more than one type, the document is still indexed (although I do believe Elasticsearch raises a warning in its own log). In Elasticsearch 6 and higher, the document is NOT indexed, and an error like this is raised:

{
  "type": "illegal_argument_exception",
  "reason": "Rejecting mapping update to [runner_status] as the final mapping would have more than 1 type: [_doc, type.name=kafkaconnect]"
}

The problem we are seeing results from attempting to index a document that matches more than one type. Because a _doc type is present that matches the document, and the document itself states its type as type.name=kafkaconnect, Elasticsearch 6 and higher rejects the document.
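To confirm which type an index actually ended up with, its mapping can be inspected directly (a sketch, using the runner_status index name from the error message):

```shell
# In Elasticsearch 6/7 an index can hold at most one mapping type;
# the response shows which one the index was created with.
curl -s "http://localhost:9200/runner_status/_mapping"
```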

@purbon purbon commented Oct 4, 2019

Hi @Duchadian thanks for your answer.

I agree with your observation, but using types in Elasticsearch has been discouraged as bad practice for many, many years.

Having said that, a config that contains

   "type.name": "type.name=kafkaconnect",

does not feel correct to me.

That said, all of this started with a dynamic template being created outside the connector; unless the template is under the connector's control, this is outside of what the connector can help with. Am I missing anything?

@Duchadian Duchadian commented Oct 4, 2019

Hi @purbon, I totally agree, on both the use of types and the configuration.

In my opinion, it is usually best to not touch _type in Elasticsearch 6+ unless you really have to.

I think it might be fruitful to add some documentation regarding the use of templates, with specific mention of avoiding extra types. If you really have to change the template to get specific functionality (as @rmoff was doing), it would likely be best to update the template that (I believe) is pushed by the connector.

@purbon purbon commented Oct 4, 2019

As described in (and used during index and mapping manipulation by) the connector:

[Screenshot: connector source, "Screen Shot 2019-10-04 at 19 25 15"]

and as used in the test, which runs against Elasticsearch 7.0 using Testcontainers; see https://github.com/confluentinc/kafka-connect-elasticsearch/blob/master/src/test/java/io/confluent/connect/elasticsearch/integration/MappingIT.java#L37

I'm thinking out loud here, but maybe @rmoff can confirm:

  • The template was created without specifying a type, so _doc was used.
  • If the template had been created with the expected type (type.name=kafkaconnect), no error would have shown.
  • The other error is the two-mapping error during document creation, due to the type information being added to the call via the Jest client.

Does this make sense?

If this makes sense, the ways I see to "help" the user experience with these errors are:

  • Add documentation stating that if you use dynamic mappings outside of the connector, you should follow certain practices, with up-to-date references to Elasticsearch, as commented earlier in this thread.
  • If ES.version>6, don't allow more than one type.
  • If ES.version>6 is used, get the mapping for the destination index (if available) and raise an error if a type other than the expected is used.

I think I like options 1 and 2 for now; they will help people who are creating templates and running into mistakes.

@Duchadian Duchadian commented Oct 4, 2019

@purbon , I fully agree with your last comment.

If ES.version>6 is used, get the mapping for the destination index (if available) and raise an error if a type other than the expected is used.

I think this nuance compared to your previous comment is important. I don't think it matters if the user is using a type other than _doc, as long as it is the same type the connector is expecting.

Besides that, no comments.

@rmoff rmoff commented Oct 7, 2019

Thanks for all the input & help @Duchadian @purbon @strigona-worksight. As you can tell I am not an Elasticsearch expert so this has taken some flailing around, plus a silly typo in my config ( "type.name": "type.name=kafkaconnect", as @purbon spotted) didn't help.

I can confirm that Kafka Connect Elasticsearch 5.3.1 works very happily with Elasticsearch 7.4. I create a template:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" \
        -H 'Content-Type: application/json' -d \
        ' {
            "template": "*",
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "mappings": {
                    "dynamic_templates": [
                        {
                            "dates": {
                                "match": "*_TS",
                                "mapping": {
                                    "type": "date"
                                }
                            }
                        },
                        {
                            "heights": {
                                "match": "HEIGHT",
                                "mapping": {
                                    "type": "float"
                                }
                            }
                        },
                        {
                            "locations": {
                                "match": "LOCATION",
                                "mapping": {
                                    "type": "geo_point"
                                }
                            }
                        }
                    ]
            }
        }'

Then I create a connector, with type.name set to blank:

curl -i -X PUT -H  "Content-Type:application/json" \
            http://localhost:8083/connectors/sink-elastic-07/config \
            -d '{
                    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
                    "connection.url": "http://elasticsearch:9200",
                    "type.name": "",
                    "behavior.on.malformed.documents": "warn",
                    "topics": "RUNNER_LOCATION_00",
                    "key.ignore": "true",
                    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
                    "schema.ignore": "true"
                    }'

Documents are indexed under the _doc type:

curl -XPOST -s "http://localhost:9200/runner_location_00/_search" | jq '.hits.hits[0]._type'
"_doc"

Mappings are correctly picked up from the template:

curl -XGET -s "http://localhost:9200/runner_location_00/_mapping" | jq '.runner_location_00.mappings.properties.LOCATION'
{
  "type": "geo_point"
}
@rmoff rmoff closed this Oct 7, 2019