[Spark-18187] [SQL] CompactibleFileStreamLog should not use "compactInterval" directly with user setting. #15852
Conversation
…rval" to detect a compaction batch
change compactInterval from 4 to 5
(Update the title please; see others for format)
Test build #68530 has finished for PR 15852 at commit
Test build #68533 has finished for PR 15852 at commit
Test build #68538 has finished for PR 15852 at commit
Test build #68539 has finished for PR 15852 at commit
Test build #68541 has finished for PR 15852 at commit
Test build #68639 has finished for PR 15852 at commit
// 2. If there are two or more '.compact' files, we use the interval of the batch id suffix with
// '.compact' as compactInterval. It is unclear whether this case will ever happen in the
// current code, since only the latest '.compact' file is retained, i.e., others are garbage
// collected.
Log garbage collection is controlled by 'spark.sql.streaming.fileSource.log.deletion'. When it is 'false', there may be two or more '.compact' files.
Right. Please update the comment accordingly.
Test build #68644 has finished for PR 15852 at commit
// Find the first divisor >= default compact interval
def properDivisors(n: Int, min: Int) =
  (min to n/2).filter(i => n % i == 0) :+ n
'to' => 'until' ?
I would use the following codes to avoid generating the real number sequence:
def properDivisors(n: Int, min: Int) = (min to n/2).view.filter(n % _ == 0) :+ n
interval = properDivisors(latestCompactBatchId + 1, defaultCompactInterval).head
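A self-contained sketch, not the PR's exact code, of the divisor-based interval recovery discussed above; all names are illustrative, and `find` replaces the suggested `.view.filter(...).head` since it short-circuits the same way:

```scala
// Smallest divisor of n that is >= min, falling back to n itself
// (n always divides n). `find` stops at the first match, so the
// range is never scanned past the answer.
def firstDivisorAtLeast(n: Int, min: Int): Int =
  (min to n / 2).find(n % _ == 0).getOrElse(n)

// Recovered compact interval: the smallest divisor of
// (latestCompactBatchId + 1) no smaller than the default interval.
def deriveInterval(latestCompactBatchId: Int, defaultCompactInterval: Int): Int =
  firstDivisorAtLeast(latestCompactBatchId + 1, defaultCompactInterval)
```

For example, with latestCompactBatchId = 9 and a default interval of 3, the derived interval is 5: the divisors of 10 at or above 3 are 5 and 10, and 5 comes first.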
LGTM overall. If this is accepted, then I will close #15827.
The approach looks good to me. I just left some style suggestions.
protected def compactInterval: Int
protected def defaultCompactInterval: Int

protected final lazy val compactInterval: Int = {
nit: please change protected to private since this should not be used by subclasses now.
FileStreamSourceLog uses compactInterval in multiple places. Please advise?
> FileStreamSourceLog uses compactInterval in multiple places. Please advise?

Sorry. My bad. Didn't notice that.
@@ -38,8 +38,9 @@ class FileStreamSourceLog(
import CompactibleFileStreamLog._

// Configurations about metadata compaction
protected override val compactInterval =
protected override def defaultCompactInterval: Int =
nit: def => val
val latestCompactBatchId = compactibleBatchIds(0)
val previousCompactBatchId = compactibleBatchIds(1)
interval = (latestCompactBatchId - previousCompactBatchId).toInt
logInfo(s"Compact interval case 2 = $interval")
nit: Please use a better message, like
Set the compact interval to XXX [the previous two batch Ids: XXX, XXX]
This was debugging info that I was going to remove.
I think it's better to keep this one. It only outputs once and is pretty helpful when a bug happens here.
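The case-2 recovery under discussion, where two or more '.compact' files survive because log deletion is disabled, can be sketched as below; this is an illustrative stand-in rather than the PR's exact code:

```scala
// The original interval is the gap between the two latest compact batch ids.
// Returns None when fewer than two compact batches are available, which is
// handled by the other cases.
def intervalFromCompactBatchIds(compactBatchIds: Seq[Long]): Option[Int] = {
  val latestFirst = compactBatchIds.sorted(Ordering[Long].reverse)
  if (latestFirst.length >= 2) Some((latestFirst(0) - latestFirst(1)).toInt)
  else None
}
```

With compact batches 4 and 9 on disk, the recovered interval is 5, which is exactly the pair of ids the suggested "previous two batch Ids" log message would report.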
def verify(execution: StreamExecution)
    (batchId: Long, expectedBatches: Int): Boolean = {
def verify(execution: StreamExecution, batchId: Long,
    expectedBatches: Int, expectedCompactInterval: Int): Boolean = {
nit: the correct style should be
def verify(
execution: StreamExecution,
batchId: Long,
expectedBatches: Int,
expectedCompactInterval: Int): Boolean = {
@@ -161,7 +161,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Starts the stream, resuming if data has already been processed. It must not be running. */
case class StartStream(
    trigger: Trigger = ProcessingTime(0),
    triggerClock: Clock = new SystemClock)
    triggerClock: Clock = new SystemClock,
    pairs: mutable.Map[String, String] = mutable.Map.empty)
nit: use Map[String, String]
instead since it won't be changed.
nit: it's better to rename pairs
to additionalConfs
for readability.
verify(currentStream == null, "stream already running")
verify(triggerClock.isInstanceOf[SystemClock]
  || triggerClock.isInstanceOf[StreamManualClock],
  "Use either SystemClock or StreamManualClock to start the stream")
if (triggerClock.isInstanceOf[StreamManualClock]) {
  manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
}

pairs.foreach(pair => spark.conf.set(pair._1, pair._2))
You need to also change the confs back at the end of this method to avoid affecting other tests sharing the same SparkSession.
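The save-and-restore pattern the reviewer asks for might be sketched as follows; `FakeConf` is a hypothetical stand-in for `spark.conf`, used only to keep the example self-contained:

```scala
import scala.collection.mutable

// Minimal stand-in for a SparkSession conf: a mutable key-value store.
class FakeConf {
  private val store = mutable.Map[String, String]()
  def set(key: String, value: String): Unit = store(key) = value
  def getOption(key: String): Option[String] = store.get(key)
  def unset(key: String): Unit = store -= key
}

// Apply `overrides` for the duration of `body`, then restore each key to
// its previous value (or unset it if it did not exist before), so other
// tests sharing the same session are unaffected.
def withConfs[T](conf: FakeConf, overrides: Map[String, String])(body: => T): T = {
  val previous = overrides.keys.map(k => k -> conf.getOption(k)).toMap
  overrides.foreach { case (k, v) => conf.set(k, v) }
  try body
  finally previous.foreach {
    case (k, Some(v)) => conf.set(k, v)
    case (k, None)    => conf.unset(k)
  }
}
```

The `finally` block is what addresses the comment: the restore runs even if the query body throws, and keys that were absent before are removed rather than left behind.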
logInfo(s"Compact interval case 2 = $interval")
} else if (compactibleBatchIds.length == 1) {
  // Case 3
  val latestCompactBatchId = compactibleBatchIds(0).toInt
Could you pull this branch into a method in object CompactibleFileStreamLog
so that it's easy to write tests for this complicated logic? And please add tests as well.
// default compact interval > any divisor other than the latest compact id
interval = latestCompactBatchId + 1
}
logInfo(s"Compact interval case 3 = $interval")
nit: it's better to include all infos in the log, like
Set the compact interval to XXX [latestCompactBatchId: XXX, defaultCompactInterval: XXX]
Test build #68795 has finished for PR 15852 at commit
@@ -932,26 +940,28 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
) {
val fileStream = createFileStream("text", src.getCanonicalPath)
val filtered = fileStream.filter($"value" contains "keep")
val updateConf = new mutable.HashMap[String, String]()
nit: Use Map(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5")
} else if (defaultInterval < (latestCompactBatchId + 1) / 2) {
  // Find the first divisor >= default compact interval
  def properDivisors(min: Int, n: Int) =
    (min to n/2).filter(i => n % i == 0) :+ n
nit: Use (min to n/2).view.filter(i => n % i == 0) :+ n
so that it will stop when finding the first element.
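To see why the `.view` nit matters, here is a small standalone demonstration (not the PR's code) counting how many times the divisor predicate actually runs:

```scala
// Strict filter: the predicate runs on every element of the range before
// .head picks the first match.
var strictCalls = 0
val strictFirst = (2 to 1000).filter { i => strictCalls += 1; 1000 % i == 0 }.head

// Lazy view: .head pulls elements one at a time, so evaluation stops at the
// first divisor found (here immediately, since 1000 % 2 == 0).
var lazyCalls = 0
val lazyFirst = (2 to 1000).view.filter { i => lazyCalls += 1; 1000 % i == 0 }.head
```

Both return 2, but the strict version evaluates the predicate 999 times while the view version evaluates it once.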
Looks good overall. Just some nits.
})
} finally {
  // Rollback previous configuration values
  resetConfValues.foreach {
The reset logic should be at the end of the method. Otherwise, it will change confs while a query is running.
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext

class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext {
nit: SharedSQLContext is not needed. Please remove it to avoid creating SQLContext.
properDivisors(defaultInterval, latestCompactBatchId + 1).head
}
else {
nit: move else {
to the above line.
Test build #68807 has finished for PR 15852 at commit
Test build #68808 has finished for PR 15852 at commit
Test build #68809 has finished for PR 15852 at commit
Test build #68816 has finished for PR 15852 at commit
Test build #68827 has finished for PR 15852 at commit
LGTM. Thanks! Merging to master and 2.1.
…terval" directly with user setting.

What changes were proposed in this pull request?
CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If the "compactInterval" is reset by the user, CompactibleFileStreamLog will return a wrong answer, resulting in data loss. This PR provides a way to check the validity of 'compactInterval' and calculate an appropriate value.

How was this patch tested?
When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one.

The primary solution to this issue was given by uncleGen. Added extensions include an additional metadata field in OffsetSeq and the CompactibleFileStreamLog APIs. zsxwing

Author: Tyson Condie <tcondie@gmail.com>
Author: genmao.ygm <genmao.ygm@genmaoygmdeMacBook-Air.local>

Closes #15852 from tcondie/spark-18187.

(cherry picked from commit 51baca2)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
What changes were proposed in this pull request?
CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If the "compactInterval" is reset by the user, CompactibleFileStreamLog will return a wrong answer, resulting in data loss. This PR provides a way to check the validity of 'compactInterval' and calculate an appropriate value.
How was this patch tested?
When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one.
The primary solution to this issue was given by @uncleGen
Added extensions include an additional metadata field in OffsetSeq and the CompactibleFileStreamLog APIs. @zsxwing
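For context, the compaction-batch detection that "compactInterval" drives can be sketched with the usual modular check; this is a simplified illustration, not the PR's exact code:

```scala
// Batches are zero-indexed; with interval 4, batches 3, 7, 11, ... are
// compaction batches, each writing a '<batchId>.compact' file.
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0
```

This also shows the failure mode the PR fixes: batch 3 is a compaction batch under interval 4 but not under 5, so silently honoring a changed setting after restart makes previously written '.compact' files undetectable.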