Array types not supported in automatic mapping #847

Closed
burtonator opened this issue Sep 13, 2016 · 3 comments

@burtonator

Array types in Elasticsearch aren't necessarily special.

For example, because of the analyzer, you could have:

tags: "hello world"
tags: ["hello", "world"]

And they are essentially functionally equivalent.

This is defined here:

https://www.elastic.co/guide/en/elasticsearch/reference/1.4/mapping-array-type.html

The mapping would look like this:

"tags" : {"type" : "string" }

But Spark and elasticsearch-hadoop break on this mapping.

It gets the mapping as: "string"

test_posts: org.apache.spark.sql.DataFrame = [tags: string, text: string ... 1 more field]

But it's not a string; it's a list[string].

The type conversion here is wrong:

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-type-conversion

Spark does not detect it as a list[string].

This means that functions like explode() in Hive won't work on this document properly.
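
A minimal sketch of how this surfaces when loading the index through the connector (the index name test_posts and the schema shown are illustrative, taken from the setup above):

val test_posts = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .load("test_posts")

// The connector derives the Spark schema from the Elasticsearch mapping,
// so "tags" is inferred as a flat string column rather than array<string>:
test_posts.printSchema()
// root
//  |-- tags: string (nullable = true)
//  |-- text: string (nullable = true)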

@burtonator
Author

Additionally, simply running basic operations on this fails:

scala> var query = "SELECT tags FROM test_posts"
query: String = SELECT tags FROM test_posts

scala> val top_hashtags = sqlContext.sql(query)
top_hashtags: org.apache.spark.sql.DataFrame = [tags: string]

16/09/13 04:21:31 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3, 136.243.58.237): scala.MatchError: Buffer(love) (of class scala.collection.convert.Wrappers$JListWrapper)

@burtonator
Author

Even basic operations on it fail:

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 58 elided
Caused by: scala.MatchError: Buffer(politics, technology) (of class scala.collection.convert.Wrappers$JListWrapper)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:296)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
  at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:67)
  at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$2.apply(ExistingRDD.scala:64)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


@costin
Member

costin commented Sep 13, 2016

The type conversion here is wrong:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-type-conversion

To quote the page you referred to:

Important
When dealing with multi-value/array fields, please see this section and in particular these configuration options.

In other words, see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#cfg-field-info, in particular es.read.field.as.array.include.
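
For example, a sketch of how the option might be applied when loading the index (the index and field names are taken from the report above, not prescriptive):

val test_posts = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.read.field.as.array.include", "tags")  // treat "tags" as multi-valued
  .load("test_posts")

// With the hint in place, "tags" should come back as array<string>, so
// SELECT explode(tags) ... and similar operations work on the proper type.
test_posts.printSchema()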
