diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index eda0b60f8b1e7..a70c664a71fdb 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -86,7 +86,7 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(0.0, [0.0, 1.0]), ... LabeledPoint(1.0, [1.0, 0.0]), ... ] - >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10) >>> lrm.predict([1.0, 0.0]) 1 >>> lrm.predict([0.0, 1.0]) @@ -95,7 +95,7 @@ class LogisticRegressionModel(LinearClassificationModel): [1, 0] >>> lrm.clearThreshold() >>> lrm.predict([0.0, 1.0]) - 0.123... + 0.279... >>> sparse_data = [ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), @@ -103,7 +103,7 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(0.0, SparseVector(2, {0: 1.0})), ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) ... ] - >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data)) + >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10) >>> lrm.predict(array([0.0, 1.0])) 1 >>> lrm.predict(array([1.0, 0.0])) @@ -129,7 +129,8 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(1.0, [1.0, 0.0, 0.0]), ... LabeledPoint(2.0, [0.0, 0.0, 1.0]) ... ] - >>> mcm = LogisticRegressionWithLBFGS.train(data=sc.parallelize(multi_class_data), numClasses=3) + >>> data = sc.parallelize(multi_class_data) + >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) >>> mcm.predict([0.0, 0.5, 0.0]) 0 >>> mcm.predict([0.8, 0.0, 0.0]) @@ -298,7 +299,7 @@ def train(cls, data, iterations=100, initialWeights=None, regParam=0.01, regType ... LabeledPoint(0.0, [0.0, 1.0]), ... LabeledPoint(1.0, [1.0, 0.0]), ... ] - >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data)) + >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10) >>> lrm.predict([1.0, 0.0]) 1 >>> lrm.predict([0.0, 1.0]) @@ -330,14 +331,14 @@ class SVMModel(LinearClassificationModel): ... LabeledPoint(1.0, [2.0]), ... LabeledPoint(1.0, [3.0]) ... ] - >>> svm = SVMWithSGD.train(sc.parallelize(data)) + >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10) >>> svm.predict([1.0]) 1 >>> svm.predict(sc.parallelize([[1.0]])).collect() [1] >>> svm.clearThreshold() >>> svm.predict(array([1.0])) - 1.25... + 1.44... >>> sparse_data = [ ... LabeledPoint(0.0, SparseVector(2, {0: -1.0})), @@ -345,7 +346,7 @@ class SVMModel(LinearClassificationModel): ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) ... ] - >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data)) + >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10) >>> svm.predict(SparseVector(2, {1: 1.0})) 1 >>> svm.predict(SparseVector(2, {0: -1.0})) diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py index a0117c57133ae..4bc6351bdf02f 100644 --- a/python/pyspark/mllib/regression.py +++ b/python/pyspark/mllib/regression.py @@ -108,7 +108,8 @@ class LinearRegressionModel(LinearRegressionModelBase): ... LabeledPoint(3.0, [2.0]), ... LabeledPoint(2.0, [3.0]) ... ] - >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, + ... initialWeights=np.array([1.0])) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5 @@ -135,12 +136,13 @@ class LinearRegressionModel(LinearRegressionModelBase): ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) ... ] - >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, + ... initialWeights=array([1.0])) >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True - >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0, + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0, ... miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2", ... intercept=True, validateData=True) >>> abs(lrm.predict(array([0.0])) - 0) < 0.5 @@ -238,7 +240,7 @@ class LassoModel(LinearRegressionModelBase): ... LabeledPoint(3.0, [2.0]), ... LabeledPoint(2.0, [3.0]) ... ] - >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0])) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5 @@ -265,12 +267,13 @@ class LassoModel(LinearRegressionModelBase): ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) ... ] - >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, + ... initialWeights=array([1.0])) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True - >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=100, step=1.0, + >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0, ... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True, ... validateData=True) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 @@ -321,7 +324,8 @@ class RidgeRegressionModel(LinearRegressionModelBase): ... LabeledPoint(3.0, [2.0]), ... LabeledPoint(2.0, [3.0]) ... ] - >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, + ... initialWeights=array([1.0])) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5 @@ -348,12 +352,13 @@ class RidgeRegressionModel(LinearRegressionModelBase): ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})), ... LabeledPoint(2.0, SparseVector(1, {0: 3.0})) ... ] - >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0])) + >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, + ... initialWeights=array([1.0])) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 True >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5 True - >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0, + >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0, ... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True, ... validateData=True) >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5 @@ -396,7 +401,7 @@ def _test(): from pyspark import SparkContext import pyspark.mllib.regression globs = pyspark.mllib.regression.__dict__.copy() - globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) + globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 849c88341a967..1487470f56d43 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -36,6 +36,7 @@ else: import unittest +from pyspark import SparkContext from pyspark.mllib.common import _to_java_object_rdd from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ DenseMatrix, SparseMatrix, Vectors, Matrices @@ -47,7 +48,6 @@ from pyspark.mllib.feature import StandardScaler from pyspark.serializers import PickleSerializer from pyspark.sql import SQLContext -from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase _have_scipy = False try: @@ -58,6 +58,12 @@ pass ser = PickleSerializer() +sc = SparkContext('local[4]', "MLlib tests") + + +class MLlibTestCase(unittest.TestCase): + def setUp(self): + self.sc = sc def _squared_distance(a, b): @@ -67,7 +73,7 @@ def _squared_distance(a, b): return b.squared_distance(a) -class VectorTests(PySparkTestCase): +class VectorTests(MLlibTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) @@ -196,7 +202,7 @@ def test_sparse_matrix(self): self.assertTrue(array_equal(sm1t.toArray(), expected)) -class ListTests(PySparkTestCase): +class ListTests(MLlibTestCase): """ Test MLlib algorithms on plain lists, to make sure they're passed through @@ -239,7 +245,7 @@ def test_gmm(self): [-6, -7], ]) clusters = GaussianMixture.train(data, 2, convergenceTol=0.001, - maxIterations=100, seed=56) + maxIterations=10, seed=56) labels = clusters.predict(data).collect() self.assertEquals(labels[0], labels[1]) self.assertEquals(labels[2], labels[3]) @@ -250,9 +256,9 @@ def test_gmm_deterministic(self): y = range(0, 100, 10) data = self.sc.parallelize([[a, b] for a, b in zip(x, y)]) clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, - maxIterations=100, seed=63) + maxIterations=10, seed=63) clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001, - maxIterations=100, seed=63) + maxIterations=10, seed=63) for c1, c2 in zip(clusters1.weights, clusters2.weights): self.assertEquals(round(c1, 7), round(c2, 7)) @@ -271,13 +277,13 @@ def test_classification(self): temp_dir = tempfile.mkdtemp() - lr_model = LogisticRegressionWithSGD.train(rdd) + lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10) self.assertTrue(lr_model.predict(features[0]) <= 0) self.assertTrue(lr_model.predict(features[1]) > 0) self.assertTrue(lr_model.predict(features[2]) <= 0) self.assertTrue(lr_model.predict(features[3]) > 0) - svm_model = SVMWithSGD.train(rdd) + svm_model = SVMWithSGD.train(rdd, iterations=10) self.assertTrue(svm_model.predict(features[0]) <= 0) self.assertTrue(svm_model.predict(features[1]) > 0) self.assertTrue(svm_model.predict(features[2]) <= 0) @@ -291,7 +297,7 @@ def test_classification(self): categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories dt_model = DecisionTree.trainClassifier( - rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo) + rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) @@ -303,7 +309,8 @@ def test_classification(self): self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString()) rf_model = RandomForest.trainClassifier( - rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100) + rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, + maxBins=4) self.assertTrue(rf_model.predict(features[0]) <= 0) self.assertTrue(rf_model.predict(features[1]) > 0) self.assertTrue(rf_model.predict(features[2]) <= 0) @@ -315,7 +322,7 @@ def test_classification(self): self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString()) gbt_model = GradientBoostedTrees.trainClassifier( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4) self.assertTrue(gbt_model.predict(features[0]) <= 0) self.assertTrue(gbt_model.predict(features[1]) > 0) self.assertTrue(gbt_model.predict(features[2]) <= 0) @@ -344,19 +351,19 @@ def test_regression(self): rdd = self.sc.parallelize(data) features = [p.features.tolist() for p in data] - lr_model = LinearRegressionWithSGD.train(rdd) + lr_model = LinearRegressionWithSGD.train(rdd, iterations=10) self.assertTrue(lr_model.predict(features[0]) <= 0) self.assertTrue(lr_model.predict(features[1]) > 0) self.assertTrue(lr_model.predict(features[2]) <= 0) self.assertTrue(lr_model.predict(features[3]) > 0) - lasso_model = LassoWithSGD.train(rdd) + lasso_model = LassoWithSGD.train(rdd, iterations=10) self.assertTrue(lasso_model.predict(features[0]) <= 0) self.assertTrue(lasso_model.predict(features[1]) > 0) self.assertTrue(lasso_model.predict(features[2]) <= 0) self.assertTrue(lasso_model.predict(features[3]) > 0) - rr_model = RidgeRegressionWithSGD.train(rdd) + rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10) self.assertTrue(rr_model.predict(features[0]) <= 0) self.assertTrue(rr_model.predict(features[1]) > 0) self.assertTrue(rr_model.predict(features[2]) <= 0) @@ -364,35 +371,35 @@ def test_regression(self): categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories dt_model = DecisionTree.trainRegressor( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4) self.assertTrue(dt_model.predict(features[0]) <= 0) self.assertTrue(dt_model.predict(features[1]) > 0) self.assertTrue(dt_model.predict(features[2]) <= 0) self.assertTrue(dt_model.predict(features[3]) > 0) rf_model = RandomForest.trainRegressor( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100, seed=1) + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1) self.assertTrue(rf_model.predict(features[0]) <= 0) self.assertTrue(rf_model.predict(features[1]) > 0) self.assertTrue(rf_model.predict(features[2]) <= 0) self.assertTrue(rf_model.predict(features[3]) > 0) gbt_model = GradientBoostedTrees.trainRegressor( - rdd, categoricalFeaturesInfo=categoricalFeaturesInfo) + rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4) self.assertTrue(gbt_model.predict(features[0]) <= 0) self.assertTrue(gbt_model.predict(features[1]) > 0) self.assertTrue(gbt_model.predict(features[2]) <= 0) self.assertTrue(gbt_model.predict(features[3]) > 0) try: - LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0])) - LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0])) - RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0])) + LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) + LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) + RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10) except ValueError: self.fail() -class StatTests(PySparkTestCase): +class StatTests(MLlibTestCase): # SPARK-4023 def test_col_with_different_rdds(self): # numpy @@ -422,7 +429,7 @@ def test_col_norms(self): self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14) -class VectorUDTTests(PySparkTestCase): +class VectorUDTTests(MLlibTestCase): dv0 = DenseVector([]) dv1 = DenseVector([1.0, 2.0]) @@ -456,7 +463,7 @@ def test_infer_schema(self): @unittest.skipIf(not _have_scipy, "SciPy not installed") -class SciPyTests(PySparkTestCase): +class SciPyTests(MLlibTestCase): """ Test both vector operations and MLlib algorithms with SciPy sparse matrices, @@ -597,7 +604,7 @@ def test_regression(self): self.assertTrue(dt_model.predict(features[3]) > 0) -class ChiSqTestTests(PySparkTestCase): +class ChiSqTestTests(MLlibTestCase): def test_goodness_of_fit(self): from numpy import inf @@ -695,13 +702,13 @@ def test_right_number_of_results(self): self.assertIsNotNone(chi[1000]) -class SerDeTest(PySparkTestCase): +class SerDeTest(MLlibTestCase): def test_to_java_object_rdd(self): # SPARK-6660 data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0) self.assertEqual(_to_java_object_rdd(data).count(), 10) -class FeatureTest(PySparkTestCase): +class FeatureTest(MLlibTestCase): def test_idf_model(self): data = [ Vectors.dense([1, 2, 6, 0, 2, 3, 1, 1, 0, 0, 3]), @@ -714,13 +721,8 @@ def test_idf_model(self): self.assertEqual(len(idf), 11) -class Word2VecTests(PySparkTestCase): +class Word2VecTests(MLlibTestCase): def test_word2vec_setters(self): - data = [ - ["I", "have", "a", "pen"], - ["I", "like", "soccer", "very", "much"], - ["I", "live", "in", "Tokyo"] - ] model = Word2Vec() \ .setVectorSize(2) \ .setLearningRate(0.01) \ @@ -749,7 +751,7 @@ def test_word2vec_get_vectors(self): self.assertEquals(len(model.getVectors()), 3) -class StandardScalerTests(PySparkTestCase): +class StandardScalerTests(MLlibTestCase): def test_model_setters(self): data = [ [1.0, 2.0, 3.0], @@ -777,3 +779,4 @@ def test_model_transform(self): unittest.main() if not _have_scipy: print("NOTE: SciPy tests were skipped as it does not seem to be installed") + sc.stop() diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py index 0fe6e4fabe43a..cfcbea573fd22 100644 --- a/python/pyspark/mllib/tree.py +++ b/python/pyspark/mllib/tree.py @@ -482,13 +482,13 @@ def trainClassifier(cls, data, categoricalFeaturesInfo, ... LabeledPoint(1.0, [3.0]) ... ] >>> - >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}) + >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10) >>> model.numTrees() - 100 + 10 >>> model.totalNumNodes() - 300 + 30 >>> print(model) # it already has newline - TreeEnsembleModel classifier with 100 trees + TreeEnsembleModel classifier with 10 trees >>> model.predict([2.0]) 1.0 @@ -541,11 +541,12 @@ def trainRegressor(cls, data, categoricalFeaturesInfo, ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) ... ] >>> - >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {}) + >>> data = sc.parallelize(sparse_data) + >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10) >>> model.numTrees() - 100 + 10 >>> model.totalNumNodes() - 102 + 12 >>> model.predict(SparseVector(2, {1: 1.0})) 1.0 >>> model.predict(SparseVector(2, {0: 1.0})) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index b54baa57ec28a..1d0b16cade8bb 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -486,7 +486,7 @@ def sorted(self, iterator, key=None, reverse=False): goes above the limit. """ global MemoryBytesSpilled, DiskBytesSpilled - batch, limit = 100, self._next_limit() + batch, limit = 100, self.memory_limit chunks, current_chunk = [], [] iterator = iter(iterator) while True: @@ -497,7 +497,7 @@ def sorted(self, iterator, key=None, reverse=False): break used_memory = get_used_memory() - if used_memory > self.memory_limit: + if used_memory > limit: # sort them inplace will save memory current_chunk.sort(key=key, reverse=reverse) path = self._get_path(len(chunks)) @@ -513,13 +513,14 @@ def load(f): chunks.append(load(open(path, 'rb'))) current_chunk = [] gc.collect() + batch //= 2 limit = self._next_limit() MemoryBytesSpilled += (used_memory - get_used_memory()) << 20 DiskBytesSpilled += os.path.getsize(path) os.unlink(path) # data will be deleted after close elif not chunks: - batch = min(batch * 2, 10000) + batch = min(int(batch * 1.5), 10000) current_chunk.sort(key=key, reverse=reverse) if not chunks: diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 23e84283679e1..fe43c374f1cb1 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -109,7 +109,7 @@ def setUpClass(cls): os.unlink(cls.tempdir.name) cls.sqlCtx = SQLContext(cls.sc) cls.testData = [Row(key=i, value=str(i)) for i in range(100)] - rdd = cls.sc.parallelize(cls.testData) + rdd = cls.sc.parallelize(cls.testData, 2) cls.df = rdd.toDF() @classmethod @@ -303,7 +303,7 @@ def test_apply_schema(self): abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]" schema = _parse_schema_abstract(abstract) typedSchema = _infer_schema_type(rdd.first(), schema) - df = self.sqlCtx.applySchema(rdd, typedSchema) + df = self.sqlCtx.createDataFrame(rdd, typedSchema) r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3]) self.assertEqual(r, tuple(df.first())) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 33f958a601f3a..5fa1e5ef081ab 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -16,14 +16,23 @@ # import os +import sys from itertools import chain import time import operator -import unittest import tempfile import struct from functools import reduce +if sys.version_info[:2] <= (2, 6): + try: + import unittest2 as unittest + except ImportError: + sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') + sys.exit(1) +else: + import unittest + from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext from pyspark.streaming.kafka import KafkaUtils @@ -31,19 +40,25 @@ class PySparkStreamingTestCase(unittest.TestCase): - timeout = 20 # seconds - duration = 1 + timeout = 4 # seconds + duration = .2 - def setUp(self): - class_name = self.__class__.__name__ + @classmethod + def setUpClass(cls): + class_name = cls.__name__ conf = SparkConf().set("spark.default.parallelism", 1) - self.sc = SparkContext(appName=class_name, conf=conf) - self.sc.setCheckpointDir("/tmp") - # TODO: decrease duration to speed up tests + cls.sc = SparkContext(appName=class_name, conf=conf) + cls.sc.setCheckpointDir("/tmp") + + @classmethod + def tearDownClass(cls): + cls.sc.stop() + + def setUp(self): self.ssc = StreamingContext(self.sc, self.duration) def tearDown(self): - self.ssc.stop() + self.ssc.stop(False) def wait_for(self, result, n): start_time = time.time() @@ -363,13 +378,13 @@ def func(dstream): class WindowFunctionTests(PySparkStreamingTestCase): - timeout = 20 + timeout = 5 def test_window(self): input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): - return dstream.window(3, 1).count() + return dstream.window(.6, .2).count() expected = [[1], [3], [6], [9], [12], [9], [5]] self._test_func(input, func, expected) @@ -378,7 +393,7 @@ def test_count_by_window(self): input = [range(1), range(2), range(3), range(4), range(5)] def func(dstream): - return dstream.countByWindow(3, 1) + return dstream.countByWindow(.6, .2) expected = [[1], [3], [6], [9], [12], [9], [5]] self._test_func(input, func, expected) @@ -387,7 +402,7 @@ def test_count_by_window_large(self): input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): - return dstream.countByWindow(5, 1) + return dstream.countByWindow(1, .2) expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]] self._test_func(input, func, expected) @@ -396,7 +411,7 @@ def test_count_by_value_and_window(self): input = [range(1), range(2), range(3), range(4), range(5), range(6)] def func(dstream): - return dstream.countByValueAndWindow(5, 1) + return dstream.countByValueAndWindow(1, .2) expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]] self._test_func(input, func, expected) @@ -405,7 +420,7 @@ def test_group_by_key_and_window(self): input = [[('a', i)] for i in range(5)] def func(dstream): - return dstream.groupByKeyAndWindow(3, 1).mapValues(list) + return dstream.groupByKeyAndWindow(.6, .2).mapValues(list) expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])], [('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]] @@ -436,8 +451,8 @@ def test_stop_only_streaming_context(self): def test_stop_multiple_times(self): self._add_input_stream() self.ssc.start() - self.ssc.stop() - self.ssc.stop() + self.ssc.stop(False) + self.ssc.stop(False) def test_queue_stream(self): input = [list(range(i + 1)) for i in range(3)] @@ -495,10 +510,7 @@ def func(rdds): self.assertEqual([2, 3, 1], self._take(dstream, 3)) -class CheckpointTests(PySparkStreamingTestCase): - - def setUp(self): - pass +class CheckpointTests(unittest.TestCase): def test_get_or_create(self): inputd = tempfile.mkdtemp() @@ -518,12 +530,12 @@ def setup(): return ssc cpd = tempfile.mkdtemp("test_streaming_cps") - self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup) + ssc = StreamingContext.getOrCreate(cpd, setup) ssc.start() def check_output(n): while not os.listdir(outputd): - time.sleep(0.1) + time.sleep(0.01) time.sleep(1) # make sure mtime is larger than the previous one with open(os.path.join(inputd, str(n)), 'w') as f: f.writelines(["%d\n" % i for i in range(10)]) @@ -553,12 +565,15 @@ def check_output(n): ssc.stop(True, True) time.sleep(1) - self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup) + ssc = StreamingContext.getOrCreate(cpd, setup) ssc.start() check_output(3) + ssc.stop(True, True) class KafkaStreamTests(PySparkStreamingTestCase): + timeout = 20 # seconds + duration = 1 def setUp(self): super(KafkaStreamTests, self).setUp() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 75f39d9e75f38..ea63a396da5b8 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -31,7 +31,6 @@ import time import zipfile import random -import itertools import threading import hashlib @@ -49,6 +48,11 @@ xrange = range basestring = str +if sys.version >= "3": + from io import StringIO +else: + from StringIO import StringIO + from pyspark.conf import SparkConf from pyspark.context import SparkContext @@ -196,7 +200,7 @@ def test_external_sort_in_rdd(self): sc = SparkContext(conf=conf) l = list(range(10240)) random.shuffle(l) - rdd = sc.parallelize(l, 2) + rdd = sc.parallelize(l, 4) self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect()) sc.stop() @@ -300,6 +304,18 @@ def test_hash_serializer(self): hash(FlattenedValuesSerializer(PickleSerializer())) +class QuietTest(object): + def __init__(self, sc): + self.log4j = sc._jvm.org.apache.log4j + + def __enter__(self): + self.old_level = self.log4j.LogManager.getRootLogger().getLevel() + self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.log4j.LogManager.getRootLogger().setLevel(self.old_level) + + class PySparkTestCase(unittest.TestCase): def setUp(self): @@ -371,15 +387,11 @@ def test_add_py_file(self): # To ensure that we're actually testing addPyFile's effects, check that # this job fails due to `userlibrary` not being on the Python path: # disable logging in log4j temporarily - log4j = self.sc._jvm.org.apache.log4j - old_level = log4j.LogManager.getRootLogger().getLevel() - log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) - def func(x): from userlibrary import UserClass return UserClass().hello() - self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) - log4j.LogManager.getRootLogger().setLevel(old_level) + with QuietTest(self.sc): + self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) # Add the file, so the job should now succeed: path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py") @@ -496,7 +508,8 @@ def test_deleting_input_files(self): filtered_data = data.filter(lambda x: True) self.assertEqual(1, filtered_data.count()) os.unlink(tempFile.name) - self.assertRaises(Exception, lambda: filtered_data.count()) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: filtered_data.count()) def test_sampling_default_seed(self): # Test for SPARK-3995 (default seed setting) @@ -536,9 +549,9 @@ def test_namedtuple_in_rdd(self): self.assertEqual([jon, jane], theDoes.collect()) def test_large_broadcast(self): - N = 100000 + N = 10000 data = [[float(i) for i in range(300)] for i in range(N)] - bdata = self.sc.broadcast(data) # 270MB + bdata = self.sc.broadcast(data) # 27MB m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() self.assertEqual(N, m) @@ -569,7 +582,7 @@ def test_multiple_broadcasts(self): self.assertEqual(checksum, csum) def test_large_closure(self): - N = 1000000 + N = 200000 data = [float(i) for i in xrange(N)] rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data)) self.assertEqual(N, rdd.first()) @@ -604,17 +617,18 @@ def test_zip_with_different_number_of_items(self): # different number of partitions b = self.sc.parallelize(range(100, 106), 3) self.assertRaises(ValueError, lambda: a.zip(b)) - # different number of batched items in JVM - b = self.sc.parallelize(range(100, 104), 2) - self.assertRaises(Exception, lambda: a.zip(b).count()) - # different number of items in one pair - b = self.sc.parallelize(range(100, 106), 2) - self.assertRaises(Exception, lambda: a.zip(b).count()) - # same total number of items, but different distributions - a = self.sc.parallelize([2, 3], 2).flatMap(range) - b = self.sc.parallelize([3, 2], 2).flatMap(range) - self.assertEqual(a.count(), b.count()) - self.assertRaises(Exception, lambda: a.zip(b).count()) + with QuietTest(self.sc): + # different number of batched items in JVM + b = self.sc.parallelize(range(100, 104), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # different number of items in one pair + b = self.sc.parallelize(range(100, 106), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # same total number of items, but different distributions + a = self.sc.parallelize([2, 3], 2).flatMap(range) + b = self.sc.parallelize([3, 2], 2).flatMap(range) + self.assertEqual(a.count(), b.count()) + self.assertRaises(Exception, lambda: a.zip(b).count()) def test_count_approx_distinct(self): rdd = self.sc.parallelize(range(1000)) @@ -877,7 +891,12 @@ def test_profiler(self): func_names = [func_name for fname, n, func_name in stat_list] self.assertTrue("heavy_foo" in func_names) + old_stdout = sys.stdout + sys.stdout = io = StringIO() self.sc.show_profiles() + self.assertTrue("heavy_foo" in io.getvalue()) + sys.stdout = old_stdout + d = tempfile.gettempdir() self.sc.dump_profiles(d) self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) @@ -901,7 +920,7 @@ def show(self, id): def do_computation(self): def heavy_foo(x): - for i in range(1 << 20): + for i in range(1 << 18): x = 1 rdd = self.sc.parallelize(range(100)) @@ -1417,7 +1436,7 @@ def test_termination_sigterm(self): self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM)) -class WorkerTests(PySparkTestCase): +class WorkerTests(ReusedPySparkTestCase): def test_cancel_task(self): temp = tempfile.NamedTemporaryFile(delete=True) temp.close() @@ -1432,7 +1451,10 @@ def sleep(x): # start job in background thread def run(): - self.sc.parallelize(range(1), 1).foreach(sleep) + try: + self.sc.parallelize(range(1), 1).foreach(sleep) + except Exception: + pass import threading t = threading.Thread(target=run) t.daemon = True @@ -1473,7 +1495,8 @@ def test_after_exception(self): def raise_exception(_): raise Exception() rdd = self.sc.parallelize(range(100), 1) - self.assertRaises(Exception, lambda: rdd.foreach(raise_exception)) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: rdd.foreach(raise_exception)) self.assertEqual(100, rdd.map(str).count()) def test_after_jvm_exception(self): @@ -1484,7 +1507,8 @@ def test_after_jvm_exception(self): filtered_data = data.filter(lambda x: True) self.assertEqual(1, filtered_data.count()) os.unlink(tempFile.name) - self.assertRaises(Exception, lambda: filtered_data.count()) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: filtered_data.count()) rdd = self.sc.parallelize(range(100), 1) self.assertEqual(100, rdd.map(str).count()) @@ -1522,14 +1546,11 @@ def test_with_different_versions_of_python(self): rdd.count() version = sys.version_info sys.version_info = (2, 0, 0) - log4j = self.sc._jvm.org.apache.log4j - old_level = log4j.LogManager.getRootLogger().getLevel() - log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) try: - self.assertRaises(Py4JJavaError, lambda: rdd.count()) + with QuietTest(self.sc): + self.assertRaises(Py4JJavaError, lambda: rdd.count()) finally: sys.version_info = version - log4j.LogManager.getRootLogger().setLevel(old_level) class SparkSubmitTests(unittest.TestCase): @@ -1751,9 +1772,14 @@ def test_with_stop(self): def test_progress_api(self): with SparkContext() as sc: sc.setJobGroup('test_progress_api', '', True) - rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100)) - t = threading.Thread(target=rdd.collect) + + def run(): + try: + rdd.count() + except Exception: + pass + t = threading.Thread(target=run) t.daemon = True t.start() # wait for scheduler to start diff --git a/python/run-tests b/python/run-tests index ed3e819ef30c1..88b63b84fdc27 100755 --- a/python/run-tests +++ b/python/run-tests @@ -28,6 +28,7 @@ cd "$FWDIR/python" FAILED=0 LOG_FILE=unit-tests.log +START=$(date +"%s") rm -f $LOG_FILE @@ -35,8 +36,8 @@ rm -f $LOG_FILE rm -rf metastore warehouse function run_test() { - echo "Running test: $1" | tee -a $LOG_FILE - + echo -en "Running test: $1 ... " | tee -a $LOG_FILE + start=$(date +"%s") SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1 FAILED=$((PIPESTATUS[0]||$FAILED)) @@ -48,6 +49,9 @@ function run_test() { echo "Had test failures; see logs." echo -en "\033[0m" # No color exit -1 + else + now=$(date +"%s") + echo "ok ($(($now - $start))s)" fi } @@ -161,9 +165,8 @@ if [ $(which pypy) ]; then fi if [[ $FAILED == 0 ]]; then - echo -en "\033[32m" # Green - echo "Tests passed." - echo -en "\033[0m" # No color + now=$(date +"%s") + echo -e "\033[32mTests passed \033[0min $(($now - $START)) seconds" fi # TODO: in the long-run, it would be nice to use a test runner like `nose`.