From c40c461eaefd1d376538fb2c0e9e52550dfe1d78 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 10:40:03 -0700 Subject: [PATCH 1/8] Regression test. --- python/pyspark/sql/tests.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b5fbb7d098820..df79dbb7f606d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -539,6 +539,37 @@ def test_save_and_load(self): shutil.rmtree(tmpPath) + def test_save_and_load_builder(self): + df = self.df + tmpPath = tempfile.mkdtemp() + shutil.rmtree(tmpPath) + df.write.json(tmpPath) + actual = self.sqlCtx.read.json(tmpPath) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) + + schema = StructType([StructField("value", StringType(), True)]) + actual = self.sqlCtx.read.json(tmpPath, schema) + self.assertEqual(sorted(df.select("value").collect()), sorted(actual.collect())) + + df.write.mode("overwrite").json(tmpPath) + actual = self.sqlCtx.read.json(tmpPath) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) + + df.write.mode("overwrite").options(noUse="this options will not be used in save.")\ + .format("json").save(path=tmpPath) + actual = self.sqlCtx.read.format("json").load(path=tmpPath, + noUse="this options will not be used in load.") + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) + + defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", + "org.apache.spark.sql.parquet") + self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json") + actual = self.sqlCtx.load(path=tmpPath) + self.assertEqual(sorted(df.collect()), sorted(actual.collect())) + self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName) + + shutil.rmtree(tmpPath) + def test_help_command(self): # Regression test for SPARK-5464 rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}']) From 88eb6c45cfe61f4922df90fe5fd8b555976b31a4 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 10:40:14 -0700 Subject: [PATCH 2/8] If mode is "error", do not call mode method. --- python/pyspark/sql/readwriter.py | 50 ++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index f036644acc961..f0ac232d5385d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -276,7 +276,15 @@ def save(self, path=None, format=None, mode="error", **options): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - self.mode(mode).options(**options) + if mode is not "error": + # At the JVM side, the default value of mode is already set to "error". + # So, if mode at here is "error", we will not call mode method. + # This behavior is used to prevent us accidentally overriding the mode because + # user can call mode method directly. + # We leave "error" as the default in the method signature, so users can + # see what is the default value in Python doc. + self.mode(mode) + self.options(**options) if format is not None: self.format(format) if path is None: @@ -314,7 +322,15 @@ def saveAsTable(self, name, format=None, mode="error", **options): :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) :param options: all other string options """ - self.mode(mode).options(**options) + if mode is not "error": + # At the JVM side, the default value of mode is already set to "error". + # So, if mode at here is "error", we will not call mode method. + # This behavior is used to prevent us accidentally overriding the mode because + # user can call mode method directly. + # We leave "error" as the default in the method signature, so users can + # see what is the default value in Python doc. + self.mode(mode) + self.options(**options) if format is not None: self.format(format) self._jwrite.saveAsTable(name) @@ -333,7 +349,15 @@ def json(self, path, mode="error"): >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - self._jwrite.mode(mode).json(path) + if mode is not "error": + # At the JVM side, the default value of mode is already set to "error". + # So, if mode at here is "error", we will not call mode method. + # This behavior is used to prevent us accidentally overriding the mode because + # user can call mode method directly. + # We leave "error" as the default in the method signature, so users can + # see what is the default value in Python doc. + self.mode(mode) + self._jwrite.json(path) @since(1.4) def parquet(self, path, mode="error"): @@ -349,7 +373,15 @@ def parquet(self, path, mode="error"): >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - self._jwrite.mode(mode).parquet(path) + if mode is not "error": + # At the JVM side, the default value of mode is already set to "error". + # So, if mode at here is "error", we will not call mode method. + # This behavior is used to prevent us accidentally overriding the mode because + # user can call mode method directly. + # We leave "error" as the default in the method signature, so users can + # see what is the default value in Python doc. + self.mode(mode) + self._jwrite.parquet(path) @since(1.4) def jdbc(self, url, table, mode="error", properties={}): @@ -370,10 +402,18 @@ def jdbc(self, url, table, mode="error", properties={}): arbitrary string tag/value. Normally at least a "user" and "password" property should be included. """ + if mode is not "error": + # At the JVM side, the default value of mode is already set to "error". + # So, if mode at here is "error", we will not call mode method. + # This behavior is used to prevent us accidentally overriding the mode because + # user can call mode method directly. + # We leave "error" as the default in the method signature, so users can + # see what is the default value in Python doc. + self.mode(mode) jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)() for k in properties: jprop.setProperty(k, properties[k]) - self._jwrite.mode(mode).jdbc(url, table, jprop) + self._jwrite.jdbc(url, table, jprop) def _test(): From d696dff43db06fb15eda4234d7becf25a531c93b Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 10:53:41 -0700 Subject: [PATCH 3/8] Python style. --- python/pyspark/sql/tests.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index df79dbb7f606d..13f4556943ac8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -557,8 +557,9 @@ def test_save_and_load_builder(self): df.write.mode("overwrite").options(noUse="this options will not be used in save.")\ .format("json").save(path=tmpPath) - actual = self.sqlCtx.read.format("json").load(path=tmpPath, - noUse="this options will not be used in load.") + actual =\ + self.sqlCtx.read.format("json")\ + .load(path=tmpPath, noUse="this options will not be used in load.") self.assertEqual(sorted(df.collect()), sorted(actual.collect())) defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default", From 7fbc24ba35fd5c423bc37ddfeae09cebbc909b2e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 11:51:31 -0700 Subject: [PATCH 4/8] Use None instead of "error" as the default value of mode since JVM-side already uses "error" as the default value. --- python/pyspark/sql/readwriter.py | 50 ++++++++++---------------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index f0ac232d5385d..38e3690a4aa17 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -257,7 +257,7 @@ def partitionBy(self, *cols): return self @since(1.4) - def save(self, path=None, format=None, mode="error", **options): + def save(self, path=None, format=None, mode=None, **options): """Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -276,13 +276,9 @@ def save(self, path=None, format=None, mode="error", **options): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not "error": + if mode is not None: # At the JVM side, the default value of mode is already set to "error". - # So, if mode at here is "error", we will not call mode method. - # This behavior is used to prevent us accidentally overriding the mode because - # user can call mode method directly. - # We leave "error" as the default in the method signature, so users can - # see what is the default value in Python doc. + # We will only call mode method if the provided mode is not None. self.mode(mode) self.options(**options) if format is not None: @@ -304,7 +300,7 @@ def insertInto(self, tableName, overwrite=False): self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName) @since(1.4) - def saveAsTable(self, name, format=None, mode="error", **options): + def saveAsTable(self, name, format=None, mode=None, **options): """Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the @@ -322,13 +318,9 @@ def saveAsTable(self, name, format=None, mode="error", **options): :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) :param options: all other string options """ - if mode is not "error": + if mode is not None: # At the JVM side, the default value of mode is already set to "error". - # So, if mode at here is "error", we will not call mode method. - # This behavior is used to prevent us accidentally overriding the mode because - # user can call mode method directly. - # We leave "error" as the default in the method signature, so users can - # see what is the default value in Python doc. + # We will only call mode method if the provided mode is not None. self.mode(mode) self.options(**options) if format is not None: @@ -336,7 +328,7 @@ def saveAsTable(self, name, format=None, mode="error", **options): self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode="error"): + def json(self, path, mode=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -349,18 +341,14 @@ def json(self, path, mode="error"): >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not "error": + if mode is not None: # At the JVM side, the default value of mode is already set to "error". - # So, if mode at here is "error", we will not call mode method. - # This behavior is used to prevent us accidentally overriding the mode because - # user can call mode method directly. - # We leave "error" as the default in the method signature, so users can - # see what is the default value in Python doc. + # We will only call mode method if the provided mode is not None. self.mode(mode) self._jwrite.json(path) @since(1.4) - def parquet(self, path, mode="error"): + def parquet(self, path, mode=None): """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. :param path: the path in any Hadoop supported file system @@ -373,18 +361,14 @@ def parquet(self, path, mode="error"): >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not "error": + if mode is not None: # At the JVM side, the default value of mode is already set to "error". - # So, if mode at here is "error", we will not call mode method. - # This behavior is used to prevent us accidentally overriding the mode because - # user can call mode method directly. - # We leave "error" as the default in the method signature, so users can - # see what is the default value in Python doc. + # We will only call mode method if the provided mode is not None. self.mode(mode) self._jwrite.parquet(path) @since(1.4) - def jdbc(self, url, table, mode="error", properties={}): + def jdbc(self, url, table, mode=None, properties={}): """Saves the content of the :class:`DataFrame` to a external database table via JDBC. .. note:: Don't create too many partitions in parallel on a large cluster;\ @@ -402,13 +386,9 @@ def jdbc(self, url, table, mode="error", properties={}): arbitrary string tag/value. Normally at least a "user" and "password" property should be included. """ - if mode is not "error": + if mode is not None: # At the JVM side, the default value of mode is already set to "error". - # So, if mode at here is "error", we will not call mode method. - # This behavior is used to prevent us accidentally overriding the mode because - # user can call mode method directly. - # We leave "error" as the default in the method signature, so users can - # see what is the default value in Python doc. + # We will only call mode method if the provided mode is not None. self.mode(mode) jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)() for k in properties: From 889eb253b88aa1622b820dd26b0cf7f11f86cd70 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 12:15:05 -0700 Subject: [PATCH 5/8] Minor refactoring and add partitionBy to save, saveAsTable, and parquet. --- python/pyspark/sql/readwriter.py | 43 +++++++++++--------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 38e3690a4aa17..1c1519b3f1651 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -218,7 +218,10 @@ def mode(self, saveMode): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - self._jwrite = self._jwrite.mode(saveMode) + # At the JVM side, the default value of mode is already set to "error". + # So, if the given saveMode is None, we will not call JVM-side's mode method. + if saveMode is not None: + self._jwrite = self._jwrite.mode(saveMode) return self @since(1.4) @@ -253,11 +256,12 @@ def partitionBy(self, *cols): """ if len(cols) == 1 and isinstance(cols[0], (list, tuple)): cols = cols[0] - self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) + if len(cols) > 0: + self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) return self @since(1.4) - def save(self, path=None, format=None, mode=None, **options): + def save(self, path=None, format=None, mode=None, partitionBy=(), **options): """Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -276,11 +280,7 @@ def save(self, path=None, format=None, mode=None, **options): >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not None: - # At the JVM side, the default value of mode is already set to "error". - # We will only call mode method if the provided mode is not None. - self.mode(mode) - self.options(**options) + self.partitionBy(partitionBy).mode(mode).options(**options) if format is not None: self.format(format) if path is None: @@ -300,7 +300,7 @@ def insertInto(self, tableName, overwrite=False): self._jwrite.mode("overwrite" if overwrite else "append").insertInto(tableName) @since(1.4) - def saveAsTable(self, name, format=None, mode=None, **options): + def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options): """Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the @@ -318,11 +318,7 @@ def saveAsTable(self, name, format=None, mode=None, **options): :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) :param options: all other string options """ - if mode is not None: - # At the JVM side, the default value of mode is already set to "error". - # We will only call mode method if the provided mode is not None. - self.mode(mode) - self.options(**options) + self.partitionBy(partitionBy).mode(mode).options(**options) if format is not None: self.format(format) self._jwrite.saveAsTable(name) @@ -341,14 +337,10 @@ def json(self, path, mode=None): >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not None: - # At the JVM side, the default value of mode is already set to "error". - # We will only call mode method if the provided mode is not None. - self.mode(mode) - self._jwrite.json(path) + self._jwrite.mode(mode).json(path) @since(1.4) - def parquet(self, path, mode=None): + def parquet(self, path, mode=None, partitionBy=()): """Saves the content of the :class:`DataFrame` in Parquet format at the specified path. :param path: the path in any Hadoop supported file system @@ -361,10 +353,7 @@ def parquet(self, path, mode=None): >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ - if mode is not None: - # At the JVM side, the default value of mode is already set to "error". - # We will only call mode method if the provided mode is not None. - self.mode(mode) + self.partitionBy(partitionBy).mode(mode) self._jwrite.parquet(path) @since(1.4) @@ -386,14 +375,10 @@ def jdbc(self, url, table, mode=None, properties={}): arbitrary string tag/value. Normally at least a "user" and "password" property should be included. """ - if mode is not None: - # At the JVM side, the default value of mode is already set to "error". - # We will only call mode method if the provided mode is not None. - self.mode(mode) jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)() for k in properties: jprop.setProperty(k, properties[k]) - self._jwrite.jdbc(url, table, jprop) + self._jwrite.mode(mode).jdbc(url, table, jprop) def _test(): From d21290a259f6e966f188afa1b3b7033c19f77fb9 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 12:17:49 -0700 Subject: [PATCH 6/8] Python doc. --- python/pyspark/sql/readwriter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1c1519b3f1651..45c5fd6b71f75 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -276,6 +276,7 @@ def save(self, path=None, format=None, mode=None, partitionBy=(), **options): * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. + :param partitionBy: names of partitioning columns :param options: all other string options >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) @@ -316,6 +317,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options): :param name: the table name :param format: the format used to save :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error) + :param partitionBy: names of partitioning columns :param options: all other string options """ self.partitionBy(partitionBy).mode(mode).options(**options) @@ -350,7 +352,7 @@ def parquet(self, path, mode=None, partitionBy=()): * ``overwrite``: Overwrite existing data. * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. - + :param partitionBy: names of partitioning columns >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self.partitionBy(partitionBy).mode(mode) From d37abd2e379d44b1f4a620db68bc1680e70ae3ba Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 12:20:56 -0700 Subject: [PATCH 7/8] style. --- python/pyspark/sql/readwriter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 45c5fd6b71f75..4f32251cb718c 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -353,6 +353,7 @@ def parquet(self, path, mode=None, partitionBy=()): * ``ignore``: Silently ignore this operation if data already exists. * ``error`` (default case): Throw an exception if data already exists. :param partitionBy: names of partitioning columns + >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) """ self.partitionBy(partitionBy).mode(mode) From f972d5d33df367faa5fc514a7787f09553684b9c Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 22 Jun 2015 12:53:55 -0700 Subject: [PATCH 8/8] davies's comment. --- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 4f32251cb718c..1b7bc0f9a12be 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -339,7 +339,7 @@ def json(self, path, mode=None): >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ - self._jwrite.mode(mode).json(path) + self.mode(mode)._jwrite.json(path) @since(1.4) def parquet(self, path, mode=None, partitionBy=()):