[SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable#30835

Closed
HeartSaVioR wants to merge 13 commits intoapache:masterfrom
HeartSaVioR:SPARK-33836

Conversation

@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Dec 18, 2020

What changes were proposed in this pull request?

This PR proposes to expose DataStreamReader.table (SPARK-32885) and DataStreamWriter.toTable (SPARK-32896) to PySpark, which are the only way to read from and write to tables in Structured Streaming.

Why are the changes needed?

Please refer to SPARK-32885 and SPARK-32896 for the rationale behind these public APIs. This PR only exposes them to PySpark.

Does this PR introduce any user-facing change?

Yes, PySpark users will be able to read from and write to tables in Structured Streaming queries.
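
Roughly, the exposed surface looks like this from PySpark (a minimal sketch only; the table names and checkpoint path below are placeholders, not values from this PR):

```
# Sketch: table names and the checkpoint path are placeholders.
df = spark.readStream.table("source_table")           # DataStreamReader.table
query = df.writeStream.toTable(
    "sink_table",                                     # DataStreamWriter.toTable
    checkpointLocation="/tmp/checkpoint_example")     # extra kwargs flow into **options
query.stop()
```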

How was this patch tested?

Manually tested.

v1 table

create table A and ingest into table A

spark.sql("""
create table table_pyspark_parquet (
    value long,
    `timestamp` timestamp
) USING parquet
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.writeStream.toTable('table_pyspark_parquet', checkpointLocation='/tmp/checkpoint5')
query.lastProgress
query.stop()

read table A and ingest into table B, which doesn't exist

```
df2 = spark.readStream.table('table_pyspark_parquet')
query2 = df2.writeStream.toTable('table_pyspark_parquet_nonexist', format='parquet', checkpointLocation='/tmp/checkpoint2')
query2.lastProgress
query2.stop()
```

select tables

spark.sql("DESCRIBE TABLE table_pyspark_parquet").show()
spark.sql("SELECT * FROM table_pyspark_parquet").show()

spark.sql("DESCRIBE TABLE table_pyspark_parquet_nonexist").show()
spark.sql("SELECT * FROM table_pyspark_parquet_nonexist").show()

v2 table (leveraging Apache Iceberg, as it provides a V2 table and a custom catalog as well)

create table A and ingest into table A

spark.sql("""
create table iceberg_catalog.default.table_pyspark_v2table (
    value long,
    `timestamp` timestamp
) USING iceberg
""")
df = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query = df.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table', checkpointLocation='/tmp/checkpoint_v2table_1')
query.lastProgress
query.stop()

ingest into the non-existent table B

```
df2 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
query2 = df2.select('value', 'timestamp').writeStream.toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist', checkpointLocation='/tmp/checkpoint_v2table_2')
query2.lastProgress
query2.stop()
```

ingest into the non-existent table C, partitioned by value % 10

```
df3 = spark.readStream.format('rate').option('rowsPerSecond', 100).load()
df3a = df3.selectExpr('value', 'timestamp', 'value % 10 AS partition').repartition('partition')
query3 = df3a.writeStream.partitionBy('partition').toTable('iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned', checkpointLocation='/tmp/checkpoint_v2table_3')
query3.lastProgress
query3.stop()
```

select tables

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist").show()

spark.sql("DESCRIBE TABLE iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()
spark.sql("SELECT * FROM iceberg_catalog.default.table_pyspark_v2table_nonexist_partitioned").show()

A new table will be created if the table does not exist. The returned [[StreamingQuery]]
object can be used to interact with the stream.

.. versionadded:: 3.2.0
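
For illustration, a hedged sketch of "interacting with the stream" through the returned StreamingQuery handle (the table name and checkpoint path are placeholders):

```
# Sketch: 'my_table' and the checkpoint path are placeholder values.
query = df.writeStream.toTable("my_table", checkpointLocation="/tmp/ckpt")
query.status                 # current status of the streaming query
query.lastProgress           # most recent progress update, if any
query.awaitTermination(10)   # block for up to 10 seconds
query.stop()                 # stop the stream
```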
Contributor Author

Ideally it'd be nice to ship this change in 3.1.0 so that the API is available in both Scala and PySpark at the same time. I've conservatively set this to 3.2.0 for now, to hear opinions on when to add it.

Member

I am okay if other people are fine.

Member

I think it is okay, as this only exposes the APIs to PySpark.

tableName : str
string, for the name of the table.

.. versionadded:: 3.2.0
Contributor Author

Same here

@HyukjinKwon HyukjinKwon changed the title [SPARK-33836][SS] Expose DataStreamReader.table and DataStreamWriter.toTable to PySpark [SPARK-33836][SS][PYTHON] Expose DataStreamReader.table and DataStreamWriter.toTable Dec 18, 2020
partitionBy: Optional[Union[str, List[str]]] = ...,
queryName: Optional[str] = ...,
**options: OptionalPrimitiveType
) -> StreamingQuery: ...
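
For reference, a hypothetical call exercising the keyword parameters visible in this stub (the table name, partition column, query name, and checkpoint path below are all placeholders):

```
# Sketch: all names and paths below are placeholders, not values from this PR.
query = df.writeStream.toTable(
    "events_sink",
    format="parquet",
    partitionBy="date",                     # str or list of str
    queryName="events_to_table",
    checkpointLocation="/tmp/ckpt_events")  # extra kwargs flow into **options
```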
Member

cc @zero323 FYI

Member

@dongjoon-hyun dongjoon-hyun left a comment

I'm also +1 for backporting this to branch-3.1.

cc @cloud-fan and @gatorsmile

@SparkQA

SparkQA commented Dec 18, 2020

Test build #133012 has finished for PR 30835 at commit cd359a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 20, 2020

Test build #133089 has finished for PR 30835 at commit f85fa2b.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

I see two explicit votes of support for landing this in 3.1.0 and no negative input. Let me set the version to 3.1.0 for now. I'll revert it if we decide not to include this in 3.1.0 in a later review phase.

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37688/

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37689/

@SparkQA

SparkQA commented Dec 20, 2020

Test build #133090 has finished for PR 30835 at commit 8ee8511.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37688/

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37689/

@SparkQA

SparkQA commented Dec 20, 2020

Test build #133092 has finished for PR 30835 at commit dc13c8e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37692/

@SparkQA

SparkQA commented Dec 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37692/

@SparkQA

SparkQA commented Dec 20, 2020

Test build #133093 has finished for PR 30835 at commit 56ced09.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM. I have some minor nits and suggestions in mind, but they shouldn't block this PR. I'll follow up myself if you don't mind, @HeartSaVioR.

@HyukjinKwon
Member

I'll merge this on Monday in KST if there are no outstanding comments.

@HeartSaVioR
Contributor Author

Yeah sure, thanks for the detailed review, @HyukjinKwon !

@HyukjinKwon
Member

Merged to master and branch-3.1.

HyukjinKwon pushed a commit that referenced this pull request Dec 21, 2020
…mWriter.toTable


Closes #30835 from HeartSaVioR/SPARK-33836.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit 8d4d433)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-33836 branch December 21, 2020 11:19
Member

@xuanyuanking xuanyuanking left a comment

Late LGTM.
I'll submit PR for SPARK-33659 today. Sorry for the late. (Some other thing blocked me in the last 2 weeks...)

HyukjinKwon added a commit that referenced this pull request Aug 11, 2022
… DataStreamReader.table

### What changes were proposed in this pull request?

This PR is a followup of #30835 that adds `DataStreamWriter.toTable` and `DataStreamReader.table` into PySpark documentation.

### Why are the changes needed?

To document both features.

### Does this PR introduce _any_ user-facing change?

Yes, both APIs will be shown in the PySpark reference documentation.

### How was this patch tested?

Manually built the documentation and checked.

Closes #37477 from HyukjinKwon/SPARK-40043.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>