
PROBLEM: Failed to write data back to ES by using Spark DataFrame save() API !! #836

Closed

rjurney opened this issue Aug 29, 2016 · 3 comments
rjurney commented Aug 29, 2016

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Unable to save a Spark DataFrame to Elasticsearch after upgrading to Spark 2.0. This worked in the past with Spark 1.x.

Steps to reproduce

spark-defaults.conf looks like:

spark.speculation false
spark.jars /Users/rjurney/Software/Agile_Data_Code_2/lib/elasticsearch-spark_2.10-2.3.4.jar

Code:

# Load the parquet file
on_time_dataframe = sqlContext.read.parquet('data/on_time_performance.parquet')

on_time_dataframe.write.format("org.elasticsearch.spark.sql")\
  .option("es.resource","agile_data_science/on_time_performance")\
  .mode("overwrite")\
  .save()

Stack trace:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-4d1acf8098de> in <module>()
----> 1 on_time_dataframe.write.format("org.elasticsearch.spark.sql")  .option("es.resource","agile_data_science/on_time_performance")  .save()

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    528             self.format(format)
    529         if path is None:
--> 530             self._jwrite.save()
    531         else:
    532             self._jwrite.save(path)

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o39.save.
: java.lang.AbstractMethodError: org.elasticsearch.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)

Version Info

OS: OS X 10.11.4; Darwin Kernel Version 15.4.0: Fri Feb 26 22:08:05 PST 2016; root:xnu-3248.40.184~3/RELEASE_X86_64 x86_64
JVM : java version "1.8.0_66"
Scala: Scala code runner version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL
Hadoop/Spark: Hadoop 2.6.4, Spark 2.0.0
ES-Hadoop : 2.3.4
ES : 2.3.5

rjurney commented Aug 29, 2016

Note: the data I am loading is available at s3://agile_data_science/on_time_performance.parquet/

rjurney commented Aug 29, 2016

The other exception I get is:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-4ff7cd1bec8e> in <module>()
      2 on_time_dataframe = sqlContext.read.parquet('data/on_time_performance.parquet')
      3 
----> 4 on_time_dataframe.write.format("org.elasticsearch.spark.sql")  .option("es.resource","agile_data_science/on_time_performance")  .mode("overwrite")  .save()

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/readwriter.pyc in save(self, path, format, mode, partitionBy, **options)
    528             self.format(format)
    529         if path is None:
--> 530             self._jwrite.save()
    531         else:
    532             self._jwrite.save(path)

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o40.save.
: java.lang.AbstractMethodError: org.elasticsearch.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
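The SLF4J lines at the top of this second run are a separate, benign classpath warning, not the cause of the failure: no SLF4J binding jar is on the driver classpath, so logging silently falls back to a no-op implementation. A minimal sketch of silencing it, assuming a hypothetical jar path and the `slf4j-simple` binding as one example of any valid binding:

```shell
# spark-defaults.conf — ship an SLF4J binding alongside the connector jar
# so org.slf4j.impl.StaticLoggerBinder can be found (paths are examples).
spark.jars /path/to/elasticsearch-spark-jar,/path/to/slf4j-simple-1.7.21.jar
```

Any one binding jar (slf4j-simple, slf4j-log4j12, logback-classic) resolves the warning; having more than one on the classpath triggers a different "multiple bindings" warning instead.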

jbaiera commented Aug 29, 2016

Hadoop/Spark: Hadoop 2.6.4, Spark 2.0.0
ES-Hadoop : 2.3.4

Spark 2.0 support was added in ES-Hadoop 5.0.0-alpha5. ES-Hadoop 2.3.4 is not compatible with Spark 2.0.0. Please note that 5.0.0-alpha5 is an alpha release and should not be used in production.
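Concretely, that means swapping the connector jar in spark-defaults.conf for the Spark 2.0 artifact. A sketch of the change, assuming the 5.0.0-alpha5 coordinates (the exact artifact name should be verified on Maven Central; note also that Spark 2.0 distributions are built against Scala 2.11, so the `_2.10` suffix has to change too):

```shell
# spark-defaults.conf — replace the Scala-2.10 / ES-Hadoop-2.3.4 connector
# with the Spark-2.0 / Scala-2.11 build (path and version are examples).
spark.jars /Users/rjurney/Software/Agile_Data_Code_2/lib/elasticsearch-spark-20_2.11-5.0.0-alpha5.jar

# Alternatively, resolve it from Maven at launch instead of a local jar:
# pyspark --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.0.0-alpha5
```

The `AbstractMethodError` on `DefaultSource.createRelation` is the classic symptom of this kind of mismatch: Spark 2.0 calls the data-source API with `Dataset` where the 1.x-era connector was compiled against `DataFrame`, so the expected method signature simply does not exist in the old jar.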
