
[SPARK-23092][SQL] Migrate MemoryStream to DataSourceV2 APIs #20445

Closed
wants to merge 20 commits

Conversation

@tdas (Contributor) commented Jan 31, 2018

What changes were proposed in this pull request?

This PR migrates the MemoryStream to DataSourceV2 APIs.

One additional change is to the keys reported in StreamingQueryProgress.durationMs: "getOffset" and "getBatch" are replaced with "setOffsetRange" and "getEndOffset", since those are the phases that make sense to track now. Unit tests were changed accordingly.
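As an illustration of that rename, here is a minimal sketch. Only the key names come from this PR; the timing values are invented for the example.

```scala
// Illustrative only: the key names are from this PR, the timings are made up.
// Before the migration, StreamingQueryProgress.durationMs reported:
val oldDurationMs = Map("getOffset" -> 12L, "getBatch" -> 34L, "addBatch" -> 250L)

// After the migration, the same phases are reported under the new names:
val newDurationMs = Map("setOffsetRange" -> 12L, "getEndOffset" -> 34L, "addBatch" -> 250L)

// Keys unrelated to the source API (e.g. "addBatch") are unchanged.
assert((oldDurationMs.keySet -- Set("getOffset", "getBatch")) ==
       (newDurationMs.keySet -- Set("setOffsetRange", "getEndOffset")))
```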

How was this patch tested?

Existing unit tests, plus a few updated unit tests.

*/
def fullOutput: Seq[AttributeReference]
@tdas (Contributor, Author) commented Jan 31, 2018

@cloud-fan This fixes the bug I spoke to you offline about.
The target of this PR is only master, not 2.3.x. So if you want to have this fix in 2.3.0, please make a separate PR accordingly.


If this PR has to be merged to the 2.3.0 branch, does it require additional changes?

@SparkQA commented Jan 31, 2018

Test build #86857 has finished for PR 20445 at commit 5adf1fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 31, 2018

Test build #86855 has finished for PR 20445 at commit e66d809.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2018

Test build #86950 has finished for PR 20445 at commit 478ad17.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 1, 2018

This PR is currently blocked by a DataSourceV2ScanExec bug, which is being fixed in PR #20387.

Optional.empty())

(s, Some(s.getEndOffset))
reportTimeTaken("setOffsetRange") {
A contributor commented:

I agree that the old metric names don't make much sense anymore, but I worry about changing external-facing behavior as part of an API migration.

@@ -89,7 +96,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)

def addData(data: TraversableOnce[A]): Offset = {
val encoded = data.toVector.map(d => encoder.toRow(d).copy())
val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true)
val plan = new LocalRelation(attributes, encoded, isStreaming = false)
val ds = Dataset[A](sqlContext.sparkSession, plan)
logDebug(s"Adding ds: $ds")
A contributor commented:
Do we still need to store the batches as datasets, now that we're just collect()ing them back out in createDataReaderFactories()?

@tdas (Contributor, Author) replied:

Good point.
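The simplification being discussed can be sketched without any Spark classes. The names here (`SimpleMemoryStream`, `Row`) are hypothetical stand-ins, not the actual Spark types: the idea is to buffer the already-encoded rows instead of whole Datasets, and hand slices of the buffer to the reader factories directly.

```scala
// Hypothetical stand-in for an encoded row (not Spark's UnsafeRow).
final case class Row(values: Seq[Any])

// Sketch: buffer encoded rows per batch rather than Datasets that would
// later be collect()ed back out.
final class SimpleMemoryStream[A](encode: A => Row) {
  private var batches = Vector.empty[Seq[Row]]

  def addData(data: Seq[A]): Int = synchronized {
    batches :+= data.map(encode)
    batches.size - 1 // offset of the batch just added
  }

  // Rows for a batch range can be handed straight to reader factories.
  def rowsInRange(start: Int, end: Int): Seq[Row] = synchronized {
    batches.slice(start, end).flatten
  }
}
```

For example, `addData(Seq(1, 2))` returns the offset of the new batch, and `rowsInRange(0, 1)` returns that batch's encoded rows without going through a Dataset.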

@SparkQA commented Feb 1, 2018

Test build #86951 has finished for PR 20445 at commit 6389d80.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 2, 2018

Test build #86960 has finished for PR 20445 at commit 3f50f33.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 3, 2018

Test build #87010 has finished for PR 20445 at commit c713048.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])

@SparkQA commented Feb 3, 2018

Test build #87018 has finished for PR 20445 at commit 1204755.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 6, 2018

jenkins retest this please

@SparkQA commented Feb 6, 2018

Test build #87122 has finished for PR 20445 at commit 1204755.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 6, 2018

jenkins retest this please.

@SparkQA commented Feb 7, 2018

Test build #87133 has finished for PR 20445 at commit 1204755.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 7, 2018

Test build #87144 has finished for PR 20445 at commit f0ce5df.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 7, 2018

Jenkins retest this please

ForeachSinkSuite.Process(value = 4),
ForeachSinkSuite.Close(None)
)
val events = ForeachSinkSuite.allEvents()
@tdas (Contributor, Author):

This test assumed that the output would arrive in a specific order after repartitioning, which isn't guaranteed. So I rewrote the test to verify the output in an order-independent way.
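A sketch of that order-independent check, using simplified stand-ins for the suite's event types (this is not the actual test code):

```scala
// Simplified stand-ins for ForeachSinkSuite's Process/Close events.
sealed trait Event
final case class Process(value: Int) extends Event
final case class Close(error: Option[String]) extends Event

// After a repartition, per-partition event order is nondeterministic, so
// compare the collected events as an unordered collection, not a sequence.
def assertSameEvents(actual: Seq[Event], expected: Seq[Event]): Unit = {
  assert(actual.map(_.toString).sorted == expected.map(_.toString).sorted,
         s"got $actual, expected $expected in any order")
}
```

With this helper, `Seq(Process(4), Close(None))` and `Seq(Close(None), Process(4))` compare equal, while a genuine mismatch in events still fails.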

@tdas (Contributor, Author) commented Feb 7, 2018

Retest this please

@@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}

private def generateDebugString(
blocks: Iterable[Array[UnsafeRow]],
blocks: Seq[UnsafeRow],
A contributor commented:

nit: it's probably more "rows" than "blocks" now

@tdas (Contributor, Author) replied:

Right! I thought of changing it but forgot. My bad.

@jose-torres (Contributor):

LGTM pending passing run of that HiveDDLSuite test

@SparkQA commented Feb 7, 2018

Test build #87174 has finished for PR 20445 at commit f0ce5df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 7, 2018

seems like an unrelated flaky test ^

@SparkQA commented Feb 7, 2018

Test build #87176 has finished for PR 20445 at commit c3508e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Feb 7, 2018

Merging to master.

@asfgit asfgit closed this in 30295bf Feb 7, 2018
robert3005 pushed a commit to palantir/spark that referenced this pull request Feb 12, 2018

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#20445 from tdas/SPARK-23092.
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#20445 from tdas/SPARK-23092.

Ref: LIHADOOP-48531

RB=1832973
G=superfriends-reviewers
R=latang,yezhou,zolin,mshen,fli
A=
Labels: none yet
Projects: none yet
5 participants