IllegalStateException on one specific record #621

FurcyPin opened this issue Dec 1, 2021 · 0 comments
FurcyPin commented Dec 1, 2021

Actual behavior

When trying to read from an Event Hub, our executors failed with the following error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.IllegalStateException: In partition 1 of [EVENTHUB_NAME], with consumer group [CONSUMER_GROUP_NAME], request seqNo 1067 is less than the received seqNo 1064. The earliest seqNo is 1067, the last seqNo is 1075, and received seqNo 1064
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:227)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:261)
	at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:389)
	at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:120)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1203)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$10.apply(BlockManager.scala:1201)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1201)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

This error occurs while reading one particular event in the Event Hub. If we configure the startingPosition and endingPosition so that we only read the messages before or after this event, everything works fine.

Below is a minimal example in PySpark that demonstrates the behaviour we observed:

import json
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.master("local[2]").appName("event_hub").getOrCreate()
def read_event_hub(starting_time: str, end_time: str):
    hub_name = "[EVENTHUB_NAMESPACE]"
    topic = "[EVENTHUB_NAME]"
    key_name = "[SECRET_KEY_NAME]"
    key = "[SECRET_KEY]"
    consumer_group = "[CONSUMER_GROUP_NAME]"
    connection_string = f"Endpoint=amqps://{hub_name}.servicebus.windows.net/{hub_name};EntityPath={topic};"\
                        f"SharedAccessKeyName={key_name};SharedAccessKey={key}"
    starting_event_position = {
        "offset": None,  # not in use
        "seqNo": -1,  # not in use
        "enqueuedTime": starting_time,
        "isInclusive": True
    }
    ending_event_position = {
        "offset": None,  # not in use
        "seqNo": -1,  # not in use
        "enqueuedTime": end_time,
        "isInclusive": False
    }
    conf = {
        "eventhubs.connectionString": connection_string,
        "eventhubs.startingPosition": json.dumps(starting_event_position),
        "eventhubs.endingPosition": json.dumps(ending_event_position),
        "eventhubs.consumerGroup": consumer_group
    }
    df = spark.read.format("eventhubs").options(**conf).load()
    return df

The event that we can't read was sent to the Event Hub at "2021-11-24T19:45:07".
As the code below shows:

  • when we query data before 19:45, we can read it normally;
  • when we query data after 19:46, we can read it normally;
  • when we query data between 19:45 and 19:46, we can't read it.

read_event_hub("2021-11-24T00:00:00.000000Z", "2021-11-24T19:45:00.000000Z").show()
# +--------------------+---------+------------+--------------+--------------------+---------+------------+----------+----------------+
# |                body|partition|      offset|sequenceNumber|        enqueuedTime|publisher|partitionKey|properties|systemProperties|
# +--------------------+---------+------------+--------------+--------------------+---------+------------+----------+----------------+
# |[7B 22 70 61 72 7...|        2|317827579904|           897|2021-11-24 19:37:...|     null|        null|        []|              []|
# |[7B 22 70 61 72 7...|        3|317827579904|          1133|2021-11-24 19:36:...|     null|        null|        []|              []|
# +--------------------+---------+------------+--------------+--------------------+---------+------------+----------+----------------+
read_event_hub("2021-11-24T19:46:00.000000Z", "2021-11-25T00:00:00.000000Z").show()
# +----+---------+------+--------------+------------+---------+------------+----------+----------------+
# |body|partition|offset|sequenceNumber|enqueuedTime|publisher|partitionKey|properties|systemProperties|
# +----+---------+------+--------------+------------+---------+------------+----------+----------------+
# +----+---------+------+--------------+------------+---------+------------+----------+----------------+
read_event_hub("2021-11-24T19:45:00.000000Z", "2021-11-24T19:46:00.000000Z").show()
# Caused by: java.lang.IllegalStateException: In partition 1 of [EVENTHUB_NAME], with consumer group [CONSUMER_GROUP_NAME], request seqNo 1067 is less than the received seqNo 1064. The earliest seqNo is 1067 and the last seqNo is 1070
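
In case it helps, the same EventPosition JSON also accepts a sequence number, so a starting position like the sketch below could in principle be used to skip past the failing record. This is illustrative only: 1068 is an assumed value one past the failing seqNo in partition 1, and eventhubs.startingPosition applies to every partition, not just partition 1.

skip_event_position = {
    "offset": None,       # not in use
    "seqNo": 1068,        # assumed: first seqNo after the failing record
    "enqueuedTime": None, # not in use
    "isInclusive": True
}
conf["eventhubs.startingPosition"] = json.dumps(skip_event_position)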

We contacted Azure support, who did not see any errors on the server side.

We also managed to read the problematic event without any error by using other libraries.
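
For example, a direct read along these lines with the azure-eventhub v5 Python SDK (an illustrative sketch with the same placeholders as above, not necessarily one of the libraries we used) can pull events from the failing window:

# pip install azure-eventhub
from datetime import datetime, timezone
from azure.eventhub import EventHubConsumerClient

client = EventHubConsumerClient.from_connection_string(
    conn_str="Endpoint=sb://[EVENTHUB_NAMESPACE].servicebus.windows.net/;"
             "SharedAccessKeyName=[SECRET_KEY_NAME];SharedAccessKey=[SECRET_KEY];"
             "EntityPath=[EVENTHUB_NAME]",
    consumer_group="[CONSUMER_GROUP_NAME]",
)

def on_event(partition_context, event):
    # Print the metadata that the Spark connector chokes on.
    print(event.sequence_number, event.enqueued_time, event.body_as_str())

with client:
    # starting_position accepts a datetime (enqueued time); partition "1" is
    # the partition named in the Spark error. receive() blocks until stopped.
    client.receive(
        on_event=on_event,
        partition_id="1",
        starting_position=datetime(2021, 11, 24, 19, 45, tzinfo=timezone.utc),
    )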

Expected behavior

The expected behavior is to get no error: we use this connector with similar code on many Event Hubs every day, and this has never happened before.

Spark version

We reproduced it with Spark 2.4.5 and 3.0.3.

spark-eventhubs artifactId and version

We reproduced it with two different versions of the dependencies:

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-eventhubs-spark_2.12</artifactId>
      <version>2.3.21</version>
    </dependency>

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-eventhubs</artifactId>
      <version>3.3.0</version>
    </dependency>

and

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-eventhubs-spark_2.11</artifactId>
      <version>2.3.9</version>
    </dependency>

    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-eventhubs</artifactId>
      <version>2.2.0</version>
    </dependency>

Extra details

This has never happened before and seems to occur only under very specific circumstances, so it might be quite difficult to reproduce.

Hopefully this will be enough for the dev team to find a fix, add an extra safety check, or at least print more details the next time this happens.
