
[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles() #25654

Closed
wants to merge 7 commits

Conversation

@avkgh (Contributor) commented Sep 2, 2019

What changes were proposed in this pull request?

This change fixes issue SPARK-28912.

Why are the changes needed?

If the checkpoint directory is set to a name that matches the regex pattern used for checkpoint files, the logs are flooded with MatchError exceptions and old checkpoint files are not removed.
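
For context, here is a minimal sketch (not the PR's code; the path and file names are made up) of the Scala behavior underneath: Regex.findFirstIn succeeds on a substring anywhere in the input, while a match { case REGEX(...) } pattern requires the regex to match the whole string and throws MatchError otherwise. A filter on the full path can therefore pass entries whose final name later fails the pattern match.

```scala
val REGEX = ("checkpoint-" + """([\d]+)([\w\.]*)""").r

// Substring search finds "checkpoint-01" inside the *parent* directory name,
// so any entry under a directory named checkpoint-01 passes a path-based filter.
REGEX.findFirstIn("hdfs://localhost:9000/checkpoint-01/some-rdd-dir") // Some("checkpoint-01")

// A pattern match requires the whole string to match; this entry's own
// name does not, so the match throws scala.MatchError.
"some-rdd-dir" match { case REGEX(time, suffix) => (time, suffix) }
```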

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually.

  1. Start Hadoop in pseudo-distributed mode.

  2. In another terminal, run the command nc -lk 9999

  3. In the Spark shell, execute the following statements:

    val ssc = new StreamingContext(sc, Seconds(30))
    ssc.checkpoint("hdfs://localhost:9000/checkpoint-01")
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()

@dongjoon-hyun (Member)

Welcome to the Apache Spark community. Thank you for your first contribution, @avkgh.

@dongjoon-hyun (Member)

ok to test

@@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 object Checkpoint extends Logging {
   val PREFIX = "checkpoint-"
   val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r

Member:

Could you add a unit test case to streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala?
Please use the test case name SPARK-28912 Fixed MatchError in getCheckpointFiles. It should fail without your patch.

@SparkQA commented Sep 2, 2019

Test build #110022 has finished for PR 25654 at commit b990196.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 object Checkpoint extends Logging {
   val PREFIX = "checkpoint-"
-  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
+  val REGEX = (PREFIX + """([\d]{9,})([\w\.]*)""").r

@HyukjinKwon (Member):

I think it will technically introduce a behaviour change, since the code targets supporting checkpoint- names with numbers. Let's clarify it.

@avkgh (Contributor, Author) replied Sep 3, 2019:

The intention behind this change was to skip invalid (or perhaps too old) checkpoint files, since the numeric part of a checkpoint file name is the current time in milliseconds and therefore cannot be shorter than 9 digits.
This caused some unit tests to fail because they use ManualClock, which reports a fake time and allows generation of shorter checkpoint file names like checkpoint-2000 (where 2000 is supposedly the current time in milliseconds).
I now consider this regex change redundant, since filtering out directories and matching only the final component of a path (p.getName) should be sufficient to prevent MatchErrors.
I will revert it to fix the unit test failures.

@@ -102,7 +102,7 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 object Checkpoint extends Logging {
   val PREFIX = "checkpoint-"
-  val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
+  val REGEX = (PREFIX + """([\d]{9,})([\w\.]*)""").r

@srowen (Member):

This seems to make it always expect 9+ digits, whereas it currently accepts 1 or more. Your case is checkpoint-01, so I'm missing how this works?

@avkgh (Contributor, Author) replied Sep 3, 2019:

Please see my previous reply to @HyukjinKwon; I explained there what was intended.
In my case, checkpoint-01 is the checkpoint directory name, which matches the current regex accepting 1 or more digits.
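
For illustration, a quick sketch (not part of the patch) comparing the two patterns against that name:

```scala
val oldRegex = ("checkpoint-" + """([\d]+)([\w\.]*)""").r    // 1 or more digits
val newRegex = ("checkpoint-" + """([\d]{9,})([\w\.]*)""").r // 9 or more digits

oldRegex.findFirstIn("checkpoint-01") // Some("checkpoint-01")
newRegex.findFirstIn("checkpoint-01") // None: only 2 digits
```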

@srowen (Member) left a comment:

Got it. I agree with leaving out the regex change then.
I'm OK with this; all the better if you can convert your repro into a simple unit test.

@SparkQA commented Sep 3, 2019

Test build #110043 has finished for PR 25654 at commit 812d867.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor)

Nice catch. +1 on a unit test, since "old checkpoint files are not removed" can be asserted.

@dongjoon-hyun (Member)

Gentle ping, @avkgh.

@avkgh (Contributor, Author) commented Sep 5, 2019

I will add a unit test when I have the time; I plan to do it within the next 2 days.

@SparkQA commented Sep 6, 2019

Test build #110239 has finished for PR 25654 at commit 1835fe4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  checkpointTimes.foreach(tm => new checkpointWriter.CheckpointWriteHandler(
    Time(tm), Array.fill[Byte](10)(1), clearCheckpointDataLater = false).run())
} catch {
  case ex: MatchError => fail("Should not throw MatchError", ex)

Member:

You can just let this exception fly
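
That is, drop the try/catch and run the handlers directly; an unexpected MatchError will fail the test by itself. A sketch of the suggested shape:

```scala
// No try/catch needed: if the checkpoint cleanup hits a MatchError,
// the test fails with that exception anyway.
checkpointTimes.foreach(tm => new checkpointWriter.CheckpointWriteHandler(
  Time(tm), Array.fill[Byte](10)(1), clearCheckpointDataLater = false).run())
```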

@@ -847,6 +847,38 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
    checkpointWriter.stop()
  }

  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
    val tempDir = Utils.createTempDir()

@srowen (Member):

You can use withTempDir { tempDir =>
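
Roughly, the helper owns the directory's lifecycle, so the manual create/delete calls go away (a sketch assuming the SparkFunSuite withTempDir helper):

```scala
withTempDir { tempDir =>
  // tempDir is created before the block and recursively deleted afterwards,
  // even if an assertion inside throws.
  val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
  // ... exercise Checkpoint.getCheckpointFiles(checkpointDir) ...
}
```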

@gaborgsomogyi (Contributor):

Ah @srowen, you were quicker with this, +1.

val fakeRddPath = new Path(checkpointDir, java.util.UUID.randomUUID().toString)
fakeRddPath.getFileSystem(hadoopConf).mkdirs(fakeRddPath)

val checkpointTimes = (1 to 20).map(_ * 1000)

Member:

If you like: 1000 to 20000 by 1000
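
Both expressions produce the same twenty values; the second is a plain stepped Range and skips the intermediate map:

```scala
(1 to 20).map(_ * 1000) // Vector(1000, 2000, ..., 20000)
1000 to 20000 by 1000   // Range with the same elements, no map needed
```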

@@ -847,6 +845,38 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
    checkpointWriter.stop()
  }

  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
    val tempDir = Utils.createTempDir()

@gaborgsomogyi (Contributor):

Maybe withTempDir { tempDir =>?

val tempDir = Utils.createTempDir()
val checkpointDir = tempDir + "/checkpoint-01"

Utils.deleteRecursively(tempDir)

@gaborgsomogyi (Contributor):

Why is it needed?

new CheckpointWriter(mock(classOf[JobGenerator]), conf, checkpointDir, hadoopConf)

// Create a fake RDD checkpoint dir to emulate SparkContext.setCheckpointDir()
val fakeRddPath = new Path(checkpointDir, java.util.UUID.randomUUID().toString)

@gaborgsomogyi (Contributor):

import java.util.UUID?

assert(Checkpoint.getCheckpointFiles(checkpointDir).map(_.getName) == expectedCheckpoints)

Utils.deleteRecursively(tempDir)
checkpointWriter.stop()

@gaborgsomogyi (Contributor):

This would be better in a finally block, because if anything throws an exception, checkpointWriter will stay open.
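
Something along these lines (a sketch of the suggested structure, reusing the names already in the test):

```scala
val checkpointWriter =
  new CheckpointWriter(mock(classOf[JobGenerator]), conf, checkpointDir, hadoopConf)
try {
  // ... write checkpoints and assert on Checkpoint.getCheckpointFiles ...
} finally {
  checkpointWriter.stop() // runs even if an assertion above throws
  Utils.deleteRecursively(tempDir)
}
```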

@SparkQA commented Sep 6, 2019

Test build #110250 has finished for PR 25654 at commit 6c50007.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 6, 2019

Test build #110242 has finished for PR 25654 at commit 4c3300d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 6, 2019

Test build #110252 has finished for PR 25654 at commit eebc393.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment:

Thank you so much for adding the UT, @avkgh. I'd like to recommend replacing the existing UT with the following, for two reasons.

  • The current UT is a little overkill: this PR only touches getCheckpointFiles, and the new UT had better focus on verifying that change. We can skip CheckpointWriter to simplify this.
  • This PR contains two fixes: it avoids (1) matching the parent path and (2) matching directory names. The current UT seems to cover (2) only, so it is unclear whether both are verified.
  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
    withTempDir { tempDir =>
      val fs = FileSystem.get(tempDir.toURI, new Configuration())
      val checkpointDir = tempDir.getAbsolutePath() + "/checkpoint-01"
      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)

      // Ignore files whose parent path match.
      fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)

      // Ignore directories whose names match.
      fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
    }
  }

@avkgh (Contributor, Author) commented Sep 6, 2019

OK, I will replace the unit test.

@SparkQA commented Sep 7, 2019

Test build #110268 has finished for PR 25654 at commit 90f1e8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Thank you, and congratulations on your first contribution, @avkgh!
Thank you, @srowen, @HyukjinKwon, @gaborgsomogyi!

Merged to master/2.4.

dongjoon-hyun pushed a commit that referenced this pull request Sep 7, 2019

Closes #25654 from avkgh/SPARK-28912.

Authored-by: avk <nullp7r@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 723faad)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun (Member)

You are added to the Apache Spark contributor group, @avkgh.

@avkgh (Contributor, Author) commented Sep 7, 2019

Thank you, @dongjoon-hyun, @HyukjinKwon, @srowen, @gaborgsomogyi!

@gatorsmile (Member)

[error] /home/jenkins/workspace/spark-branch-2.4-compile-maven-hadoop-2.6/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala:850: not found: value withTempDir
[error]     withTempDir { tempDir =>
[error]     ^
[error] one error found
[error] Compile failed at Sep 6, 2019 6:08:02 PM [8.547s]

We need to revert this from 2.4 branch.

@gatorsmile (Member)

Reverted.

@gatorsmile (Member)

@avkgh Could you submit a PR against 2.4 branch?

@avkgh (Contributor, Author) commented Sep 7, 2019

@gatorsmile I submitted a PR against branch-2.4.

@dongjoon-hyun (Member)

Oops. My bad. Thank you for recovering branch-2.4, @gatorsmile!

PavithraRamachandran pushed a commit to PavithraRamachandran/spark that referenced this pull request Sep 15, 2019