Assertion failed: received sequence number doesn't match request sequence number #313

Closed
tilumi opened this issue May 4, 2018 · 25 comments

Comments

@tilumi
Contributor

tilumi commented May 4, 2018

Hi, I'm using the 2.3.2-SNAPSHOT version, and running a Spark application shows the following error.
I am using the Event Hub with namespace "wascl-nam" and name "wasclmonitors".

18/05/04 01:45:26 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 75, wn0-lam-ag.ifckrjhhuanelgm1hrygboricd.xx.internal.cloudapp.net, executor 12): java.lang.AssertionError: assertion failed: requestSeqNo 443495453 does not match the received sequence number 443872987
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:89)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:118)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:124)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:103)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@Silverlight42

I asked a similar question yesterday and this was Sabee's answer:

Hi @Silverlight42 That's an assertion that happens in the Spark executors. Basically, in the Spark driver we schedule the range of seqNos that will be consumed for a given partition. So, let's say for partition 0 we are going to consume from seqNo X to seqNo Y.

When we actually consume those events in the Spark executors, we want to be very strict - all of the seqNos from X to Y must be present because the seqNos are consecutive. With that in mind, when we consume each event, we assert that it's exactly one greater than the previously consumed seqNo.

What's happening is you're starting your stream at events that are about to expire. So, basically, the Spark driver schedules a range from X to Y, but by the time the Spark executor tries to consume that range, your retention policy has kicked in and those events have dropped out of your event hub.

Hope it helps.
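
Concretely, the strict executor-side check described above amounts to something like the following (a minimal sketch with illustrative names, not the actual CachedEventHubsReceiver implementation):

```scala
// Minimal sketch of the consecutive-seqNo invariant described above.
// `nextEvent` stands in for "receive one event from this partition" and
// returns (sequenceNumber, body); all names here are illustrative only.
def consumeRange(fromSeqNo: Long, untilSeqNo: Long)
                (nextEvent: () => (Long, Array[Byte])): Seq[Array[Byte]] = {
  val events = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]]
  var expected = fromSeqNo
  while (expected < untilSeqNo) {
    val (received, body) = nextEvent()
    // If retention has already removed events, `received` jumps ahead of
    // `expected` and the whole Spark task fails with an AssertionError.
    assert(received == expected,
      s"requestSeqNo $expected does not match the received sequence number $received")
    events += body
    expected += 1
  }
  events.toSeq
}
```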

@tilumi
Contributor Author

tilumi commented May 4, 2018

But that's not my case; it failed on the first batch. The first batch consumes events from the end of the stream, and data is being sent to the Event Hub continuously, so I don't think the end of the stream is outside the retention period.
[Screenshots attached: 2018-05-05 08:58:20 and 08:53:00]

@tilumi
Contributor Author

tilumi commented May 5, 2018

Hi, I found that I was using StartOfStream to consume from Event Hubs in my application, so this error can sometimes be thrown; this is not an issue.
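
For reference, the starting position is controlled through EventHubsConf; a minimal sketch (the connection string below is a placeholder to be replaced with your own namespace and key):

```scala
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}

// Placeholder connection string; fill in the real endpoint and key.
val connectionString = ConnectionStringBuilder("Endpoint=sb://wascl-nam.servicebus.windows.net/;...")
  .setEventHubName("wasclmonitors")
  .build

// Reading from the start of the stream can race with retention: old events
// may expire between scheduling and consumption, triggering the assertion.
val fromStart = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromStartOfStream)

// Reading from the end of the stream avoids events that are about to expire.
val fromEnd = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
```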

@tilumi tilumi closed this as completed May 5, 2018
@tilumi tilumi reopened this May 5, 2018
@tilumi
Contributor Author

tilumi commented May 5, 2018

I'm reopening this issue since I encountered the same error when consuming from the end of the stream.
From the log, I've confirmed that I am consuming from the end of the stream.
[Screenshot attached: 2018-05-05 12:45:58]

The error occurs in some partitions while consuming.
I set the batch duration to 5 minutes and data is coming into the Event Hub continuously, so consuming from the end of the stream shouldn't pick up data that is outside the retention period.
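
For context, the setup described (5-minute batches, consuming from the end of the stream) would look roughly like this with the DStream API; the connection string is a placeholder and the exact EventHubsUtils import may vary by connector version:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.eventhubs.{EventHubsConf, EventHubsUtils, EventPosition}
import org.apache.spark.streaming.{Minutes, StreamingContext}

// Hypothetical setup matching the description above: 5-minute batches,
// starting from the end of the stream. The connection string is a placeholder.
val sparkConf = new SparkConf().setAppName("eventhubs-endofstream-example")
val ssc = new StreamingContext(sparkConf, Minutes(5))

val ehConf = EventHubsConf("<connection-string>")
  .setStartingPosition(EventPosition.fromEndOfStream)

val stream = EventHubsUtils.createDirectStream(ssc, ehConf)
stream.map(_.getBytes.length).print()   // placeholder downstream processing

ssc.start()
ssc.awaitTermination()
```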

18/05/05 03:17:40 INFO TaskSetManager: Finished task 31.0 in stage 232.0 (TID 1870) in 49 ms on wn2-lam-ag.ifckrjhhuanelgm1hrygboricd.xx.internal.cloudapp.net (executor 3) (7/32)
18/05/05 03:17:40 WARN TaskSetManager: Lost task 17.0 in stage 232.0 (TID 1865, wn2-lam-ag.ifckrjhhuanelgm1hrygboricd.xx.internal.cloudapp.net, executor 5): java.lang.AssertionError: assertion failed: requestSeqNo 16351045 does not match the received sequence number 16348116
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:89)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:118)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:124)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD$EventHubsRDDIterator.next(EventHubsRDD.scala:103)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

[Screenshots attached: 2018-05-05 11:37:10 and 11:38:00]

@ytaous

ytaous commented May 5, 2018

I am getting the same error. The same code works fine with the 2.3.1 release.
FYI

@sabeegrewal
Contributor

Hey @tilumi - do you have any repro steps by chance? I'm not able to reproduce this issue (unless it's the case mentioned earlier which is expected).

@sabeegrewal sabeegrewal self-assigned this May 17, 2018
@sabeegrewal
Contributor

Hey @ytaous and @tilumi, I merged a few changes to master. Can you try again and let me know how it goes? I'm getting ready to release soon, so I'll be starting stress tests as well.

@ytaous

ytaous commented Jun 13, 2018

Just did the test, still getting the same exception.

@sabeegrewal
Contributor

@ytaous can you share a simple repro of the issue with me?

@sabeegrewal
Contributor

Hey, if any of you guys can share a repro, please do! The best place to share it is on Gitter.

@Fokko
Contributor

Fokko commented Jul 17, 2018

I'm having the same issue. It is indeed the case that the consumer had stopped for longer than the retention period of the Event Hub. This check was added recently with the cached receivers: https://github.com/Azure/azure-event-hubs-spark/pull/303/files/31021d29a67630951ffa9edefb1386ad44b2f0b1

Fokko pushed a commit to Fokko/azure-event-hubs-spark that referenced this issue Jul 25, 2018
In the getPartitions method of the EventHubsRDD class we check if the
offsets are still valid. It is possible that the retention has kicked
in and the messages are no longer available on the bus.

For more info, refer to this issue:
Azure#313

Did some minor refactoring:

- Made the clientFactory static so we don't need to pass this constructor
  around
- Changed the signature of allBoundedSeqNos from a Seq to a Map, since
  the partitionId is unique and later on in the code it is also converted
  to a map.
- Removed the trim method, since passing EventHub config keys to Spark
  does not do any harm. Without this change, the tests are failing since
  they are not being switched to the simulator.
sabeegrewal pushed a commit that referenced this issue Jul 29, 2018
* Dont read messages that are already pruned by EventHub

In the getPartitions method of the EventHubsRDD class we check if the
offsets are still valid. It is possible that the retention has kicked
in and the messages are no longer available on the bus.

For more info, refer to this issue:
#313

Did some minor refactoring:

- Made the clientFactory static so we don't need to pass this constructor
  around
- Changed the signature of allBoundedSeqNos from a Seq to a Map, since
  the partitionId is unique and later on in the code it is also converted
  to a map.
- Removed the trim method, since passing EventHub config keys to Spark
  does not do any harm. Without this change, the tests are failing since
  they are not being switched to the simulator.

* Remove the offset calculation

* Bump version to 2.3.3-SNAPSHOT

* Restore trimmed config when creating a EventHubsRDD
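
The idea described in the commit above can be sketched like this: before handing a range to an executor, clamp the requested starting seqNo to what the hub still retains (a simplified, self-contained illustration, not the connector's actual getPartitions code):

```scala
// Simplified illustration of clamping a scheduled seqNo range to what the
// Event Hub still retains, so expired events are never requested. All names
// are illustrative; this is not the actual EventHubsRDD.getPartitions code.
final case class SeqNoRange(partitionId: Int, fromSeqNo: Long, untilSeqNo: Long)

def clampToRetention(requested: SeqNoRange,
                     earliestSeqNo: Long,  // first seqNo still available on the hub
                     latestSeqNo: Long     // last seqNo currently on the hub
                    ): SeqNoRange = {
  val from  = math.max(requested.fromSeqNo, earliestSeqNo)
  val until = math.min(math.max(requested.untilSeqNo, from), latestSeqNo + 1)
  requested.copy(fromSeqNo = from, untilSeqNo = until)
}

// Example: events up to seqNo 1000 have expired, so a request for 900-1200
// is clamped to 1000-1200 instead of failing inside the executor.
val clamped = clampToRetention(SeqNoRange(0, 900L, 1200L), earliestSeqNo = 1000L, latestSeqNo = 5000L)
```
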
@ogidogi

ogidogi commented Aug 22, 2018

I checked the issue against this version:
Commit: f1bb4ea [f1bb4ea]
Parents: 334255e
Author: Sabee Grewal grewalsabee@gmail.com
Date: Monday, August 20, 2018 12:10:42
Committer: GitHub
Cached receivers are async (#377)

It is still happening!

Log is here:
Driver stacktrace:
2018-08-22 10:36:58 INFO DAGScheduler:54 - Job 11 failed: start at LccStreamingApp.scala:59, took 417.064178 s
2018-08-22 10:36:58 ERROR FileFormatWriter:91 - Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 22.0 failed 4 times, most recent failure: Lost task 22.3 in stage 22.0 (TID 398, 10.244.12.129, executor 11): java.lang.IllegalStateException: Request seqNo 1674884215 is less than the received seqNo 1674918635.
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:108)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:116)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:163)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.IllegalStateException: Request seqNo 1674884215 is less than the received seqNo 1674918635.
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:108)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:116)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:163)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-08-22 10:36:58 ERROR MicroBatchExecution:91 - Query [id = 4ae06395-16ab-4bb4-8c93-0f3a1fe3dd55, runId = 7fb441d5-c418-47ae-bce2-0d32da830c45] terminated with error
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:131)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

@sabeegrewal
Contributor

Hey @ogidogi - it's true that the message being printed is similar, but the code is all quite different. But, yeah, this is the only bug I've found in the current release - I'm fixing it now!

@ogidogi

ogidogi commented Aug 23, 2018

What is the release plan for v2.3.3 with these changes? I mean the final release of this version.

@sabeegrewal
Contributor

@ogidogi the plan is to stress test release candidates and fix bugs as they arise. Once that QA process is over, there will be a new release!

@belgacea

This issue should be reopened. I'm currently facing it using the 2.3.2 release and also the current 2.3.3 snapshot (while reading Event Hub messages from the end of the stream with a 10-minute watermark threshold on my aggregation process).

@sonamkanungo

I am currently facing this issue using the 2.3.2 release. I am consuming from EventPosition.fromStartOfStream with a watermark of 600 seconds. This is happening in only one of my ingestion pipelines; my other pipeline, which reads from a different Event Hub, always works fine. Any recommendation on why this is happening and when it will be fixed?
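
For context, a setup like the one described (reading from the start of the stream with a 600-second watermark) looks roughly like this in Structured Streaming; the connection string is a placeholder, and the watermark is assumed to key on the source's enqueuedTime column:

```scala
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder.appName("eventhubs-watermark-example").getOrCreate()

// Placeholder connection string; the stream starts from the beginning of the
// Event Hub, as described above.
val ehConf = EventHubsConf("<connection-string>")
  .setStartingPosition(EventPosition.fromStartOfStream)

val events = spark.readStream
  .format("eventhubs")
  .options(ehConf.toMap)
  .load()

// 600-second watermark on the enqueued time, as in the reported pipeline.
val counts = events
  .withWatermark("enqueuedTime", "600 seconds")
  .groupBy(window(col("enqueuedTime"), "5 minutes"))
  .count()
```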

@sabeegrewal
Contributor

Hey! This issue is probably easy to find, but another issue has been opened and closed about this for the 2.3.2 release. It does still happen in the current master branch, but I'll have a PR in soon for it.

@ogidogi

ogidogi commented Aug 30, 2018

Commit: 334255e [334255e]
Parents: c9112b0
Author: Sabee Grewal grewalsabee@gmail.com
Date: Tuesday, August 7, 2018 23:06:05
Committer: GitHub
Retry receive when it returns null in translate (#369)

The version from this commit has worked for me on 10 streaming apps for more than a week under AKS.

@sabeegrewal
Contributor

Thanks @ogidogi - I have some slight changes (nothing major) that'll help enforce no data loss. And there's a bug in the Java client that needs to be fixed. Besides that, my streams are running great too!

@shuitai

shuitai commented Sep 7, 2018

Do you have any solution to avoid the problem?

@belgacea

belgacea commented Sep 7, 2018

@shuitai Still waiting for this issue to be reopened, as well as #361.

@sabeegrewal
Contributor

@shuitai and @belgacea - these issues aren't going to be reopened. There are a couple of things that can cause this assertion failure. I've addressed them in #384 and the changes will be available in 2.3.4 and 2.2.4. I'm going to release those versions right now. If you have issues with those versions or beyond, please open a new issue!

@Alexkuva

I tried it, and the solution is working for me, using either Event Hubs or IoT Hub as the source.
I used the following connectors:

  • azure-eventhubs-spark_2.11-2.3.4
  • azure-cosmosdb-spark_2.3.0_2.11-1.2.0 (not directly via Maven, but by importing the uber jar)

Thank you for the release.

@PKHeb

PKHeb commented May 24, 2019

I see a similar issue with the same error, but only when I do spark-submit with non-default settings,
for example --executor-cores 2 --num-executors 15; with the default settings it works fine.
The issue worsens as I increase the number of executor cores and executors.
Exception:
java.lang.IllegalStateException: In partition 28 of approved_nhv_np, with consumer group sparkconsumerlocal, request seqNo 418108744 is less than the received seqNo 418108745. The earliest seqNo is 417139352 and the last seqNo is 418520434
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:152)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:172)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:237)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:120)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

SBT Entry:
libraryDependencies += "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.12"
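
For reference, the non-default executor settings mentioned above correspond to these Spark configs when set in code (a sketch; the values mirror the example flags):

```scala
import org.apache.spark.sql.SparkSession

// Equivalent of `--executor-cores 2 --num-executors 15` set in code.
// spark.executor.instances applies on YARN; adjust for other cluster managers.
val spark = SparkSession.builder
  .appName("eventhubs-executor-settings")
  .config("spark.executor.cores", "2")
  .config("spark.executor.instances", "15")
  .getOrCreate()
```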
