From df646e860f97c09f7fea9a80a058025bd3edac57 Mon Sep 17 00:00:00 2001
From: shane knapp
Date: Thu, 16 Aug 2018 17:48:47 -0700
Subject: [PATCH] just testing pyarrow changes

---
 dev/run-tests-jenkins.py    |  3 +-
 dev/run-tests.py            |  2 +-
 python/pyspark/sql/tests.py | 66 ++++++++++++++++++++++++++++++-------
 python/pyspark/sql/types.py | 15 +++++++++
 python/run-tests.py         |  2 +-
 5 files changed, 74 insertions(+), 14 deletions(-)

diff --git a/dev/run-tests-jenkins.py b/dev/run-tests-jenkins.py
index 16af97c7fbeae..1e0a1b51c198e 100755
--- a/dev/run-tests-jenkins.py
+++ b/dev/run-tests-jenkins.py
@@ -37,6 +37,7 @@ def print_err(msg):
 
 
 def post_message_to_github(msg, ghprb_pull_id):
+    return
     print("Attempting to post to Github...")
 
     url = "https://api.github.com/repos/apache/spark/issues/" + ghprb_pull_id + "/comments"
@@ -182,7 +183,7 @@ def main():
 
     # format: http://linux.die.net/man/1/timeout
     # must be less than the timeout configured on Jenkins (currently 400m)
-    tests_timeout = "340m"
+    tests_timeout = "600m"
 
     # Array to capture all test names to run on the pull request. These tests are represented
     # by their file equivalents in the dev/tests/ directory.
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d9d3789ac1255..b4b3916350e74 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -531,7 +531,7 @@ def main():
         hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.6")
         test_env = "amplab_jenkins"
         # add path for Python3 in Jenkins if we're calling from a Jenkins machine
-        os.environ["PATH"] = "/home/anaconda/envs/py3k/bin:" + os.environ.get("PATH")
+        os.environ["PATH"] = "/home/anaconda/envs/py35/bin:" + os.environ.get("PATH")
     else:
         # else we're running locally and can use local settings
         build_tool = "sbt"
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 91ed600afedd7..4a0d53f79290f 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -4050,6 +4050,8 @@ class ArrowTests(ReusedSQLTestCase):
     def setUpClass(cls):
         from datetime import date, datetime
         from decimal import Decimal
+        from distutils.version import LooseVersion
+        import pyarrow as pa
         ReusedSQLTestCase.setUpClass()
 
         # Synchronize default timezone between Python and Java
@@ -4078,6 +4080,13 @@ def setUpClass(cls):
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
                      date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]
 
+        # TODO: remove version check once minimum pyarrow version is 0.10.0
+        if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
+            cls.schema.add(StructField("9_binary_t", BinaryType(), True))
+            cls.data[0] = cls.data[0] + (bytearray(b"a"),)
+            cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
+            cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
+
     @classmethod
     def tearDownClass(cls):
         del os.environ["TZ"]
@@ -4115,12 +4124,23 @@ def test_toPandas_fallback_enabled(self):
                     self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))
 
     def test_toPandas_fallback_disabled(self):
+        from distutils.version import LooseVersion
+        import pyarrow as pa
+
         schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(None,)], schema=schema)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, 'Unsupported type'):
                 df.toPandas()
 
+        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
+        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+            schema = StructType([StructField("binary", BinaryType(), True)])
+            df = self.spark.createDataFrame([(None,)], schema=schema)
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'):
+                    df.toPandas()
+
     def test_null_conversion(self):
         df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])]
                                              + self.data)
@@ -4232,19 +4252,22 @@ def test_createDataFrame_with_schema(self):
 
     def test_createDataFrame_with_incorrect_schema(self):
         pdf = self.create_pandas_data_frame()
-        wrong_schema = StructType(list(reversed(self.schema)))
+        fields = list(self.schema)
+        fields[0], fields[7] = fields[7], fields[0]  # swap str with timestamp
+        wrong_schema = StructType(fields)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, ".*No cast.*string.*timestamp.*"):
                 self.spark.createDataFrame(pdf, schema=wrong_schema)
 
     def test_createDataFrame_with_names(self):
         pdf = self.create_pandas_data_frame()
+        new_names = list(map(str, range(len(self.schema.fieldNames()))))
         # Test that schema as a list of column names gets applied
-        df = self.spark.createDataFrame(pdf, schema=list('abcdefgh'))
-        self.assertEquals(df.schema.fieldNames(), list('abcdefgh'))
+        df = self.spark.createDataFrame(pdf, schema=list(new_names))
+        self.assertEquals(df.schema.fieldNames(), list(new_names))
         # Test that schema as tuple of column names gets applied
-        df = self.spark.createDataFrame(pdf, schema=tuple('abcdefgh'))
-        self.assertEquals(df.schema.fieldNames(), list('abcdefgh'))
+        df = self.spark.createDataFrame(pdf, schema=tuple(new_names))
+        self.assertEquals(df.schema.fieldNames(), list(new_names))
 
     def test_createDataFrame_column_name_encoding(self):
         import pandas as pd
@@ -4331,13 +4354,22 @@ def test_createDataFrame_fallback_enabled(self):
                     self.assertEqual(df.collect(), [Row(a={u'a': 1})])
 
     def test_createDataFrame_fallback_disabled(self):
+        from distutils.version import LooseVersion
         import pandas as pd
+        import pyarrow as pa
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
                 self.spark.createDataFrame(
                     pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>")
 
+        # TODO: remove BinaryType check once minimum pyarrow version is 0.10.0
+        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(TypeError, 'Unsupported type.*BinaryType'):
+                    self.spark.createDataFrame(
+                        pd.DataFrame([[{'a': b'aaa'}]]), "a: binary")
+
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
         import pandas as pd
@@ -4729,6 +4761,24 @@ def test_vectorized_udf_datatype_string(self):
                         bool_f(col('bool')))
         self.assertEquals(df.collect(), res.collect())
 
+    def test_vectorized_udf_null_binary(self):
+        from distutils.version import LooseVersion
+        import pyarrow as pa
+        from pyspark.sql.functions import pandas_udf, col
+        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+            with QuietTest(self.sc):
+                with self.assertRaisesRegexp(
+                        NotImplementedError,
+                        'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
+                    pandas_udf(lambda x: x, BinaryType())
+        else:
+            data = [(bytearray(b"a"),), (None,), (bytearray(b"bb"),), (bytearray(b"ccc"),)]
+            schema = StructType().add("binary", BinaryType())
+            df = self.spark.createDataFrame(data, schema)
+            str_f = pandas_udf(lambda x: x, BinaryType())
+            res = df.select(str_f(col('binary')))
+            self.assertEquals(df.collect(), res.collect())
+
     def test_vectorized_udf_array_type(self):
         from pyspark.sql.functions import pandas_udf, col
         data = [([1, 2],), ([3, 4],)]
@@ -4835,12 +4885,6 @@ def test_vectorized_udf_unsupported_types(self):
                     'Invalid returnType.*scalar Pandas UDF.*MapType'):
                 pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    NotImplementedError,
-                    'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
-                pandas_udf(lambda x: x, BinaryType())
-
     def test_vectorized_udf_dates(self):
         from pyspark.sql.functions import pandas_udf, col
         from datetime import date
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 214d8fe6bbbb6..0b61707c8cc0a 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1578,6 +1578,7 @@ def convert(self, obj, gateway_client):
 
 def to_arrow_type(dt):
     """ Convert Spark data type to pyarrow type """
+    from distutils.version import LooseVersion
     import pyarrow as pa
     if type(dt) == BooleanType:
         arrow_type = pa.bool_()
@@ -1597,6 +1598,12 @@ def to_arrow_type(dt):
         arrow_type = pa.decimal128(dt.precision, dt.scale)
     elif type(dt) == StringType:
         arrow_type = pa.string()
+    elif type(dt) == BinaryType:
+        # TODO: remove version check once minimum pyarrow version is 0.10.0
+        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt) +
+                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
+        arrow_type = pa.binary()
     elif type(dt) == DateType:
         arrow_type = pa.date32()
     elif type(dt) == TimestampType:
@@ -1623,6 +1630,8 @@ def to_arrow_schema(schema):
 
 def from_arrow_type(at):
     """ Convert pyarrow type to Spark data type. """
+    from distutils.version import LooseVersion
+    import pyarrow as pa
     import pyarrow.types as types
     if types.is_boolean(at):
         spark_type = BooleanType()
@@ -1642,6 +1651,12 @@ def from_arrow_type(at):
         spark_type = DecimalType(precision=at.precision, scale=at.scale)
     elif types.is_string(at):
         spark_type = StringType()
+    elif types.is_binary(at):
+        # TODO: remove version check once minimum pyarrow version is 0.10.0
+        if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
+            raise TypeError("Unsupported type in conversion from Arrow: " + str(at) +
+                            "\nPlease install pyarrow >= 0.10.0 for BinaryType support.")
+        spark_type = BinaryType()
     elif types.is_date32(at):
         spark_type = DateType()
     elif types.is_timestamp(at):
diff --git a/python/run-tests.py b/python/run-tests.py
index 4c90926cfa350..dfebc0800b777 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -162,7 +162,7 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
 
 
 def get_default_python_executables():
-    python_execs = [x for x in ["python2.7", "python3.4", "pypy"] if which(x)]
+    python_execs = [x for x in ["python2.7", "python3.5", "pypy"] if which(x)]
     if "python2.7" not in python_execs:
         LOGGER.warning("Not testing against `python2.7` because it could not be found; falling"
                        " back to `python` instead")
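
For anyone who wants to try the version gate without a full Spark checkout: below is a
minimal standalone sketch of the pyarrow check that to_arrow_type()/from_arrow_type()
apply in the patch. The helper name binary_arrow_type is hypothetical, used only for
illustration; Spark performs this check inline rather than through such a helper.

# Hypothetical helper illustrating the version gate above; not a Spark API.
from distutils.version import LooseVersion

import pyarrow as pa


def binary_arrow_type():
    # pyarrow releases before 0.10.0 cannot handle Spark's BinaryType, so
    # fail fast with the same guidance the patch adds to types.py. LooseVersion
    # is used so that "0.9.0" < "0.10.0" compares numerically, not lexically.
    if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
        raise TypeError("Please install pyarrow >= 0.10.0 for BinaryType support; "
                        "found pyarrow " + pa.__version__)
    return pa.binary()


if __name__ == "__main__":
    # Prints the Arrow binary type on pyarrow >= 0.10.0; raises otherwise.
    print(binary_arrow_type())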