[SPARK-23097][SQL][SS] Migrate text socket source to V2 #20382
Conversation
@@ -56,7 +58,7 @@ trait ConsoleWriter extends Logging {
       println("-------------------------------------------")
       // scalastyle:off println
       spark
-        .createDataFrame(spark.sparkContext.parallelize(rows), schema)
+        .createDataFrame(rows.toList.asJava, schema)
Change here to avoid triggering new distributed job.
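A rough sketch of the difference being pointed out, with names taken from the diff above: the old form schedules a distributed job just to print rows to the console, while the new form builds a local relation (`numRowsToShow` and `isTruncated` are assumed surrounding parameters, not part of this diff).

```scala
// Before: parallelize() creates an RDD, so show() schedules a distributed
// job for every micro-batch just to print a handful of rows.
// spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

// After: passing the rows as a local java.util.List builds a LocalRelation,
// which show() can evaluate without scheduling any job.
import scala.collection.JavaConverters._

spark
  .createDataFrame(rows.toList.asJava, schema)
  .show(numRowsToShow, isTruncated)
```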
this fix should go into 2.3 branch. thanks for catching this.
OK, I will create a separate PR for this small fix.
Test build #86581 has finished for PR 20382 at commit
@jose-torres can you please help to review, thanks!
I think we shouldn't remove the old source, contrary to what I did with the console sink. We should add a conf to disable the V2 implementation on a per-source basis, which we can use to (a) fall back if some user finds the new implementation problematic and (b) run tests with the conf to make sure that the V1 execution path still works.
I'll write a PR to handle that.
}

class TextSocketSourceProviderV2 extends DataSourceV2
  with MicroBatchReadSupport with DataSourceRegister with Logging {
The intent is for the V2 and V1 source to live in the same register, so existing queries can start using the V2 source with no change needed. This also allows the V2 implementation to be validated by passing all the old tests.
RateSourceV2 is a bad example; it only exists because I didn't have time to write a fully compatible rate source. I'll work on fixing it.
@jose-torres, do you mean that instead of creating a new V2 socket source, we should modify the current V1 socket source to make it work with V2? Am I understanding that correctly?
The idea is that the existing TextSocketSourceProvider will have the MicroBatchReadSupport implementation here, in addition to the StreamSourceProvider implementation it already has.
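A minimal sketch of the shape being described: one provider class, registered under the existing short name, exposing both the V1 and V2 entry points. Method bodies are elided and the exact signatures are assumed from the surrounding discussion, not confirmed against this PR.

```scala
// Sketch: a single provider registered as "socket" that serves both the
// V1 (StreamSourceProvider) and V2 (MicroBatchReadSupport) code paths, so
// existing queries keep working unchanged.
class TextSocketSourceProvider extends DataSourceV2
  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister {

  override def shortName(): String = "socket"

  // V2 path: used by the new micro-batch execution engine.
  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader = ???

  // V1 path: kept alongside, so the old execution pipeline still resolves
  // this provider. (sourceSchema and other V1 methods elided.)
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = ???
}
```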
I see, thanks for the clarification. Let me change it.
private val host = options.get(HOST).get()
private val port = options.get(PORT).get().toInt
private val includeTimestamp = options.getBoolean(INCLUDE_TIMESTAMP, false)
private val numPartitions = options.getInt(NUM_PARTITIONS, 1)
To match the old parallelize behavior, the default number of partitions should be sparkContext.defaultParallelism.
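A sketch of the suggested default, assuming a `SparkSession` is reachable from the reader (how it is obtained here is an assumption, not taken from the PR):

```scala
// Fall back to the cluster's default parallelism, matching the old
// sparkContext.parallelize(rows) behavior, instead of hardcoding 1.
private val numPartitions = options.getInt(
  NUM_PARTITIONS,
  SparkSession.getActiveSession.get.sparkContext.defaultParallelism)
```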
private var lastOffsetCommitted: Long = -1L

override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
  if (!initialized) {
Is it possible to initialize in the constructor?
This is what I want to bring out. Originally I initialized this in the constructor like the old socket source. But I found that the MicroBatchReader will be created in two different places as two objects. So initializing in the constructor would create two socket threads and connections. This is different from the V1 source: in V1 we only created the source once, but with V2 the MicroBatchReader is created as two objects in two different places (one for the schema), which means such side-effect actions in the constructor will run twice. Ideally we should only create this MicroBatchReader once.
I don't think this will solve that problem, since each reader will just have its own initialize bit.
In general, I think it's fine if we do a bit of extra work. V1 sources do have to support being created multiple times (in e.g. restart scenarios), and the lifecycles of the two V2 readers being created here don't overlap. (We should be closing the tempReader created in DataStreamReader, though.)
override def commit(end: Offset): Unit = synchronized {
  val newOffset = end.asInstanceOf[TextSocketStreamOffset]
  val offsetDiff = (newOffset.offset - lastOffsetCommitted).toInt
nit: conversion to int is unnecessary
Test build #86640 has finished for PR 20382 at commit
It's unfortunate that the socket tests don't actually run streams end to end, but I think that's orthogonal to this PR. Can you run one of the programming guide examples using the socket source (e.g. org.apache.spark.examples.sql.streaming.StructuredSessionization) to make sure it works after this PR? If it does, LGTM
Jenkins, retest this please.
Hi @jose-torres, thanks for your review. I tried both the example you mentioned and a simple spark-shell command; I think it works, but the path will always go to V2.
Right, that makes sense. LGTM
Test build #86671 has finished for PR 20382 at commit
Test build #86677 has finished for PR 20382 at commit
Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {
  case Success(bool) => bool

class TextSocketSourceProvider extends DataSourceV2
  with MicroBatchReadSupport with StreamSourceProvider with DataSourceRegister with Logging {
Why do we still need StreamSourceProvider?
If I don't misunderstand @jose-torres's intention, basically he wanted this socket source to also work in the V1 code path.
Aah, I see the earlier comments.
TD and I discussed this offline. It should be fine to remove the V1 StreamSourceProvider implementation, because:
- this isn't a production-quality source, so users shouldn't need to fall back to it
- this source won't be particularly useful at exercising the V1 execution pipeline once we transition all sources to V2
OK, I will update the patch accordingly.
import org.apache.spark.internal.Logging

trait TextSocketReader extends Logging {
Please add docs!! This is a base interface used by two source implementations.
Also rename this so that it's clear that this is a base class and not an actual Reader (i.e. not a subclass of the DataSourceV2 readers). Maybe TextSocketReaderBase.
override def toString: String = s"TextSocketSource[host: $host, port: $port]"
}

case class TextSocketOffset(offset: Long) extends V2Offset {
I would wait for my PR #20445 to go in where I migrate LongOffset to use OffsetV2
I am holding off further comments on this PR until the major change of eliminating the v1 Source is done. That would cause significant refactoring (including the fact that the common trait won't be needed). BTW, I strongly suggest moving the socket code to execution.streaming.sources, like the other v2 sources.
Sure, will wait for the others to be merged, thanks @tdas.
#20445 will be merged in a few hours. Please go ahead and update your PR with the refactoring that was suggested (mainly, no v1 version).
Sure, I will do it.
9ceb3be to fdc9b9c
Test build #87199 has finished for PR 20382 at commit
Test build #87202 has finished for PR 20382 at commit
jenkins test this please
Test build #87203 has finished for PR 20382 at commit
Hi @tdas, would you please help to review again, thanks!
overall looks good, just a few comments.
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
can you add a redirection in the DataSource.backwardCompatibilityMap for this?
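Something along these lines in DataSource.backwardCompatibilityMap, so checkpoints or configs that reference the old fully-qualified class name still resolve after the package move (the exact shape of the map entry is assumed, not taken from this PR):

```scala
// Hypothetical entry: redirect the old class name (pre-move) to the new
// class under execution.streaming.sources.
"org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" ->
  classOf[org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider]
    .getCanonicalName
```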
* A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
* This source will *not* work in production applications due to multiple reasons, including no
* support for fault recovery and keeping all of the text read in memory forever.
* A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
nit: tutorials -> testing (i know it was like that, but lets fix it since we are changing it anyway)
Tutorials is correct here; see e.g. StructuredSessionization.scala
* A MicroBatchReader that reads text lines through a TCP socket, designed only for tutorials and
* debugging. This MicroBatchReader will *not* work in production applications due to multiple
* reasons, including no support for fault recovery and keeping all of the text read in memory
* forever.
this does not keep it forever, so remove this reason; just keep "no support for fault recovery".
}

override def readSchema(): StructType = {
  val includeTimestamp = options.getBoolean("includeTimestamp", false)
supernit: is there need for a variable here?
override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
  else TextSocketSource.SCHEMA_REGULAR
override def setOffsetRange(
    start: Optional[Offset],
nit: won't this fit on a single line?
@@ -164,54 +213,43 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
    }
  }

-  override def toString: String = s"TextSocketSource[host: $host, port: $port]"
+  override def toString: String = s"TextSocketMicroBatchReader[host: $host, port: $port]"
This shows up in the StreamingQueryProgressEvent as description, so it may be better to have it as "TextSocket[..."
    schema: Optional[StructType],
    checkpointLocation: String,
    options: DataSourceOptions): MicroBatchReader = {
  checkParameters(options.asMap().asScala.toMap)
why not check it as DataSourceOptions (which is known to be case-insensitive) rather than a map which raises questions about case sensitivity?
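The point being that DataSourceOptions normalizes key case internally, so validating against it directly sidesteps the case-sensitivity question. A sketch of the suggested shape (the method name, constants, and error messages are assumptions for illustration):

```scala
// DataSourceOptions lower-cases option keys, so get() lookups here are
// case-insensitive by construction; no conversion to a Scala Map needed.
private def checkParameters(params: DataSourceOptions): Unit = {
  if (!params.get(HOST).isPresent) {
    throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
  }
  if (!params.get(PORT).isPresent) {
    throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
  }
}
```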
@@ -177,11 +177,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    Optional.ofNullable(userSpecifiedSchema.orNull),
    Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
    options)
  val schema = tempReader.readSchema()
  // Stop tempReader to avoid side-affect thing
nit: side-affect -> side-effect.
good catch.
  val schema = tempReader.readSchema()
  // Stop tempReader to avoid side-affect thing
  tempReader.stop()
i feel like this needs a try finally approach as well.
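A sketch of the try/finally shape being suggested, so the temporary reader is stopped even if schema inference throws (names follow the diff above):

```scala
// Ensure the temporary reader's resources (socket thread, connection) are
// released even when readSchema() fails.
val schema = try {
  tempReader.readSchema()
} finally {
  // Stop tempReader to avoid side effects such as a lingering socket thread.
  tempReader.stop()
}
```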
@@ -0,0 +1,246 @@
/*
why does this show up as a new file? was this not a "git mv"? something went wrong, i would prefer that i can see a simple diff. Not much should change in the tests.
Sorry @tdas, I did it with a simple "mv", not "git mv". This doesn't change a lot, just adapts the tests to the data source v2 API.
068c050 to 647c5cd
Test build #87371 has finished for PR 20382 at commit
Test build #87372 has finished for PR 20382 at commit
Test build #87370 has finished for PR 20382 at commit
Change-Id: I22a5cef90b269b29e6dbb442aba77aa3c1f3e2c4
Jenkins, retest this please.
Test build #87664 has finished for PR 20382 at commit
Jenkins, retest this please.
Test build #87667 has finished for PR 20382 at commit
  StopStream
)

assert(!batch2Stamp.before(batch1Stamp))
there is a slim chance that batch2Stamp will be the same as batch1Stamp; maybe worth adding a sleep(10) to ensure this.
You should also check batch1Stamp against a timestamp taken directly before the query; otherwise the test may pass even if the query generated batch1Stamp = -1 and batch2Stamp = -2.
Hi @tdas, what's the meaning of "you should also check batch1Stamp with a timestamp taken directly before the query"? I'm not entirely sure what specifically you are pointing to.
val timestamp = System.currentTimeMillis
testStream(...)(
// get batch1stamp
)
// assert batch1stamp >= timestamp
I see. Will update it.
intercept[IOException] {
  batchReader = provider.createMicroBatchReader(
    Optional.empty(), "", new DataSourceOptions(parameters.asJava))
}
assert on the message.
In my local test, the assert message is "Can't assign requested address", but on Jenkins it is "Connection refused". The difference might be due to different OS/native methods.
I think it would be better not to check the message given the different outputs; even if we change it to match the Jenkins output, it would still fail on my local Mac.
That's fine.
@GuardedBy("this")
-  protected var currentOffset: LongOffset = new LongOffset(-1)
+  private[sources] var currentOffset: LongOffset = LongOffset(-1L)
This does not make sense; you are directly accessing something that should only be accessed while synchronized on `this`.
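One way to keep the field guarded while still exposing it to tests, sketched under the assumption that the wider visibility exists only for test access (accessor name is illustrative, not from the PR):

```scala
@GuardedBy("this")
private var currentOffset: LongOffset = LongOffset(-1L)

// Expose a synchronized accessor for tests instead of widening the field's
// visibility and letting callers read it without holding the lock.
private[sources] def getCurrentOffset(): LongOffset = synchronized {
  currentOffset.copy()
}
```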
@jerryshao please address the above comment, then we are good to merge!
Sure, I will do it today.
Test build #87819 has finished for PR 20382 at commit
Relevant test failed. Please make sure that there is no flakiness in the tests.
Test build #87825 has finished for PR 20382 at commit
Test build #87831 has finished for PR 20382 at commit
LGTM. Merging to master.
This PR moves structured streaming text socket source to V2. Questions: do we need to remove old "socket" source? Unit test and manual verification. Author: jerryshao <sshao@hortonworks.com> Closes apache#20382 from jerryshao/SPARK-23097. Ref: LIHADOOP-48531
What changes were proposed in this pull request?
This PR moves structured streaming text socket source to V2.
Questions: do we need to remove old "socket" source?
How was this patch tested?
Unit test and manual verification.