Commit: Python Lint and Docs

cchighman committed Jun 26, 2020
1 parent 86b5d6f commit b4e0428
Showing 6 changed files with 57 additions and 27 deletions.
37 changes: 27 additions & 10 deletions docs/sql-data-sources-generic-options.md
@@ -120,13 +120,30 @@ To load all files recursively, you can use:
</div>
</div>

### Load Files after Modification Date

`fileModifiedDate` is an option used to load only files modified after a specified date.
This parameter expects a timestamp in the format `YYYY-MM-DDTHH:mm:ss`.

Example:<br/>
`spark`<br/>
`  .read`<br/>
`  .format("csv")`<br/>
`  .option("fileModifiedDate", "2020-05-13T08:33:05")`<br/>
### Modification Date Filter

`modifiedDateFilter` is an option used to load only files modified after a specified
timestamp. The option expects a timestamp in the form <code>YYYY-MM-DDTHH:mm:ss</code>.
It does not change the behavior of partition discovery.

To load only the files whose modification dates fall after the specified timestamp,
you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_modified_date_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_modified_date_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_modified_date_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_modified_date_filter r/RSparkSQLExample.R %}
</div>
</div>
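For quick reference, the following is a minimal PySpark sketch of the same flow shown in the bundled examples above; it assumes the `modifiedDateFilter` patch is applied and reuses the examples' directory and timestamps:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("modified-date-filter").getOrCreate()

# Cutoff in the past: files in dir1 modified after it survive the filter.
before_df = (spark.read.format("parquet")
             .option("modifiedDateFilter", "2020-06-01T05:30:00")
             .load("examples/src/main/resources/dir1"))
before_df.show()

# Cutoff in the future: nothing is newer, so the result is empty.
after_df = (spark.read.format("parquet")
            .option("modifiedDateFilter", "2050-06-01T05:30:00")
            .load("examples/src/main/resources/dir1"))
after_df.show()
```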
4 changes: 2 additions & 2 deletions examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -148,7 +148,7 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_date_filter$
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
.option("modifiedDateFilter", "2020-06-01T05:30:00") // File should not be filtered out
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
@@ -157,7 +157,7 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
// +-------------+
// |file1.parquet|
// +-------------+
Dataset<Row> afterFilterDF = spark.read().format("parquet")
.option("modifiedDateFilter", "2050-06-01T05:30:00") // File should be filtered out
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
1 change: 1 addition & 0 deletions examples/src/main/python/sql/datasource.py
@@ -88,6 +88,7 @@ def generic_file_source_options_example(spark):
# +-------------+
# $example off:load_with_modified_date_filter$


def basic_datasource_example(spark):
# $example on:generic_load_save_functions$
df = spark.read.load("examples/src/main/resources/users.parquet")
4 changes: 2 additions & 2 deletions examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -82,7 +82,7 @@ object SQLDataSourceExample {
// +-------------+
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_date_filter$
val beforeFilterDF = spark.read.format("parquet")
  .option("modifiedDateFilter", "2020-06-01T05:30:00") // File should not be filtered out
  .load("examples/src/main/resources/dir1")
beforeFilterDF.show()
@@ -91,7 +91,7 @@
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
  .option("modifiedDateFilter", "2050-06-01T05:30:00") // File should be filtered out
  .load("examples/src/main/resources/dir1")
afterFilterDF.show()
15 changes: 10 additions & 5 deletions python/pyspark/sql/readwriter.py
@@ -305,7 +305,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -366,7 +367,8 @@ def parquet(self, *paths, **options):
modifiedDateFilter = options.get('modifiedDateFilter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
@@ -553,7 +555,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -581,7 +584,8 @@ def func(iterator):
raise TypeError("path can be only string, list or RDD")

@since(1.5)
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
        modifiedDateFilter=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.
:param mergeSchema: sets whether we should merge schemas collected from all
@@ -602,7 +606,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
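    # Usage sketch, assuming this patch is applied: the batch readers accept the
    # new option as a keyword, which flows through _set_opts as above, e.g.
    #
    #   df = spark.read.parquet("examples/src/main/resources/dir1",
    #                           modifiedDateFilter="2020-06-01T05:30:00")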
23 changes: 15 additions & 8 deletions python/pyspark/sql/streaming.py
@@ -530,14 +530,16 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
raise TypeError("path can be only a single string")

@since(2.3)
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
        modifiedDateFilter=None):
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.
.. note:: Evolving.
@@ -562,14 +564,16 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
True
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
return self._df(self._jreader.orc(path))
else:
raise TypeError("path can be only a single string")
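    # Usage sketch, assuming this patch is applied: the streaming readers take
    # the same option. File streams require a schema up front; user_schema and
    # the directory below are hypothetical placeholders.
    #
    #   stream_df = (spark.readStream.schema(user_schema)
    #                .orc("path/to/orc/dir", modifiedDateFilter="2020-06-01T05:30:00"))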

@since(2.0)
def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
            modifiedDateFilter=None):
"""
Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
@@ -596,7 +600,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
True
"""
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
return self._df(self._jreader.parquet(path))
else:
@@ -605,7 +610,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
@ignore_unicode_prefix
@since(2.0)
def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None,
recursiveFileLookup=None,
modifiedDateFilter=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", followed by partitioned columns if there
@@ -781,7 +787,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedDateFilter=modifiedDateFilter)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
@@ -1268,4 +1275,4 @@ def _test():


if __name__ == "__main__":
_test()
