From e766a323bc3462763b03f9d892a0b3fdf2cb29db Mon Sep 17 00:00:00 2001
From: Nicholas Chammas
Date: Wed, 4 Dec 2019 11:31:57 +0900
Subject: [PATCH] [SPARK-30091][SQL][PYTHON] Document mergeSchema option directly in the PySpark Parquet APIs

### What changes were proposed in this pull request?

This change properly documents the `mergeSchema` option directly in the Python APIs for reading Parquet data.

### Why are the changes needed?

The docstring for `DataFrameReader.parquet()` mentions `mergeSchema` but doesn't expose it as a parameter in the API. It seems like a simple oversight.

Before this PR, you'd have to do this to use `mergeSchema`:

```python
spark.read.option('mergeSchema', True).parquet('test-parquet').show()
```

After this PR, you can use the option as (I believe) it was intended to be used:

```python
spark.read.parquet('test-parquet', mergeSchema=True).show()
```

### Does this PR introduce any user-facing change?

Yes, this PR changes the signatures of `DataFrameReader.parquet()` and `DataStreamReader.parquet()` to match their docstrings.

### How was this patch tested?

Testing the `mergeSchema` option directly seems to be left to the Scala side of the codebase. I tested my change manually to confirm the API works.

I also confirmed that setting `spark.sql.parquet.mergeSchema` at the session level does not get overridden by leaving `mergeSchema` at its default when calling `parquet()`:

```
>>> spark.conf.set('spark.sql.parquet.mergeSchema', True)
>>> spark.range(3).write.parquet('test-parquet/id')
>>> spark.range(3).withColumnRenamed('id', 'name').write.parquet('test-parquet/name')
>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet').show()
+----+----+
|  id|name|
+----+----+
|null|   1|
|null|   2|
|null|   0|
|   1|null|
|   2|null|
|   0|null|
+----+----+

>>> spark.read.option('recursiveFileLookup', True).parquet('test-parquet', mergeSchema=False).show()
+----+
|  id|
+----+
|null|
|null|
|null|
|   1|
|   2|
|   0|
+----+
```

Closes #26730 from nchammas/parquet-merge-schema.

Authored-by: Nicholas Chammas
Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/readwriter.py | 14 +++++++-------
 python/pyspark/sql/streaming.py  | 19 +++++++++----------
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3f8a3a7595458..153776d8b4171 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -305,22 +305,22 @@ def table(self, tableName):
 
     @since(1.4)
     def parquet(self, *paths, **options):
-        """Loads Parquet files, returning the result as a :class:`DataFrame`.
+        """
+        Loads Parquet files, returning the result as a :class:`DataFrame`.
 
+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using
             this option disables `partition discovery`_.
 
-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
         >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
         >>> df.dtypes
         [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
+        mergeSchema = options.get('mergeSchema', None)
         recursiveFileLookup = options.get('recursiveFileLookup', None)
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
 
     @ignore_unicode_prefix
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 93b4c78953860..6359c31ba5655 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -535,26 +535,25 @@ def orc(self, path, recursiveFileLookup=None):
             raise TypeError("path can be only a single string")
 
     @since(2.0)
-    def parquet(self, path, recursiveFileLookup=None):
-        """Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+    def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
+        """
+        Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+
+        .. note:: Evolving.
 
+        :param mergeSchema: sets whether we should merge schemas collected from all
+            Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
+            The default value is specified in ``spark.sql.parquet.mergeSchema``.
         :param recursiveFileLookup: recursively scan a directory for files. Using
             this option disables `partition discovery`_.
 
-        You can set the following Parquet-specific option(s) for reading Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected from all \
-                Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in ``spark.sql.parquet.mergeSchema``.
-
-        .. note:: Evolving.
-
         >>> parquet_sdf = spark.readStream.schema(sdf_schema).parquet(tempfile.mkdtemp())
         >>> parquet_sdf.isStreaming
         True
         >>> parquet_sdf.schema == sdf_schema
         True
         """
-        self._set_opts(recursiveFileLookup=recursiveFileLookup)
+        self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
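
For reference, here is a minimal usage sketch of the API surface after this patch. It is illustrative only and not part of the patch itself: it assumes an active `SparkSession` named `spark`, the two-schema `test-parquet/` layout created in the manual test above, and a hand-written guess at the merged schema for the streaming reader.

```python
# Not part of the patch -- an illustrative sketch of the updated API surface.
# Assumes an active SparkSession named `spark` and the same two-schema layout
# written under 'test-parquet/' in the manual test above ('test-parquet/id'
# and 'test-parquet/name').

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Batch reader: `mergeSchema` can now be passed directly as a keyword argument
# instead of going through .option('mergeSchema', ...).
df = spark.read.parquet('test-parquet', mergeSchema=True, recursiveFileLookup=True)
df.printSchema()  # should show both the `id` and `name` columns

# Streaming reader: the new signature is parquet(path, mergeSchema=None,
# recursiveFileLookup=None); streaming sources still require an explicit schema.
# The schema below is an illustrative guess at the merged result.
merged_schema = StructType([
    StructField('id', LongType()),
    StructField('name', StringType()),
])
stream_df = (
    spark.readStream
    .schema(merged_schema)
    .parquet('test-parquet', mergeSchema=True, recursiveFileLookup=True)
)
print(stream_df.isStreaming)  # True
```

In both readers the keyword is routed through `_set_opts`, so passing `mergeSchema=...` to `parquet()` is equivalent to calling `.option('mergeSchema', ...)` before the load.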