I'm getting this exception every time I run my job. Any idea why?
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 13, 10.0.0.209): java.lang.ClassCastException: scala.collection.Iterator$$anon$11 cannot be cast to scala.Tuple2
        at org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
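The cast fails inside Spark's own DataFrame-to-RDD conversion (DataFrame.scala:1189), before my map function ever runs, so one guess on my part (unconfirmed) is a mismatch between the Spark release on the cluster and the one the elasticsearch-hadoop connector was built against. A quick sanity check from the driver, using the sparkCtx from the sample below:

        // Print the Spark version actually running, to compare against the
        // version the elasticsearch-hadoop jar targets (a diagnostic check,
        // not a fix).
        System.out.println("Spark version: " + sparkCtx.version());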
Sample code
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class Demo {
    public static void main(String[] args) throws UnsupportedEncodingException, FileNotFoundException, IOException {
        int cpus = 30;
        SparkConf sparkConf = new SparkConf(true).setAppName("SparkQueryApp")
                .setMaster("spark://192.168.1.201:7077")
                .set("es.nodes", "192.168.1.209")
                .set("es.nodes.discovery", "false")
                .set("es.cluster", "es-dev")
                .set("es.scroll.size", "5000")
                .setJars(JavaSparkContext.jarOfClass(Demo.class))
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.default.parallelism", String.valueOf(cpus * 2))
                .set("spark.logConf", "true");
        SparkContext sparkCtx = new SparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sparkCtx);

        // Load the Elasticsearch index "rt-logs/test" as a DataFrame.
        DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "rt-logs/test");
        df.registerTempTable("Log");

        // Note: this selects "created" but groups by "instances".
        DataFrame sqlDFTest = sqlContext.sql(
                "SELECT created, count(instances) FROM Log where id = 100000 group by instances");

        List<String> results = sqlDFTest.javaRDD().map(new Function<Row, String>() {
            static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {
                // count(...) produces a LongType column, so read it with getLong.
                return "Created " + row.getString(0) + " Count " + row.getLong(1);
            }
        }).collect();

        try (Writer writer = new BufferedWriter(new OutputStreamWriter(
                new FileOutputStream("result3.txt"), "utf-8"))) {
            for (String row : results) {
                writer.write(row);
            }
            writer.write(sqlDFTest.first().toString());
        }
    }
}
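To narrow this down, here is a smaller repro sketch using the same settings (the master URL, ES node, and index name are from my setup above). It splits the pipeline into two steps, a plain scan and then the aggregation, to see which one triggers the cast. I also grouped by the selected column here, since selecting created while grouping by instances looks like a typo in my original query:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

public class DemoRepro {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf(true).setAppName("SparkQueryRepro")
                .setMaster("spark://192.168.1.201:7077")
                .set("es.nodes", "192.168.1.209")
                .set("es.nodes.discovery", "false");
        SparkContext sc = new SparkContext(conf);
        System.out.println("Spark version: " + sc.version());

        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = JavaEsSparkSQL.esDF(sqlContext, "rt-logs/test");

        // Step 1: scan only -- exercises the ES connector without any SQL.
        System.out.println("raw count: " + df.count());

        // Step 2: the aggregation -- exercises the SQL/DataFrame path.
        df.registerTempTable("Log");
        DataFrame grouped = sqlContext.sql(
                "SELECT created, COUNT(instances) FROM Log WHERE id = 100000 GROUP BY created");
        System.out.println("first grouped row: " + grouped.first());

        sc.stop();
    }
}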