
Not able to use KryoSerializer for writing data into Elasticsearch. #1019

Closed
@ShubhamSisodia0590

Description


What kind of issue is this?

  • Bug report.

Not able to use KryoSerializer for writing data into Elasticsearch.

When I just set "spark.serializer" to KryoSerializer.class.getName() in the Spark conf, I get the exception

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class com.spark.apps.pojo.AnalyticsOneDayData] within type [class com.spark.apps.pojo.AnalyticsDailyAggregatedData],

and when I additionally set

"es.ser.writer.value.class" to KryoSerializer.class.getName(), I get the exception below

org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot instantiate class [org.apache.spark.serializer.KryoSerializer]

because ObjectUtils calls clz.newInstance() and KryoSerializer has no default constructor.
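The second failure is just plain JDK reflection behavior and can be reproduced without Spark on the classpath. In this sketch, `NoDefaultCtor` is a hypothetical stand-in for KryoSerializer (whose only constructor takes a SparkConf), and `tryInstantiate` mimics the reflective no-arg instantiation that ObjectUtils performs:

```java
public class NewInstanceDemo {

    // Hypothetical stand-in for org.apache.spark.serializer.KryoSerializer,
    // which only declares a constructor taking a SparkConf.
    static class NoDefaultCtor {
        NoDefaultCtor(String conf) { }
    }

    // Mimics the reflective no-arg instantiation done by
    // org.elasticsearch.hadoop.util.ObjectUtils.instantiate.
    static String tryInstantiate(Class<?> clz) {
        try {
            clz.newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            // No nullary constructor -> InstantiationException,
            // matching the stack trace in this report.
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(NoDefaultCtor.class));
    }
}
```

This illustrates why any class passed via "es.ser.writer.value.class" must expose a public no-arg constructor, which KryoSerializer does not.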

Issue description


I want to insert a nested structure into Elasticsearch. The corresponding Java classes are:

public class DailyAggregatedData implements Serializable {

	private static final long serialVersionUID = -9150958167164783629L;
	String id;
	String cId;
	OneDayData[] aggregatedData;
}
public class OneDayData implements Serializable{

	private static final long serialVersionUID = 8982330246703144883L;
	String date;
	long xcount;
	long ycount;
}

Steps to reproduce

Code:

SparkConf sparkConf = new SparkConf();
sparkConf.set(EsSparkConstants.ES_INDEX_AUTO_CREATE, "true");
sparkConf.set(EsSparkConstants.ES_RESOURCE, "indexName");
sparkConf.set(EsSparkConstants.ES_NODES, "localhost");
sparkConf.set(EsSparkConstants.ES_PORT, "9200");
sparkConf.set(EsSparkConstants.ES_NODES_WAN_ONLY, "true");
sparkConf.set("spark.serializer", KryoSerializer.class.getName());

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

// Building the JavaRDD works fine.
JavaRDD<DailyAggregatedData> dailyAggregatedDataRDD = ...

JavaEsSpark.saveToEs(dailyAggregatedDataRDD, "indexName", propertiesMap);
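One workaround that avoids the value-writer problem entirely (a sketch, not from the original report, and untested against es-hadoop 5.3): map each POJO to plain java.util.Map/List instances before calling saveToEs, since es-hadoop's built-in value writers handle standard JDK collections. The class and method names here (`MapConversionSketch`, `toDocument`, `oneDay`) are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapConversionSketch {

    // Flatten one DailyAggregatedData record into JDK collections that
    // es-hadoop's value writer understands out of the box.
    static Map<String, Object> toDocument(String id, String cId,
                                          List<Map<String, Object>> aggregatedData) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", id);
        doc.put("cId", cId);
        doc.put("aggregatedData", aggregatedData);
        return doc;
    }

    // One entry of the nested OneDayData[] array as a Map.
    static Map<String, Object> oneDay(String date, long xcount, long ycount) {
        Map<String, Object> day = new LinkedHashMap<>();
        day.put("date", date);
        day.put("xcount", xcount);
        day.put("ycount", ycount);
        return day;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> days = new ArrayList<>();
        days.add(oneDay("2017-05-01", 3L, 7L));
        Map<String, Object> doc = toDocument("doc-1", "c-42", days);
        System.out.println(doc);
        // In the Spark job, the mapped RDD would then be written with, e.g.:
        // JavaEsSpark.saveToEs(dailyAggregatedDataRDD.map(d -> toDocument(...)), "indexName");
    }
}
```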

Test/code snippet

Stack trace:

With just "spark.serializer":

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: Cannot handle type [class com.spark.apps.pojo.AnalyticsOneDayData] within type [class com.spark.apps.pojo.AnalyticsDailyAggregatedData], instance [com.spark.apps.pojo.AnalyticsOneDayData@41c7150a] within instance [com.spark.apps.pojo.AnalyticsDailyAggregatedData@18fb1a1d] using writer [org.spark.serialization.ScalaValueWriter@660cef49]

When I add "es.ser.writer.value.class":

org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot instantiate class [org.apache.spark.serializer.KryoSerializer]
	at org.elasticsearch.hadoop.util.ObjectUtils.instantiate(ObjectUtils.java:43)
	at org.elasticsearch.hadoop.util.ObjectUtils.instantiate(ObjectUtils.java:52)
	at org.elasticsearch.hadoop.util.ObjectUtils.instantiate(ObjectUtils.java:48)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory.<init>(AbstractBulkFactory.java:169)
	at org.elasticsearch.hadoop.serialization.bulk.IndexBulkFactory.<init>(IndexBulkFactory.java:27)
	at org.elasticsearch.hadoop.serialization.bulk.BulkCommands.create(BulkCommands.java:40)
	at org.elasticsearch.hadoop.rest.RestRepository.lazyInitWriting(RestRepository.java:127)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:158)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:102)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException: org.apache.spark.serializer.KryoSerializer
	at java.lang.Class.newInstance(Class.java:427)
	at org.elasticsearch.hadoop.util.ObjectUtils.instantiate(ObjectUtils.java:41)
	... 16 more
Caused by: java.lang.NoSuchMethodException: org.apache.spark.serializer.KryoSerializer.<init>()
	at java.lang.Class.getConstructor0(Class.java:3082)
	at java.lang.Class.newInstance(Class.java:412)
	... 17 more

Version Info

OS: Amazon Linux (running a Spark cluster on AWS)
JVM: JDK 8
Hadoop/Spark:
Hadoop distribution: Amazon 2.7.3
Applications: Spark 2.0.2

ES-Hadoop: 5.3
ES: 5.3

