
Parameter ES_MAPPING_TIMESTAMP is not working using saveToEs #765

Closed
armaseg opened this issue May 18, 2016 · 6 comments

@armaseg

armaseg commented May 18, 2016

Hello,

I'm trying to save a document to Elasticsearch using Scala, and the ES_MAPPING_TIMESTAMP parameter isn't working: I can see my field in the final document in ES, but not as @timestamp. All the other parameters work as expected.

      sparkConf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
        .set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
        .set(ConfigurationOptions.ES_INPUT_JSON, "false")

      stream.foreachRDD { rdd =>
        if (rdd.count() > 0) {
          val event = sqlContext.read.json(rdd.values)
          event.saveToEs(Map(
            ConfigurationOptions.ES_RESOURCE -> "{topic}-{calendardate:YYYY.MM.dd}/log",
            ConfigurationOptions.ES_MAPPING_TIMESTAMP -> "calendardate",
            ConfigurationOptions.ES_MAPPING_EXCLUDE -> "id",
            ConfigurationOptions.ES_MAPPING_ID -> "id"))
        }
      }

My final document looks like this:

{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 1,
      "max_score": 1,
      "hits": [
         {
            "_index": "xxx-2016.03.18",
            "_type": "log",
            "_id": "yyy-123",
            "_score": 1,
            "_source": {
               "@timestamp": "2016-05-18T16:39:39.317Z",
               "@version": "1",
               "foo": "bar",
               "topic": "xxx",
               "timenow": "2016-05-18T16:39:49.258Z",
               "calendardate": "2016-03-18T10:11:28.123Z"
            }
         }
      ]
   }
}

On the other hand, in Java using JavaEsSpark.saveJsonToEs the ES_MAPPING_TIMESTAMP parameter works, but ConfigurationOptions.ES_MAPPING_EXCLUDE does not (related to #381).

Version Info

OS: Windows 7 64-bit / CentOS
JVM : JDK 1.8
Hadoop/Spark: HDP 2.4 / 1.6.0
ES-Hadoop : 2.3.1
ES : 2.3.1

@armaseg
Author

armaseg commented May 19, 2016

Same behavior on ES-Hadoop : 2.3.2

@costin
Member

costin commented May 25, 2016

@armaseg it looks like you are reporting two bugs here:

  1. ES_MAPPING_TIMESTAMP
    It's unclear from your example what you expected versus the actual behaviour. Do you have timestamp enabled?
    I did a quick test on master and the field is properly extracted and used in the bulk request.
    Can you confirm the ES version used? Additionally, you mention that with JavaEsSpark.saveJsonToEs the timestamp option works but you cannot exclude it - can you compare the two?
  2. ES_MAPPING_EXCLUDE
    I added a mapping exclude and again this seems to be working.

See below the test case (will soon be in master):

  @Test
  def testEsRDDWriteWithMappingTimestamp() {
    val mapping = """{ "scala-timestamp-write": {
      | "_timestamp" : {
      |   "enabled":true
      | }
      |}
      }""".stripMargin

    val index = "spark-test"
    val target = s"$index/scala-timestamp-write"
    RestUtils.touch(index)
    RestUtils.putMapping(target, mapping.getBytes(StringUtils.UTF_8))


    val doc1 = Map("one" -> null, "two" -> Set("2"), "number" -> 1, "date" -> "2016-05-18T16:39:39.317Z")
    val doc2 = Map("OTP" -> "Otopeni", "SFO" -> "San Fran", "number" -> 2, "date" -> "2016-03-18T10:11:28.123Z")

    sc.makeRDD(Seq(doc1, doc2)).saveToEs(target, Map(ES_MAPPING_ID -> "number", ES_MAPPING_TIMESTAMP -> "date", ES_MAPPING_EXCLUDE -> "date"))

    assertEquals(2, EsSpark.esRDD(sc, target).count());
    assertTrue(RestUtils.exists(target + "/1"))
    assertTrue(RestUtils.exists(target + "/2"))

    val search = RestUtils.get(target + "/_search?")
    assertThat(search, containsString("SFO"))
    assertThat(search, not(containsString("date")))
    assertThat(search, containsString("_timestamp"))
  }

and the http logs:

17:00:05,865 TRACE Executor task launch worker-0 commonshttp.CommonsHttpTransport - Tx [PUT]@[127.0.0.1:9500][spark-test/scala-timestamp-write/_bulk] w/ payload [{"index":{"_id":1,"_timestamp":"2016-05-18T16:39:39.317Z"}}
{"one":null,"two":["2"],"number":1}
{"index":{"_id":2,"_timestamp":"2016-03-18T10:11:28.123Z"}}
{"OTP":"Otopeni","SFO":"San Fran","number":2}
]
...
17:00:06,049 TRACE Executor task launch worker-0 commonshttp.CommonsHttpTransport - Rx @[127.0.0.1] [200-OK] [{"_scroll_id":"DnF1ZXJ5VGhlbkZldGNoAQAAAAAAAAANFjAxUGlrQkdGU3ZTZnlQQXZzUXFqbGc=","took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":1,"max_score":null,"hits":[{"_index":"spark-test","_type":"scala-timestamp-write","_id":"2","_score":null,"_timestamp":1458295888123,"_source":{"OTP":"Otopeni","SFO":"San Fran","number":2},"sort":[0]}]}}]

Note that unless _timestamp is enabled in the mapping, it will not be indexed.

Last but not least, the use of _timestamp is deprecated in favour of a dedicated, user-defined date field.
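To sketch that recommendation (this mapping is illustrative, not from the issue; the field name borrows `calendardate` from the report above), an index mapping would declare an ordinary date field instead of relying on the `_timestamp` metadata:

```json
{
  "log": {
    "properties": {
      "calendardate": {
        "type": "date",
        "format": "strict_date_optional_time"
      }
    }
  }
}
```

Queries and index-name patterns can then use that field directly, with no dependence on deprecated metadata.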

costin added a commit that referenced this issue May 25, 2016
@armaseg
Author

armaseg commented May 26, 2016

Hi @costin, thank you very much for all the time you spent working on the tests.

About your questions:

  • My ES version :
{
  "name" : "Flygirl",
  "cluster_name" : "cluster_name",
  "version" : {
    "number" : "2.3.1",
    "build_hash" : "xxx",
    "build_timestamp" : "2016-04-04T12:25:05Z",
    "build_snapshot" : false,
    "lucene_version" : "5.5.0"
  },
  "tagline" : "You Know, for Search"
}
  • Expected document: the same date in "@timestamp" as in "calendardate", and no id field in the _source section:
{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 1,
      "max_score": 1,
      "hits": [
         {
            "_index": "xxx-2016.03.18",
            "_type": "log",
            "_id": "yyy-123",
            "_score": 1,
            "_source": {
               "@timestamp": "2016-03-18T16:39:39.317Z",
               "@version": "1",
               "foo": "bar",
               "topic": "xxx",
               "timenow": "2016-05-18T16:39:49.258Z",
               "calendardate": "2016-03-18T10:11:28.123Z"
            }
         }
      ]
   }
}
  • Do you have timestamp enabled?
    I didn't create the mapping in advance: in both Java and Scala I used dynamic mapping (which is different from your tests). Could the behavior differ when the mapping is created dynamically?
  • My Java code:
sparkConf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true");
sparkConf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true");
sparkConf.set(ConfigurationOptions.ES_INPUT_JSON, "false");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(JSON));
JavaEsSpark.saveJsonToEs(stringRDD, ImmutableMap.of(
    ConfigurationOptions.ES_RESOURCE, "{topic}-{calendardate:YYYY.MM.dd}/log",
    ConfigurationOptions.ES_MAPPING_TIMESTAMP, "calendardate",
    ConfigurationOptions.ES_MAPPING_EXCLUDE, "id",
    ConfigurationOptions.ES_MAPPING_ID, "id"));
  • My final document looks like this:
{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 1,
      "max_score": 1,
      "hits": [
         {
            "_index": "xxx-2016.03.18",
            "_type": "log",
            "_id": "yyy-123",
            "_score": 1,
            "_source": {
               "@timestamp": "2016-03-18T16:39:39.317Z",
               "@version": "1",
               "foo": "bar",
               "topic": "xxx",
               "id":"yyy-123",
               "timenow": "2016-05-18T16:39:49.258Z",
               "calendardate": "2016-03-18T10:11:28.123Z"
            }
         }
      ]
   }
}

I'm using another field to handle the date, but I wanted to report this because it looks like weird behavior to me.

Did you try ES_MAPPING_EXCLUDE with saveJsonToEs without creating the mapping beforehand?

Thanks!

@costin
Member

costin commented May 27, 2016

I think there's a misunderstanding about how timestamp works. Everything inside _source is user-defined data. The timestamp field is a metadata field, meaning it sits outside _source, that is, outside the document data.
If you look at the doc results in my previous comment, you'll notice the following:

{
    "_index" : "spark-test",
    "_type" : "scala-timestamp-write",
    "_id" : "2",
    "_score" : null,
    "_timestamp" : 1458295888123,
    "_source" : {
        "OTP" : "Otopeni",
        "SFO" : "San Fran",
        "number" : 2
    },
    "sort" : [0]
}

Basically, a field in the Spark data was used to populate the metadata field and was excluded from the source before indexing.
In your case it looks like:

  1. you are expecting @timestamp to be that field, which is incorrect - this field comes from your data (and whatever generates it)
  2. you have not defined/enabled _timestamp in the mapping (again, it is metadata), and thus any information about it is simply ignored.
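To make the split concrete, here is a minimal, hypothetical sketch (this is not the es-hadoop API; the class and method names are invented for illustration) of how a bulk writer can lift a document field into request metadata while dropping excluded fields from _source, mirroring the `_timestamp` entries visible in the bulk log above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BulkMetadataSketch {

    // Holds the two halves of a bulk entry: request metadata and _source.
    static final class Split {
        final Map<String, Object> meta = new HashMap<>();
        final Map<String, Object> source = new HashMap<>();
    }

    // Lift `timestampField` into metadata (as _timestamp) and drop any
    // excluded fields from the document body before it becomes _source.
    static Split splitDoc(Map<String, Object> doc, String timestampField, Set<String> exclude) {
        Split split = new Split();
        if (doc.containsKey(timestampField)) {
            split.meta.put("_timestamp", doc.get(timestampField));
        }
        for (Map.Entry<String, Object> field : doc.entrySet()) {
            if (!exclude.contains(field.getKey())) {
                split.source.put(field.getKey(), field.getValue());
            }
        }
        return split;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("OTP", "Otopeni");
        doc.put("number", 2);
        doc.put("date", "2016-03-18T10:11:28.123Z");

        Split split = splitDoc(doc, "date", Set.of("date"));
        // The date ends up in metadata, outside the document body...
        System.out.println(split.meta);
        // ...and _source keeps only the remaining user data (no "date" key).
        System.out.println(split.source.keySet());
    }
}
```

Run against the second document from the test case above, the metadata carries the date while _source contains only OTP and number, which is exactly why a disabled `_timestamp` mapping silently discards the extracted value.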

Hopefully this clarifies things.

@armaseg
Author

armaseg commented May 27, 2016

Indeed, I got confused between the Logstash @timestamp field and the ES _timestamp metadata. Thank you for clarifying this. You may mark this issue as invalid.

Thank you again!

@costin costin added invalid and removed bug labels May 27, 2016
@costin costin closed this as completed May 27, 2016
@costin
Member

costin commented May 27, 2016

closed. cheers.

withccm pushed a commit to withccm/elasticsearch-hadoop that referenced this issue Jun 21, 2016