savetoES can't use pre-defined mapping #424

Closed
jiangff opened this issue Apr 15, 2015 · 6 comments

jiangff commented Apr 15, 2015

I am using the saveToEs method to write an RDD into Elasticsearch. Because I need to specify the mapping, I set "es.index.auto.create" to "false" and define the mapping beforehand using another Scala Elasticsearch library (com.sksamuel.elastic4s). The mapping is created successfully, but after I call saveToEs to save the RDD to the predefined index/mapping, the data are not consistent with the mapping I created beforehand. Below is my code.

To create the mapping:

def createElasticSearchMapping(host:String, port:String, indexName:String, schema:String): Unit = {

  val uri = ElasticsearchClientUri("elasticsearch://" + host + ":" + port)

  println("elasticsearch://" + host + ":" + port)

  val settings = ImmutableSettings.settingsBuilder().put("cluster.name","tv3devescluster").build()
  val client = ElasticClient.remote(settings, (uri))


  client.execute {

    val index = indexName
    create index indexName mappings (
      schema as (
        "accountid" typed StringType index "not_analyzed",
        "deviceid" typed StringType index "not_analyzed",
        "userid" typed StringType index "not_analyzed",
        "eventtype" typed StringType index "not_analyzed",
        "assetid" typed StringType index "not_analyzed",
        "starttime" typed DateType,
        "endtime" typed DateType,
        "durationinsec" typed IntegerType,
        "stssessionid" typed StringType index "not_analyzed",
        "ipaddress" typed StringType index "not_anayzed",
        "latitude" typed FloatType,
        "longitude" typed FloatType
        )
      )

  }
}

---- and later, write to ES ----------------------

userActivityRDD.map( line => HashMap("accountid" -> line._1.accountId,
                                 "deviceid" -> line._1.deviceId,
                                 "userid" -> line._1.userId,
                                 "eventtype" -> line._1.eventType,
                                 "assetid" -> line._1.assetId,
                                 "starttime" -> line._1.startTime.toString(),
                                 "endtime" -> line._1.endTime.toString(),
                                 "durationinsec" -> line._1.duration,
                                 "stssessionid" -> line._1.stsSessionId,
                                 "ipaddress" -> line._2,
                                 "latitude" -> line._3,
                                 "longitude" -> line._4) )
               .distinct()
               .saveToEs(indexName + "/" + schema)

I believe I have set the spark conf properly:

conf.set("es.nodes", esHost)
    .set("es.port", "9200")
    .set("es.index.auto.create","false")
    .set("index.mapper.dynamic","false")

So although I have specified in the code that, for example, deviceid should be "not_analyzed", the deviceid field in indexName/schema still ends up "analyzed" in ES. It seems like saveToEs overwrites the predefined mapping.

Is there a workaround for this?

Thanks!

costin (Member) commented Apr 15, 2015

That is strange. What version of Elasticsearch are you using? Can you please turn on logging (see the docs), in particular for the rest package, and upload the logs somewhere / to a gist?
Do you have multiple nodes or only one in your cluster?
Es-hadoop simply reuses the index definition - if it's there, it should be picked up. If it isn't, then either the definition doesn't exist (maybe it's a different index) or it was not propagated across the cluster (if you are using multiple nodes).
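
(A minimal sketch of the log4j.properties entries the docs describe - the logger names below follow the es-hadoop documentation, so treat them as an assumption and adjust to your own log4j setup:)

  # raise es-hadoop's REST layer to DEBUG to capture every request it sends
  log4j.category.org.elasticsearch.hadoop.rest=DEBUG
  # optionally the Spark integration as well (classes such as EsRDDWriter log under this package)
  log4j.category.org.elasticsearch.spark=DEBUG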


jiangff (Author) commented Apr 16, 2015

Below is the debug output. I am using a 3-node ES cluster (version 1.4.4):

15/04/16 00:09:38 INFO CassandraConnector: Disconnected from Cassandra cluster: MediaroomAnalyticsDevtestCassandra
15/04/16 00:12:02 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:02 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:03 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:03 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:03 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:03 DEBUG NetworkClient: Opening (pinned) network client to ***:9200
15/04/16 00:12:07 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:07 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:07 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.5:9200
15/04/16 00:12:07 DEBUG EsRDDWriter: Partition writer instance [4] assigned to primary shard [4] at address [10.0.0.5:9200]
15/04/16 00:12:07 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:07 DEBUG RestRepository: Sending batch of [108107] bytes/[272] entries
15/04/16 00:12:07 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]
15/04/16 00:12:08 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:08 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:08 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:08 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:08 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:08 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.6:9200
15/04/16 00:12:08 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:08 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:08 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.5:9200
15/04/16 00:12:08 DEBUG EsRDDWriter: Partition writer instance [1] assigned to primary shard [1] at address [10.0.0.5:9200]
15/04/16 00:12:08 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:08 DEBUG RestRepository: Sending batch of [119493] bytes/[301] entries
15/04/16 00:12:08 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]
15/04/16 00:12:09 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:09 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:09 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:09 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:09 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:09 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.5:9200
15/04/16 00:12:09 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:09 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:09 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.4:9200
15/04/16 00:12:09 DEBUG EsRDDWriter: Partition writer instance [2] assigned to primary shard [2] at address [10.0.0.4:9200]
15/04/16 00:12:09 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:09 DEBUG RestRepository: Sending batch of [115226] bytes/[291] entries
15/04/16 00:12:09 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]
15/04/16 00:12:09 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:09 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:09 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:09 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:09 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:09 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.4:9200
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:11 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.6:9200
15/04/16 00:12:11 DEBUG EsRDDWriter: Partition writer instance [3] assigned to primary shard [3] at address [10.0.0.6:9200]
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [117498] bytes/[296] entries
15/04/16 00:12:11 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]
15/04/16 00:12:11 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:11 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:11 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:11 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:11 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:11 DEBUG NetworkClient: Opening (pinned) network client to tv3devesusw1.cloudapp.net:9200
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:11 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.5:9200
15/04/16 00:12:11 DEBUG EsRDDWriter: Partition writer instance [4] assigned to primary shard [4] at address [10.0.0.5:9200]
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [114942] bytes/[290] entries
15/04/16 00:12:11 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]
15/04/16 00:12:11 DEBUG EsRDDWriter: Using pre-defined writer serializer [org.elasticsearch.spark.serialization.ScalaValueWriter] as default
15/04/16 00:12:11 DEBUG EsRDDWriter: Using pre-defined field extractor [org.elasticsearch.spark.serialization.ScalaMapFieldExtractor] as default
15/04/16 00:12:11 DEBUG EsRDDWriter: Nodes discovery enabled - found [10.0.0.6:9200, 10.0.0.5:9200, 10.0.0.4:9200]
15/04/16 00:12:11 DEBUG EsRDDWriter: Discovered Elasticsearch version [1.4.4]
15/04/16 00:12:11 DEBUG EsRDDWriter: Resource [useractivity-2015.04.14/devtest] resolves as a single index
15/04/16 00:12:11 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.6:9200
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/04/16 00:12:11 DEBUG NetworkClient: Opening (pinned) network client to 10.0.0.6:9200
15/04/16 00:12:11 DEBUG EsRDDWriter: Partition writer instance [5] assigned to primary shard [0] at address [10.0.0.6:9200]
15/04/16 00:12:11 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/04/16 00:12:11 DEBUG RestRepository: Sending batch of [112604] bytes/[284] entries
15/04/16 00:12:11 DEBUG RestRepository: Refreshing index [useractivity-2015.04.14/devtest]

costin (Member) commented Apr 16, 2015

@jiangff I've edited your posts to make them readable - I hope you agree they look much better and the effort was minimal. The Markdown editor in GitHub makes this a breeze: instead of using ------- you can just use three backticks and you're done. Also, use a gist next time for posting code.

Coming back to the issue at hand, it looks like es-hadoop is not creating the index but rather using the existing ones. I'm not sure how you are using the client, but after creating the mapping, test the mapping in Elasticsearch (on the 10.0.0.x nodes that the connector uses, as mentioned in the logs).

You can use curl or some other REST client, as mentioned here.
Check out the mapping and see whether it is correct - Elasticsearch tends to ignore incorrect mappings if they are not root types. Once you confirm the mapping is correct, try adding one or two documents, as explained here, and check whether they are mapped correctly.
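
For example (a sketch against the ES 1.x REST API - the host and the index/type names below are taken from the debug logs above, so substitute your own):

  # inspect the mapping the connector actually sees
  curl -XGET 'http://10.0.0.5:9200/useractivity-2015.04.14/_mapping?pretty'

  # index a single test document, then re-check how its fields were mapped
  curl -XPOST 'http://10.0.0.5:9200/useractivity-2015.04.14/devtest/' -d '{"deviceid": "test-device-1"}'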

Then report back. Currently it looks like either the mapping in Elasticsearch is incorrect or you're using a different (maybe short-lived) cluster than the one es-hadoop relies on.

jeffsteinmetz commented
Not sure if it is related, but I recall at one time in the past (unrelated to elasticsearch-hadoop) that if I created an index, then sent a mapping definition, then - within milliseconds - tried to index a document (basically a sequence of create, mapping, insert, all in series), I had some instances where the document indexing would fail if it relied on a specific mapping and the mapping hadn't propagated to all cluster nodes in time. If I added a short pause after the mapping was set, it all worked out OK.
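
If propagation timing is indeed the culprit, an alternative to a fixed pause (a sketch against the ES 1.x REST API; the host and timeout are placeholders) is to block on the cluster health endpoint until the cluster has settled before indexing:

  curl -XGET 'http://10.0.0.5:9200/_cluster/health?wait_for_status=yellow&timeout=10s'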

jiangff (Author) commented Apr 17, 2015

I found out what the issue is. There was a typo ("not_anayzed") when I created the mapping, so the mapping wasn't even successfully created. The other library (elastic4s) didn't throw an exception, so I didn't catch it. Thanks for the reply, @costin!
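
For anyone hitting the same silent failure: elastic4s's client.execute returns a scala.concurrent.Future, so a rejected mapping only surfaces if you inspect the result. A minimal sketch, assuming the 1.x-era elastic4s API in which index creation resolves to the Java client's CreateIndexResponse:

  import scala.concurrent.Await
  import scala.concurrent.duration._

  // Block on the future so a malformed mapping fails loudly instead of silently.
  val response = Await.result(
    client.execute {
      create index indexName mappings (
        schema as ("deviceid" typed StringType index "not_analyzed")
      )
    },
    30.seconds
  )
  println(s"index created, acknowledged = ${response.isAcknowledged}")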

jiangff closed this as completed Apr 17, 2015
kant111 commented Jan 24, 2018

Is there an easy way to do this now using Spark 2.2.0 and ES 6.1.2? Can the elasticsearch-hadoop connector create a mapping given a streaming Dataset<Row>?

For example, I have the following code, but it doesn't create the index, the type, or its mapping before writing docs to ES.

// transform the Kafka stream into rows (body elided in the original;
// rowEncoder is a placeholder Encoder<Row> for the output schema)
Dataset<Row> df = kafkaDF.map(row -> { /* ... */ }, rowEncoder);

df.printSchema();

df.dtypes(); // gives the dataframe schema; need to use this to create a mapping for an ES type

df
        .writeStream()
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1000))
        .format("org.elasticsearch.spark.sql")
        .option("es.index.auto.create", "true")
        .option("index.mapper.dynamic", "false")
        .option("checkpointLocation", "/tmp")
        .start("test/hello");
