[SPARK-30091][SQL][PYTHON] Document mergeSchema option directly in the PySpark Parquet APIs

### What changes were proposed in this pull request?

This change properly documents the `mergeSchema` option directly in the Python APIs for reading Parquet data.

### Why are the changes needed?

The docstring for `DataFrameReader.parquet()` mentions `mergeSchema`, but the option isn't exposed as a parameter of the API. It seems like a simple oversight.

Before this PR, you'd have to do this to use `mergeSchema`:

```python
spark.read.option('mergeSchema', True).parquet('test-parquet').show()
```

After this PR, you can use the option as (I believe) it was intended to be used:

```python
spark.read.parquet('test-parquet', mergeSchema=True).show()
```

### Does this PR introduce any user-facing change?

Yes, this PR changes the signatures of `DataFrameReader.parquet()` and `DataStreamReader.parquet()` to match their docstrings.
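
For illustration, both readers accept the keyword directly after this change (the paths and the `stream_schema` name below are hypothetical):

```python
# Batch reader: parquet() now pulls mergeSchema out of **options.
spark.read.parquet('data/parquet', mergeSchema=True)

# Streaming reader: mergeSchema is now an explicit keyword parameter.
spark.readStream.schema(stream_schema).parquet('data/parquet', mergeSchema=True)
```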

### How was this patch tested?

Testing the `mergeSchema` option directly seems to be left to the Scala side of the codebase. I tested my change manually to confirm the API works.

I also confirmed that setting `spark.sql.parquet.mergeSchema` at the session level is not overridden when `mergeSchema` is left at its default in the call to `parquet()`:

```
>>> spark.conf.set('spark.sql.parquet.mergeSchema', True)
>>> spark.range(3).write.parquet('test-parquet/id')
>>> spark.range(3).withColumnRenamed('id', 'name').write.parquet('test-parquet/name')
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet').show()
+----+----+
|  id|name|
+----+----+
|null|   1|
|null|   2|
|null|   0|
|   1|null|
|   2|null|
|   0|null|
+----+----+
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet', mergeSchema=False).show()
+----+
|  id|
+----+
|null|
|null|
|null|
|   1|
|   2|
|   0|
+----+
```
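
If an automated Python-side test were wanted, a minimal sketch might look like this (not part of this PR; it assumes a pytest `spark` session fixture and pytest's built-in `tmp_path` fixture):

```python
def test_parquet_merge_schema(spark, tmp_path):
    # Write two Parquet directories with different column names.
    spark.range(3).write.parquet(str(tmp_path / 'id'))
    spark.range(3).withColumnRenamed('id', 'name').write.parquet(str(tmp_path / 'name'))

    # With mergeSchema=True, the merged schema contains both columns.
    merged = spark.read.parquet(str(tmp_path), recursiveFileLookup=True, mergeSchema=True)
    assert set(merged.columns) == {'id', 'name'}

    # With mergeSchema=False, the schema comes from a single part-file.
    unmerged = spark.read.parquet(str(tmp_path), recursiveFileLookup=True, mergeSchema=False)
    assert len(unmerged.columns) == 1
```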

Closes #26730 from nchammas/parquet-merge-schema.

Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
nchammas authored and HyukjinKwon committed Dec 4, 2019
1 parent 708cf16 commit e766a32
Showing 2 changed files with 16 additions and 17 deletions.
14 changes: 7 additions & 7 deletions python/pyspark/sql/readwriter.py
```diff
@@ -305,22 +305,22 @@ def table(self, tableName):

     @since(1.4)
     def parquet(self, *paths, **options):
-        """Loads Parquet files, returning the result as a :class:`DataFrame`.
+        """
+        Loads Parquet files, returning the result as a :class:`DataFrame`.

+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using this option
             disables `partition discovery`_.

-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
+        mergeSchema = options.get('mergeSchema', None)
         recursiveFileLookup = options.get('recursiveFileLookup', None)
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

     @ignore_unicode_prefix
```
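
Since `_set_opts` routes keyword arguments to the reader's options, the two calls below should be equivalent after this change (the path is hypothetical):

```python
# Via the generic option interface (the only route before this PR).
df1 = spark.read.option('mergeSchema', True).parquet('data/parquet')

# Via the now-documented keyword, which parquet() forwards to the same option.
df2 = spark.read.parquet('data/parquet', mergeSchema=True)
```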
19 changes: 9 additions & 10 deletions python/pyspark/sql/streaming.py
```diff
@@ -535,26 +535,25 @@ def orc(self, path, recursiveFileLookup=None):
             raise TypeError("path can be only a single string")

     @since(2.0)
-    def parquet(self, path, recursiveFileLookup=None):
-        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
-
-        .. note:: Evolving.
+    def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
+        """
+        Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using this option
             disables `partition discovery`_.

-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in ``spark.sql.parquet.mergeSchema``.
+        .. note:: Evolving.

         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
         True
         """
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
```
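
And a minimal streaming usage sketch (assumes a predefined `sdf_schema` and a hypothetical source directory, as in the doctest above):

```python
# Streaming reads require an explicit schema; mergeSchema is now a keyword.
parquet_sdf = (spark.readStream
               .schema(sdf_schema)
               .parquet('data/parquet-stream', mergeSchema=True))
assert parquet_sdf.isStreaming
```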
