Skip to content

[DE-616] ttl for cursor cannot be configured from ArangoDB Spark data source and getting cursor not found #46

@kundan59

Description

@kundan59

I am writing a data pipeline to ingest data from Arango db to Bigquery. I have used the arango spark data source.

here is the code :

 df: DataFrame = spark.read.format("com.arangodb.spark") \
        .option("query", query) \
        .options(**arango_connection) \
        .schema(doc_schema).load()
df.count()
    df.write.format('bigquery').mode("append") \
        .option('table', bq_table) \
        .option("project", bq_project) \
        .option("dataset", bq_dataset) \
        .option("writeMethod", "direct") \
        .option('credentialsFile', 'path/of/gcp-credential') \
        .save() 

this code works if my Arango collection has fewer documents, like writing 10000 documents from Arango db to Bigquery.

But my collection will have more than 10, 00, 000 documents. When testing with above 50, 000 document spark job fails.

error getting
Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (BS1ZN93 executor driver): org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: com.arangodb.ArangoDBException: Response: 404, Error: 1600 - cursor not found
at com.arangodb.internal.util.ResponseUtils.checkError(ResponseUtils.java:53)
at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:86)
at com.arangodb.http.HttpCommunication.execute(HttpCommunication.java:66)
at com.arangodb.http.HttpProtocol.execute(HttpProtocol.java:44)
at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:60)
at com.arangodb.internal.ArangoExecutorSync.execute(ArangoExecutorSync.java:48)
at com.arangodb.internal.ArangoDatabaseImpl$1.close(ArangoDatabaseImpl.java:219)
at com.arangodb.internal.cursor.ArangoCursorImpl.close(ArangoCursorImpl.java:69)
at org.apache.spark.sql.arangodb.datasource.reader.ArangoQueryReader.close(ArangoQueryReader.scala:53)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1(DataSourceRDD.scala:94)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$advanceToNextIter$1$adapted(DataSourceRDD.scala:89)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:132)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.$anonfun$invokeTaskCompletionListeners$1$adapted(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:199)
... 10 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3459)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3458)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3458)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Response: 404, Error: 1600 - cursor not found
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:254)
at org.apache.spark.TaskContextImpl.invokeTaskCompletionListeners(TaskContextImpl.scala:144)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:137)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apac

There is no option found to configure ttl which is the default 30 sec. I tried to reduce batch size but still getting cursor not found iss

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions