From b4e042851506880a6c8515d99bc344a26d937283 Mon Sep 17 00:00:00 2001
From: CC Highman
Date: Fri, 26 Jun 2020 02:19:17 -0700
Subject: [PATCH] Python Lint and Docs

---
 docs/sql-data-sources-generic-options.md    | 37 ++++++++++++++-----
 .../sql/JavaSQLDataSourceExample.java       |  4 +-
 examples/src/main/python/sql/datasource.py  |  1 +
 .../examples/sql/SQLDataSourceExample.scala |  4 +-
 python/pyspark/sql/readwriter.py            | 15 +++++---
 python/pyspark/sql/streaming.py             | 23 ++++++++----
 6 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md
index 6c02b41b90994..66e718c550600 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -120,13 +120,30 @@ To load all files recursively, you can use:
 
-### Load Files after Modification Date
-`fileModifiedDate` is an option used to only load files after a specified modification date.
-This parameter expects a timestamp in the following format: `YYYY-MM-DDTHH:mm:ss`.
-
-Example:
-`spark`
-`.read`
-` .format("csv")`
-` .option('fileModifiedDate','2020-05-13T08:33:05`)
-
\ No newline at end of file
+### Modification Date Filter
+
+`modifiedDateFilter` is an option used to load only those files whose modification
+timestamp is after the specified timestamp. The value must be a timestamp
+in the form `YYYY-MM-DDTHH:mm:ss`, for example `2020-06-01T05:30:00`.
+It does not change the behavior of partition discovery.
+
+To load files with modification dates after the specified timestamp, you can use:
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example load_with_modified_date_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
+</div>

+<div data-lang="java"  markdown="1">
+{% include_example load_with_modified_date_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example load_with_modified_date_filter python/sql/datasource.py %}
+</div>
+
+<div data-lang="r"  markdown="1">
+{% include_example load_with_modified_date_filter r/RSparkSQLExample.R %}
+</div>
+
+</div>
\ No newline at end of file
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 41c0a23c10aa2..3649161534c02 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -148,7 +148,7 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
     // +-------------+
     // $example off:load_with_path_glob_filter$
     // $example on:load_with_modified_date_filter$
-    Dataset<Row> beforeFilterDF = spark.read().format("parquet")
+    Dataset<Row> beforeFilterDF = spark.read.format("parquet")
       .option("modifiedDateFilter", "2020-06-01T05:30:00") // File should not be filtered out
       .load("examples/src/main/resources/dir1");
     beforeFilterDF.show();
@@ -157,7 +157,7 @@ private static void runGenericFileSourceOptionsExample(SparkSession spark) {
     // +-------------+
     // |file1.parquet|
     // +-------------+
-    Dataset<Row> afterFilterDF = spark.read().format("parquet")
+    Dataset<Row> afterFilterDF = spark.read.format("parquet")
       .option("modifiedDateFilter", "2050-06-01T05:30:00") // File should be filtered out
       .load("examples/src/main/resources/dir1");
     afterFilterDF.show();
diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py
index 23f06ea34ffc6..77e22043d3372 100644
--- a/examples/src/main/python/sql/datasource.py
+++ b/examples/src/main/python/sql/datasource.py
@@ -88,6 +88,7 @@ def generic_file_source_options_example(spark):
     # +-------------+
     # $example off:load_with_modified_date_filter$
 
+
 def basic_datasource_example(spark):
     # $example on:generic_load_save_functions$
     df = spark.read.load("examples/src/main/resources/users.parquet")
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index c2012ebeed591..d0536b7d848cf 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -82,7 +82,7 @@ object SQLDataSourceExample {
     //      +-------------+
     // $example off:load_with_path_glob_filter$
     // $example on:load_with_modified_date_filter$
-    val beforeFilterDF = spark.read().format("parquet")
+    val beforeFilterDF = spark.read.format("parquet")
       .option("modifiedDateFilter", "2020-06-01T05:30:00") // File should not be filtered out
       .load("examples/src/main/resources/dir1");
     beforeFilterDF.show();
@@ -91,7 +91,7 @@ object SQLDataSourceExample {
     //      +-------------+
     //      |file1.parquet|
     //      +-------------+
-    val afterFilterDF = spark.read().format("parquet")
+    val afterFilterDF = spark.read.format("parquet")
       .option("modifiedDateFilter", "2050-06-01T05:30:00") // File should be filtered out
       .load("examples/src/main/resources/dir1");
     afterFilterDF.show();
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ac52ee329d831..1d112c52e0306 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -305,7 +305,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
             samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
-            locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+            locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -366,7 +367,8 @@ def parquet(self, *paths, **options):
         modifiedDateFilter = options.get('modifiedDateFilter', None)
         recursiveFileLookup = options.get('recursiveFileLookup', None)
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
-                       recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+                       recursiveFileLookup=recursiveFileLookup,
+                       modifiedDateFilter=modifiedDateFilter)
         return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
 
     @ignore_unicode_prefix
@@ -553,7 +555,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -581,7 +584,8 @@ def func(iterator):
             raise TypeError("path can be only string, list or RDD")
 
     @since(1.5)
-    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedDateFilter=None):
+    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+            modifiedDateFilter=None):
         """Loads ORC files, returning the result as a :class:`DataFrame`.
 
         :param mergeSchema: sets whether we should merge schemas collected from all
@@ -602,7 +606,8 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
-                       recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+                       recursiveFileLookup=recursiveFileLookup,
+                       modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             path = [path]
         return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index b88516f60fc7b..d2992712d7a9f 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -530,14 +530,16 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
             dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
             raise TypeError("path can be only a single string")
 
     @since(2.3)
-    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedDateFilter=None):
+    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+            modifiedDateFilter=None):
         """Loads a ORC file stream, returning the result as a :class:`DataFrame`.
 
         .. note:: Evolving.
@@ -562,14 +564,16 @@ def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=N
         True
         """
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
-                       recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+                       recursiveFileLookup=recursiveFileLookup,
+                       modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             return self._df(self._jreader.orc(path))
         else:
             raise TypeError("path can be only a single string")
 
     @since(2.0)
-    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedDateFilter=None):
+    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
+                modifiedDateFilter=None):
         """
         Loads a Parquet file stream, returning the result as a :class:`DataFrame`.
 
@@ -596,7 +600,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
         True
         """
         self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
-                       recursiveFileLookup=recursiveFileLookup,modifiedDateFilter=modifiedDateFilter)
+                       recursiveFileLookup=recursiveFileLookup,
+                       modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             return self._df(self._jreader.parquet(path))
         else:
@@ -605,7 +610,8 @@ def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLook
     @ignore_unicode_prefix
     @since(2.0)
     def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None,
-             recursiveFileLookup=None, modifiedDateFilter=None):
+             recursiveFileLookup=None,
+             modifiedDateFilter=None):
         """
         Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
         string column named "value", and followed by partitioned columns if there
@@ -781,7 +787,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
             emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup, modifiedDateFilter=modifiedDateFilter)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            modifiedDateFilter=modifiedDateFilter)
         if isinstance(path, basestring):
             return self._df(self._jreader.csv(path))
         else:
@@ -1268,4 +1275,4 @@ def _test():
 
 
 if __name__ == "__main__":
-    _test()
+    _test()
\ No newline at end of file
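
For quick reference, here is a minimal PySpark sketch of how the `modifiedDateFilter`
option documented and threaded through above is intended to be used. This is a sketch
only: it assumes a Spark build that already contains this patch; the application name is
arbitrary, and the directory is the sample data used by the patched examples.

```python
# Minimal usage sketch for the modifiedDateFilter reader option (assumes this patch is applied).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("modified_date_filter_demo").getOrCreate()

# Generic .option() form, mirroring the Scala/Java examples above:
# only files modified after 2020-06-01T05:30:00 are loaded.
before_filter_df = (spark.read.format("parquet")
                    .option("modifiedDateFilter", "2020-06-01T05:30:00")
                    .load("examples/src/main/resources/dir1"))
before_filter_df.show()

# Keyword form exposed by the readwriter.py change on DataFrameReader.parquet():
# a timestamp in the future filters every file out, so this DataFrame is empty.
after_filter_df = spark.read.parquet("examples/src/main/resources/dir1",
                                     modifiedDateFilter="2050-06-01T05:30:00")
after_filter_df.show()

spark.stop()
```

The streaming readers touched in python/pyspark/sql/streaming.py accept the same keyword
argument (for example `spark.readStream.orc(path, modifiedDateFilter=...)`), subject to the
usual requirement that file-based streaming sources be given an explicit schema.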