
[SPARK-27990][SPARK-29903][PYTHON] Add recursiveFileLookup option to Python DataFrameReader

### What changes were proposed in this pull request?

As a follow-up to #24830, this PR adds the `recursiveFileLookup` option to the Python DataFrameReader API.

### Why are the changes needed?

This PR maintains Python feature parity with Scala.

### Does this PR introduce any user-facing change?

Yes.

Before this PR, you could only set this option through the generic `option()` method:

```python
spark.read.option("recursiveFileLookup", True).text("test-data").show()
```

With this PR, you can pass the option directly to the format-specific methods:

```python
spark.read.text("test-data", recursiveFileLookup=True).show()
```

This option now also shows up in the Python API docs.
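
The same keyword is also added to the `DataStreamReader` methods in `python/pyspark/sql/streaming.py` (see the diff below). A minimal sketch of the streaming form, assuming the same `test-data` directory used in the manual test below:

```python
# Sketch only: recursiveFileLookup passed to a streaming text read.
# "test-data" refers to the dummy directory described in the testing section.
text_sdf = spark.readStream.text("test-data", recursiveFileLookup=True)
assert text_sdf.isStreaming
```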

### How was this patch tested?

I tested this manually by creating the following directories with dummy data:

```
test-data
├── 1.txt
└── nested
    └── 2.txt
test-parquet
├── nested
│   ├── _SUCCESS
│   └── part-00000-...-.parquet
├── _SUCCESS
└── part-00000-...-.parquet
```
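
For reference, a rough sketch of how a layout like this could be created; the file contents, the use of `spark.range`, and `pathlib` are assumptions rather than the commands actually used:

```python
from pathlib import Path

# Plain text files: one at the top level, one nested a level down.
Path("test-data/nested").mkdir(parents=True, exist_ok=True)
Path("test-data/1.txt").write_text("hello\n")
Path("test-data/nested/2.txt").write_text("world\n")

# Parquet output at the top level and inside a nested directory.
df = spark.range(3)
df.write.mode("overwrite").parquet("test-parquet")
df.write.mode("overwrite").parquet("test-parquet/nested")
```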

I then ran the following tests and confirmed the output looked good:

```python
spark.read.parquet("test-parquet", recursiveFileLookup=True).show()
spark.read.text("test-data", recursiveFileLookup=True).show()
spark.read.csv("test-data", recursiveFileLookup=True).show()
```

`python/pyspark/sql/tests/test_readwriter.py` seems pretty sparse. I'm happy to add my tests there, though it seems we have been deferring testing like this to the Scala side of things.
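
If a test were added to `python/pyspark/sql/tests/test_readwriter.py`, a minimal sketch might look like the following; the class name, test name, and use of `ReusedSQLTestCase` are assumptions, not part of this patch:

```python
import os
import tempfile

from pyspark.testing.sqlutils import ReusedSQLTestCase


class RecursiveFileLookupTests(ReusedSQLTestCase):
    # Hypothetical test sketch, not part of this patch.
    def test_recursive_file_lookup_text(self):
        d = tempfile.mkdtemp()
        os.makedirs(os.path.join(d, "nested"))
        with open(os.path.join(d, "1.txt"), "w") as f:
            f.write("hello")
        with open(os.path.join(d, "nested", "2.txt"), "w") as f:
            f.write("world")

        # Files in nested directories should be picked up.
        df = self.spark.read.text(d, recursiveFileLookup=True)
        self.assertEqual(df.count(), 2)
```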

Closes #26718 from nchammas/SPARK-27990-recursiveFileLookup-python.

Authored-by: Nicholas Chammas <nicholas.chammas@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
nchammas authored and HyukjinKwon committed Dec 4, 2019
1 parent f3abee3 commit 3dd3a623f293bc7fd4937c95f06b967fa187b0f1
Showing with 56 additions and 16 deletions.
  1. +28 −8 python/pyspark/sql/readwriter.py
  2. +28 −8 python/pyspark/sql/streaming.py
@@ -171,7 +171,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
- dropFieldIfAllNull=None, encoding=None, locale=None):
+ dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -247,6 +247,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
+ .. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -266,7 +270,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
- locale=locale)
+ locale=locale, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -300,9 +304,12 @@ def table(self, tableName):
return self._df(self._jreader.table(tableName))

@since(1.4)
- def parquet(self, *paths):
+ def parquet(self, *paths, **options):
"""Loads Parquet files, returning the result as a :class:`DataFrame`.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -312,11 +319,13 @@ def parquet(self, *paths):
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
+ recursiveFileLookup = options.get('recursiveFileLookup', None)
+ self._set_opts(recursiveFileLookup=recursiveFileLookup)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
@since(1.6)
- def text(self, paths, wholetext=False, lineSep=None):
+ def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -329,6 +338,8 @@ def text(self, paths, wholetext=False, lineSep=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
@@ -337,7 +348,8 @@ def text(self, paths, wholetext=False, lineSep=None):
>>> df.collect()
[Row(value=u'hello\\nthis')]
"""
- self._set_opts(wholetext=wholetext, lineSep=lineSep)
+ self._set_opts(
+     wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -349,7 +361,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
- samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
+ samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
+     recursiveFileLookup=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -457,6 +470,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -476,7 +491,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
- enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
+ enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
+     recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -504,13 +520,17 @@ def func(iterator):
raise TypeError("path can be only string, list or RDD")

@since(1.5)
- def orc(self, path):
+ def orc(self, path, recursiveFileLookup=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
+ self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
@@ -411,7 +411,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
- dropFieldIfAllNull=None, encoding=None):
+ dropFieldIfAllNull=None, encoding=None, recursiveFileLookup=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.
@@ -487,6 +487,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
+ .. _partition discovery: /sql-data-sources-parquet.html#partition-discovery
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -502,33 +506,41 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
- dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
+ dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
+     recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
raise TypeError("path can be only a single string")

@since(2.3)
- def orc(self, path):
+ def orc(self, path, recursiveFileLookup=None):
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.
.. note:: Evolving.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> orc_sdf = spark.readStream.schema(sdf_schema).orc(tempfile.mkdtemp())
>>> orc_sdf.isStreaming
True
>>> orc_sdf.schema == sdf_schema
True
"""
+ self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.orc(path))
else:
raise TypeError("path can be only a single string")

@since(2.0)
- def parquet(self, path):
+ def parquet(self, path, recursiveFileLookup=None):
"""Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \
@@ -542,14 +554,15 @@ def parquet(self, path):
>>> parquet_sdf.schema == sdf_schema
True
"""
+ self._set_opts(recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.parquet(path))
else:
raise TypeError("path can be only a single string")

@ignore_unicode_prefix
@since(2.0)
- def text(self, path, wholetext=False, lineSep=None):
+ def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -564,14 +577,17 @@ def text(self, path, wholetext=False, lineSep=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
>>> text_sdf.isStreaming
True
>>> "value" in str(text_sdf.schema)
True
"""
- self._set_opts(wholetext=wholetext, lineSep=lineSep)
+ self._set_opts(
+     wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.text(path))
else:
@@ -584,7 +600,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
- enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
+ enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
+     recursiveFileLookup=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -687,6 +704,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
+ :param recursiveFileLookup: recursively scan a directory for files. Using this option
+     disables `partition discovery`_.
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -704,7 +723,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
- emptyValue=emptyValue, locale=locale, lineSep=lineSep)
+ emptyValue=emptyValue, locale=locale, lineSep=lineSep,
+     recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
