[SPARK-24044][PYTHON] Explicitly print out skipped tests from unittest module

## What changes were proposed in this pull request?

This PR proposes to remove the duplicated dependency-checking logic and also to print out the tests skipped by the unittest module.

For example:

```
Skipped tests in pyspark.sql.tests with pypy:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'Pandas >= 0.19.2 must be installed; however, it was not found.'
...

Skipped tests in pyspark.sql.tests with python3:
    test_createDataFrame_column_name_encoding (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
    test_createDataFrame_does_not_modify_input (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow >= 0.8.0 must be installed; however, it was not found.'
...
```

Currently, skipped tests are not printed out in the console. I think we should print them out so it is clear which tests were skipped and why.
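
For reference, a minimal standalone sketch (not part of this change) of how the unittest module reports skips when the runner is given verbosity 2; the `pandas` import here is only an illustrative stand-in for the optional dependencies above:

```
import unittest


class OptionalDependencyTests(unittest.TestCase):
    def test_pandas_feature(self):
        try:
            import pandas  # noqa: F401
        except ImportError:
            self.skipTest("Pandas must be installed; however, it was not found.")
        self.assertTrue(True)


if __name__ == "__main__":
    # At verbosity=2 the runner lists each skipped test together with the
    # reason passed to skipTest(), e.g.:
    #   test_pandas_feature (__main__.OptionalDependencyTests) ... skipped '...'
    unittest.main(verbosity=2)
```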

## How was this patch tested?

Manually tested. Also, fortunately, Jenkins provides a good environment for testing the skipped-test output.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21107 from HyukjinKwon/skipped-tests-print.
HyukjinKwon authored and BryanCutler committed Apr 26, 2018
1 parent 4f1e386 commit f7435be
Showing 6 changed files with 98 additions and 104 deletions.
16 changes: 11 additions & 5 deletions python/pyspark/ml/tests.py
@@ -2136,17 +2136,23 @@ class ImageReaderTest2(PySparkTestCase):
@classmethod
def setUpClass(cls):
super(ImageReaderTest2, cls).setUpClass()
cls.hive_available = True
# Note that here we enable Hive's support.
cls.spark = None
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.spark = HiveContext._createForTesting(cls.sc)
cls.hive_available = False
if cls.hive_available:
cls.spark = HiveContext._createForTesting(cls.sc)

def setUp(self):
if not self.hive_available:
self.skipTest("Hive is not available.")

@classmethod
def tearDownClass(cls):
@@ -2662,6 +2668,6 @@ def testDefaultFitMultiple(self):
if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
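
The ml/tests.py change above records Hive availability in setUpClass and defers the actual skip to setUp, presumably so that each test method is reported (and counted) as skipped rather than the whole class being skipped up front. A minimal sketch of the same pattern, with a hypothetical `backend_is_available()` probe standing in for the HiveConf check:

```
import importlib.util
import unittest


def backend_is_available():
    # Hypothetical availability probe standing in for the HiveConf check above;
    # here we simply look for an optional package.
    return importlib.util.find_spec("pandas") is not None


class BackendTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Only record availability here; raising SkipTest in setUpClass would
        # skip the whole class at once instead of reporting each test method.
        cls.backend_available = backend_is_available()

    def setUp(self):
        if not self.backend_available:
            self.skipTest("Backend is not available.")

    def test_uses_backend(self):
        self.assertTrue(self.backend_available)


if __name__ == "__main__":
    unittest.main(verbosity=2)
```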
4 changes: 2 additions & 2 deletions python/pyspark/mllib/tests.py
@@ -1767,9 +1767,9 @@ def test_pca(self):
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
sc.stop()
51 changes: 30 additions & 21 deletions python/pyspark/sql/tests.py
@@ -3096,23 +3096,28 @@ def setUpClass(cls):
filename_pattern = (
"sql/core/target/scala-*/test-classes/org/apache/spark/sql/"
"TestQueryExecutionListener.class")
if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)):
raise unittest.SkipTest(
cls.has_listener = bool(glob.glob(os.path.join(SPARK_HOME, filename_pattern)))

if cls.has_listener:
# Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
cls.spark = SparkSession.builder \
.master("local[4]") \
.appName(cls.__name__) \
.config(
"spark.sql.queryExecutionListeners",
"org.apache.spark.sql.TestQueryExecutionListener") \
.getOrCreate()

def setUp(self):
if not self.has_listener:
raise self.skipTest(
"'org.apache.spark.sql.TestQueryExecutionListener' is not "
"available. Will skip the related tests.")

# Note that 'spark.sql.queryExecutionListeners' is a static immutable configuration.
cls.spark = SparkSession.builder \
.master("local[4]") \
.appName(cls.__name__) \
.config(
"spark.sql.queryExecutionListeners",
"org.apache.spark.sql.TestQueryExecutionListener") \
.getOrCreate()

@classmethod
def tearDownClass(cls):
cls.spark.stop()
if hasattr(cls, "spark"):
cls.spark.stop()

def tearDown(self):
self.spark._jvm.OnSuccessCall.clear()
@@ -3196,18 +3201,22 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
cls.hive_available = True
try:
cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
except py4j.protocol.Py4JError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
except TypeError:
cls.tearDownClass()
raise unittest.SkipTest("Hive is not available")
cls.hive_available = False
os.unlink(cls.tempdir.name)
cls.spark = HiveContext._createForTesting(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()
if cls.hive_available:
cls.spark = HiveContext._createForTesting(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
cls.df = cls.sc.parallelize(cls.testData).toDF()

def setUp(self):
if not self.hive_available:
self.skipTest("Hive is not available.")

@classmethod
def tearDownClass(cls):
@@ -5316,6 +5325,6 @@ def test_invalid_args(self):
if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
unittest.main(verbosity=2)
4 changes: 2 additions & 2 deletions python/pyspark/streaming/tests.py
@@ -1590,11 +1590,11 @@ def search_kinesis_asl_assembly_jar():
sys.stderr.write("[Running %s]\n" % (testcase))
tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
if xmlrunner:
result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=3).run(tests)
result = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2).run(tests)
if not result.wasSuccessful():
failed = True
else:
result = unittest.TextTestRunner(verbosity=3).run(tests)
result = unittest.TextTestRunner(verbosity=2).run(tests)
if not result.wasSuccessful():
failed = True
sys.exit(failed)
12 changes: 2 additions & 10 deletions python/pyspark/tests.py
@@ -2353,15 +2353,7 @@ def test_statcounter_array(self):

if __name__ == "__main__":
from pyspark.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if not _have_numpy:
print("NOTE: Skipping NumPy tests as it does not seem to be installed")
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'), verbosity=2)
else:
unittest.main()
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
if not _have_numpy:
print("NOTE: NumPy tests were skipped as it does not seem to be installed")
unittest.main(verbosity=2)
115 changes: 51 additions & 64 deletions python/run-tests.py
@@ -32,6 +32,7 @@
else:
import queue as Queue
from distutils.version import LooseVersion
from multiprocessing import Manager


# Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
@@ -50,6 +51,7 @@ def print_red(text):
print('\033[31m' + text + '\033[0m')


SKIPPED_TESTS = Manager().dict()
LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
FAILURE_REPORTING_LOCK = Lock()
LOGGER = logging.getLogger()
@@ -109,8 +111,34 @@ def run_individual_python_test(test_name, pyspark_python):
# this code is invoked from a thread other than the main thread.
os._exit(-1)
else:
per_test_output.close()
LOGGER.info("Finished test(%s): %s (%is)", pyspark_python, test_name, duration)
skipped_counts = 0
try:
per_test_output.seek(0)
# Here expects skipped test output from unittest when verbosity level is
# 2 (or --verbose option is enabled).
decoded_lines = map(lambda line: line.decode(), iter(per_test_output))
skipped_tests = list(filter(
lambda line: re.search('test_.* \(pyspark\..*\) ... skipped ', line),
decoded_lines))
skipped_counts = len(skipped_tests)
if skipped_counts > 0:
key = (pyspark_python, test_name)
SKIPPED_TESTS[key] = skipped_tests
per_test_output.close()
except:
import traceback
print_red("\nGot an exception while trying to store "
"skipped test output:\n%s" % traceback.format_exc())
# Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
# this code is invoked from a thread other than the main thread.
os._exit(-1)
if skipped_counts != 0:
LOGGER.info(
"Finished test(%s): %s (%is) ... %s tests were skipped", pyspark_python, test_name,
duration, skipped_counts)
else:
LOGGER.info(
"Finished test(%s): %s (%is)", pyspark_python, test_name, duration)


def get_default_python_executables():
@@ -152,65 +180,17 @@ def parse_opts():
return opts


def _check_dependencies(python_exec, modules_to_test):
if "COVERAGE_PROCESS_START" in os.environ:
# Make sure if coverage is installed.
try:
subprocess_check_output(
[python_exec, "-c", "import coverage"],
stderr=open(os.devnull, 'w'))
except:
print_red("Coverage is not installed in Python executable '%s' "
"but 'COVERAGE_PROCESS_START' environment variable is set, "
"exiting." % python_exec)
sys.exit(-1)

# If we should test 'pyspark-sql', it checks if PyArrow and Pandas are installed and
# explicitly prints out. See SPARK-23300.
if pyspark_sql in modules_to_test:
# TODO(HyukjinKwon): Relocate and deduplicate these version specifications.
minimum_pyarrow_version = '0.8.0'
minimum_pandas_version = '0.19.2'

try:
pyarrow_version = subprocess_check_output(
[python_exec, "-c", "import pyarrow; print(pyarrow.__version__)"],
universal_newlines=True,
stderr=open(os.devnull, 'w')).strip()
if LooseVersion(pyarrow_version) >= LooseVersion(minimum_pyarrow_version):
LOGGER.info("Will test PyArrow related features against Python executable "
"'%s' in '%s' module." % (python_exec, pyspark_sql.name))
else:
LOGGER.warning(
"Will skip PyArrow related features against Python executable "
"'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
"%s was found." % (
python_exec, pyspark_sql.name, minimum_pyarrow_version, pyarrow_version))
except:
LOGGER.warning(
"Will skip PyArrow related features against Python executable "
"'%s' in '%s' module. PyArrow >= %s is required; however, PyArrow "
"was not found." % (python_exec, pyspark_sql.name, minimum_pyarrow_version))

try:
pandas_version = subprocess_check_output(
[python_exec, "-c", "import pandas; print(pandas.__version__)"],
universal_newlines=True,
stderr=open(os.devnull, 'w')).strip()
if LooseVersion(pandas_version) >= LooseVersion(minimum_pandas_version):
LOGGER.info("Will test Pandas related features against Python executable "
"'%s' in '%s' module." % (python_exec, pyspark_sql.name))
else:
LOGGER.warning(
"Will skip Pandas related features against Python executable "
"'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
"%s was found." % (
python_exec, pyspark_sql.name, minimum_pandas_version, pandas_version))
except:
LOGGER.warning(
"Will skip Pandas related features against Python executable "
"'%s' in '%s' module. Pandas >= %s is required; however, Pandas "
"was not found." % (python_exec, pyspark_sql.name, minimum_pandas_version))
def _check_coverage(python_exec):
# Make sure if coverage is installed.
try:
subprocess_check_output(
[python_exec, "-c", "import coverage"],
stderr=open(os.devnull, 'w'))
except:
print_red("Coverage is not installed in Python executable '%s' "
"but 'COVERAGE_PROCESS_START' environment variable is set, "
"exiting." % python_exec)
sys.exit(-1)


def main():
@@ -237,9 +217,10 @@ def main():

task_queue = Queue.PriorityQueue()
for python_exec in python_execs:
# Check if the python executable has proper dependencies installed to run tests
# for given modules properly.
_check_dependencies(python_exec, modules_to_test)
# Check if the python executable has coverage installed when 'COVERAGE_PROCESS_START'
# environmental variable is set.
if "COVERAGE_PROCESS_START" in os.environ:
_check_coverage(python_exec)

python_implementation = subprocess_check_output(
[python_exec, "-c", "import platform; print(platform.python_implementation())"],
@@ -281,6 +262,12 @@ def process_queue(task_queue):
total_duration = time.time() - start_time
LOGGER.info("Tests passed in %i seconds", total_duration)

for key, lines in sorted(SKIPPED_TESTS.items()):
pyspark_python, test_name = key
LOGGER.info("\nSkipped tests in %s with %s:" % (test_name, pyspark_python))
for line in lines:
LOGGER.info(" %s" % line.rstrip())


if __name__ == "__main__":
main()
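
The run-tests.py change above captures each module's unittest output, scans it for per-test `... skipped` lines, and aggregates them in a `Manager().dict()` so results from parallel workers end up in one place for the final report. A minimal self-contained sketch of that aggregation, using a hypothetical `record_skips()` helper and sample output lines:

```
import re
from multiprocessing import Manager

# Regex mirroring the one in run-tests.py above: matches unittest's
# per-test skip lines printed at verbosity 2.
SKIP_LINE = re.compile(r"test_.* \(pyspark\..*\) \.\.\. skipped ")


def record_skips(skipped_tests, pyspark_python, test_name, output_lines):
    """Store skipped-test lines for one (interpreter, module) run in the shared dict."""
    skipped = [line for line in output_lines if SKIP_LINE.search(line)]
    if skipped:
        skipped_tests[(pyspark_python, test_name)] = skipped
    return len(skipped)


if __name__ == "__main__":
    # A Manager-backed dict can be shared safely across worker processes/threads.
    skipped_tests = Manager().dict()
    sample_output = [
        "test_a (pyspark.sql.tests.ArrowTests) ... skipped 'PyArrow was not found.'",
        "test_b (pyspark.sql.tests.ArrowTests) ... ok",
    ]
    record_skips(skipped_tests, "python3", "pyspark.sql.tests", sample_output)
    for (python_exec, module), lines in sorted(skipped_tests.items()):
        print("Skipped tests in %s with %s:" % (module, python_exec))
        for line in lines:
            print("    %s" % line.rstrip())
```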
