Spark 2.3 app in k8s cluster: parquet->COS throws exception #200

Closed
dkazakevich opened this issue Aug 9, 2018 · 20 comments

@dkazakevich

Created a sample Spark 2.3 application that runs in a k8s cluster. The application creates a sample Dataset and tries to write it into Bluemix COS using Stocator:

        StructType schema = DataTypes.createStructType(
                new StructField[]{
                        createStructField("NAME", StringType, false),
                        createStructField("STRING_VALUE", StringType, false),
                        createStructField("NUM_VALUE", IntegerType, false),
                });
        Row r1 = RowFactory.create("name1", "value1", 1);
        Row r2 = RowFactory.create("name2", "value2", 2);
        List<Row> rowList = ImmutableList.of(r1, r2);
        Dataset<Row> rows = spark.createDataFrame(rowList, schema);

        spark.sparkContext().hadoopConfiguration().set("fs.stocator.scheme.list", "cos");
        spark.sparkContext().hadoopConfiguration().set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem");
        spark.sparkContext().hadoopConfiguration().set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient");
        spark.sparkContext().hadoopConfiguration().set("fs.stocator.cos.scheme", "cos");
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.iam.api.key", spark.sparkContext().getConf().get("spark.COS_API_KEY"));
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.iam.service.id", spark.sparkContext().getConf().get("spark.COS_SERVICE_ID"));
        spark.sparkContext().hadoopConfiguration().set("fs.cos.service.endpoint", spark.sparkContext().getConf().get("spark.COS_ENDPOINT"));

        String path = "cos://" + spark.sparkContext().getConf().get("spark.COS_BUCKET") + ".service/" +
                spark.sparkContext().getConf().get("spark.COS_OUTPUT_FILENAME");
        rows.write().mode(SaveMode.Overwrite).parquet(path);

But I'm getting an exception:

...
2018-08-09 20:18:44 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 2 tasks
2018-08-09 20:18:44 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:44 INFO  TaskSetManager:54 - Starting task 1.0 in stage 0.0 (TID 1, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:45 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 172.17.0.5:43525 (size: 48.4 KB, free: 408.9 MB)
2018-08-09 20:18:45 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 172.17.0.6:39377 (size: 48.4 KB, free: 408.9 MB)
2018-08-09 20:18:47 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201845_0000_m_000000_0.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: null
	at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
	at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
	... 8 more

2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 0.1 in stage 0.0 (TID 2, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Lost task 0.1 in stage 0.0 (TID 2) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 1]
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 0.2 in stage 0.0 (TID 3, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Lost task 1.0 in stage 0.0 (TID 1) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 2]
2018-08-09 20:18:47 INFO  TaskSetManager:54 - Starting task 1.1 in stage 0.0 (TID 4, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 0.2 in stage 0.0 (TID 3) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 3]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2, partition 0, PROCESS_LOCAL, 8140 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 1.1 in stage 0.0 (TID 4) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 4]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 1.2 in stage 0.0 (TID 6, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 1.2 in stage 0.0 (TID 6) on 172.17.0.5, executor 1: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 5]
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Starting task 1.3 in stage 0.0 (TID 7, 172.17.0.5, executor 1, partition 1, PROCESS_LOCAL, 8205 bytes)
2018-08-09 20:18:48 INFO  TaskSetManager:54 - Lost task 0.3 in stage 0.0 (TID 5) on 172.17.0.6, executor 2: org.apache.spark.SparkException (Task failed while writing rows.) [duplicate 6]
2018-08-09 20:18:48 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 4 times; aborting job
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Cancelling stage 0
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Stage 0 was cancelled
2018-08-09 20:18:48 INFO  DAGScheduler:54 - ResultStage 0 (parquet at App.java:75) failed in 3.923 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
	at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
	at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
	... 8 more

Driver stacktrace:
2018-08-09 20:18:48 INFO  DAGScheduler:54 - Job 0 failed: parquet at App.java:75, took 3.973086 s
2018-08-09 20:18:48 ERROR FileFormatWriter:91 - Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 5, 172.17.0.6, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
	at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
	at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
	... 8 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1607)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1595)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1594)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1594)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1828)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1777)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1766)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:662)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:554)
	at com.ibm.mpw.App.main(App.java:75)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:846)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:921)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:932)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output plants.parquet/part-00000-8e5efd57-0e4a-4bed-95f9-2178e9b9dedd-c000-attempt_20180809201848_0000_m_000000_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
	at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
	at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1369)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
	... 8 more
2018-08-09 20:18:48 WARN  TaskSetManager:66 - Lost task 1.3 in stage 0.0 (TID 7, 172.17.0.5, executor 1): TaskKilled (Stage cancelled)
2018-08-09 20:18:48 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool 
2018-08-09 20:18:49 WARN  COSAPIClient:532 - Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 97de1087-4d62-4b33-a118-e627aa5943b7)
Error: Job aborted.

Writing the same data to a COS JSON file works fine:

rows.write().mode(SaveMode.Overwrite).json(path);
@gilv
Contributor

gilv commented Aug 11, 2018

@dkazakevich thanks. A bit strange; are you sure the same code works for CSV and fails for Parquet? The exception in the log is "Could not initialize class sun.security.ssl.SSLSessionImpl", which is related to SSL and has nothing to do with Stocator.
What Stocator version (and branch) are you using?

@gilv
Contributor

gilv commented Aug 12, 2018

@dkazakevich a short update: I tested your code on Spark 2.3 and the same code works perfectly for me. So I need more details (see my previous message) in order to proceed with it.

@dkazakevich
Author

dkazakevich commented Aug 13, 2018

@gilv Thank you for the response. It works for JSON but fails for Parquet. I haven't checked CSV; I will try it tomorrow.
I tried the 1.0.21-ibm-sdk, 1.0.22-ibm-sdk and 1.0.23-Snapshot-ibm-sdk branches.
Also, this works in Spark client/local mode but fails in cluster mode with a master and executor(s).
I use local minikube and Bluemix k8s clusters to submit this Spark job.

@gilv
Contributor

gilv commented Aug 13, 2018

@dkazakevich I want to be sure I understand correctly. Is the following correct?

  • Your code works for both JSON and Parquet when you use a local Spark cluster.
  • Your code fails with Parquet over Bluemix k8s. The exception is "Could not initialize class sun.security.ssl.SSLSessionImpl".
  • Your code works with JSON over Bluemix k8s.

Do you use exactly the same code and the same path when accessing JSON and Parquet? SSL issues can't be related to file types, so I'm trying to figure out what else is different in your code.

@bahdzevich

@gilv, yes, you are correct. I have the same behavior:

  • also works fine using a local Spark cluster

  • the same exception using either minikube or Bluemix:
    Caused by: java.io.IOException: saving output test-json-rnd-values/part-00001-67f2343b-5c52-43c1-8266-9c1d617aeb50-c000-attempt_20180813152227_0000_m_000001_3.snappy.parquet com.ibm.stocator.thirdparty.cos.AmazonClientException: Unable to complete transfer: Could not initialize class sun.security.ssl.SSLSessionImpl
    	at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:173)
    	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    	at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:639)
    	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
    	at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:163)
    	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.releaseResources(FileFormatWriter.scala:405)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:396)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
    	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    	... 8 more

  • yes, but there is one warning:
    2018-08-13 15:20:10 WARN COSAPIClient:532 - Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 72facbe1-ec9a-4316-a0b9-1f13ac8b948c)

@gilv
Contributor

gilv commented Aug 13, 2018

@dkazakevich @bahdzevich thanks. What JDK version are you using? I also wonder, what is the value of "fs.cos.service.endpoint"?

@dkazakevich
Author

dkazakevich commented Aug 13, 2018

@gilv We are using JDK 1.8.0_172
fs.cos.service.endpoint=s3-api.dal-us-geo.objectstorage.softlayer.net
We are using the same code and the same COS path for Parquet and JSON; we just changed rows.write().mode(SaveMode.Overwrite).parquet(path) to rows.write().mode(SaveMode.Overwrite).json(path). It also looks strange to us, and we don't know the source of the problem.
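
For reference, a minimal side-by-side sketch of the only difference between the two runs (it assumes the same rows Dataset, path and Hadoop configuration built in the original snippet above):

// Same Dataset, same COS path, same Hadoop configuration in both runs.
// Parquet write -- fails in k8s cluster mode with the SSL exception above:
rows.write().mode(SaveMode.Overwrite).parquet(path);

// JSON write -- completes without errors:
rows.write().mode(SaveMode.Overwrite).json(path);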

@paul-carron
Contributor

@dkazakevich have you installed or do you need to install a cert on Kubernetes?

@dkazakevich
Author

@paul-carron I haven't installed certs on the minikube and Bluemix k8s clusters. I also don't know whether a cert is needed to put data into COS.
I only put the provided k8s cluster config YAML file, with the cluster certificate-authority .pem file, into the local ~/.kube directory.
I also created a spark k8s serviceaccount and clusterrolebinding and use them to run the Spark job as described here: https://spark.apache.org/docs/2.3.0/running-on-kubernetes.html#rbac

@gilv
Contributor

gilv commented Aug 14, 2018

@dkazakevich can you read some existing Parquet file from COS on this cluster?

@dkazakevich
Author

@gilv Reading a Parquet file from COS and writing it back into COS works fine.
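
A rough sketch of the round trip that does work (the object names are placeholders; the same spark session and Hadoop/COS configuration as in the first snippet is assumed):

// Hypothetical object names; the COS configuration from the original snippet is assumed.
Dataset<Row> existing = spark.read().parquet("cos://mybucket.service/existing.parquet");
existing.write().mode(SaveMode.Overwrite).parquet("cos://mybucket.service/existing-copy.parquet");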

@gilv
Contributor

gilv commented Aug 15, 2018

@dkazakevich so everything works, except when you create a two-record Parquet file in Spark and write it into COS? This sounds a bit strange, and it fails with SSL.
I wonder if you can experiment with some other ways to create Parquet files in Spark. Try the Scala shell and write something like:

import spark.implicits._

val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")

squaresDF.write.format("parquet").save("cos://yourbucket.myCOS/data.parquet")

Will this work?

@dkazakevich
Author

@gilv Initially we faced the problem while reading data from DB2 and writing into COS. The two-record example is a simplified version that reproduces the same problem we have with the DB2 source.
I think the Scala shell example runs the Spark job in local/client mode, which executes jobs using one master (one node). As I described above, we also don't have problems in Spark local/client mode. The problem occurs in Spark cluster mode, which runs the job using a master and one or more executors (several nodes).

@gilv
Contributor

gilv commented Aug 15, 2018

@dkazakevich you can connect spark-shell to the existing cluster by using --master spark://master_host:master_port

@dkazakevich
Author

@gilv We don't have a k8s Spark cluster with a running master and executors (which is necessary for spark-shell); we use the spark-submit tool, which automatically creates a master and the indicated number of executors for a Spark job (a feature of Spark 2.3 for k8s). Is it okay to create a Scala application, generate a .jar file for spark-submit, and run it for the above experiment?

@gilv
Contributor

gilv commented Aug 15, 2018

@dkazakevich I think it's the same to create a jar with Scala and use spark-submit.

@dkazakevich
Author

@gilv Tried the experiment and got the same exception:

object App {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder.appName("COS spark").getOrCreate()
    import sparkSession.sqlContext.implicits._

    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.scheme.list", "cos")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.stocator.cos.scheme", "cos")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.iam.api.key", "***")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.iam.service.id", "***")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.cos.service.endpoint", "s3-api.us-geo.objectstorage.softlayer.net")

    val squaresDF = sparkSession.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.format("parquet").save("cos://mpw-plants.service/data.parquet")
  }
}

Also FYI: I'm not sure whether this is the cause of the problem, but I tried to load data from some DB2 databases and write Parquet into COS and found the following (see the sketch after this list):

  • if a DB is reachable through an SSL connection, the write into COS works fine
  • if a DB is reachable without SSL, it generates the exception.
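
For illustration, a rough sketch of the two DB2 read paths being compared (hosts, credentials and table names are hypothetical placeholders; spark and path are the session and COS path from the first snippet, and sslConnection=true is the DB2 JDBC driver property used to enable SSL):

// Case 1: DB2 reachable over SSL -- the subsequent Parquet write to COS succeeds.
Dataset<Row> overSsl = spark.read().format("jdbc")
        .option("url", "jdbc:db2://db2-host:50001/MYDB:sslConnection=true;")  // hypothetical host/port
        .option("dbtable", "MY_SCHEMA.MY_TABLE")                              // hypothetical table
        .option("user", "db2user")
        .option("password", "***")
        .load();
overSsl.write().mode(SaveMode.Overwrite).parquet(path);   // works

// Case 2: the same DB2 read without SSL -- the Parquet write then fails with
// "Could not initialize class sun.security.ssl.SSLSessionImpl".
Dataset<Row> plain = spark.read().format("jdbc")
        .option("url", "jdbc:db2://db2-host:50000/MYDB")   // hypothetical host/port, no SSL
        .option("dbtable", "MY_SCHEMA.MY_TABLE")
        .option("user", "db2user")
        .option("password", "***")
        .load();
plain.write().mode(SaveMode.Overwrite).parquet(path);     // fails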

@gilv
Contributor

gilv commented Aug 16, 2018

@dkazakevich if you can't write a simple Parquet file with this Scala example, then it's not related to DB2, I guess. The exception you're getting is related to an SSL connection, yet you can read from COS and write JSON without hitting SSL issues.

@gilv
Contributor

gilv commented Aug 16, 2018

@dkazakevich please also open a ticket with IBM Bluemix Support. You can link them to this issue that you opened against Stocator. You should see Support in console.bluemix.net, where you can create a ticket.

@paul-carron
Contributor

@dkazakevich I don't believe this is an issue with the Java SDK, as I'm unable to recreate it in a standalone or cluster environment. My hunch at the minute is that it's some sort of cert issue between Spark and Kubernetes, although you haven't installed certs on your minikube cluster. Unfortunately I'm not familiar with Kubernetes, so I don't know what might be required. It might be worth having somebody with Kubernetes knowledge look at this.
