From 421e68da3f8a121fee05aa5c903406e66e70eb20 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 10 Aug 2015 16:27:42 -0700 Subject: [PATCH 01/12] added _ssc_wait_checked for ml streaming tests --- python/pyspark/mllib/tests.py | 49 +++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 3f5a02af12e39..ca43d0fc06ebd 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -86,10 +86,21 @@ def tearDown(self): self.ssc.stop(False) @staticmethod - def _ssc_wait(start_time, end_time, sleep_time): + def _ssc_wait(start_time, end_time): while time() - start_time < end_time: sleep(0.01) + @staticmethod + def _ssc_wait_checked(start_time, end_time, term_check): + """ + :param term_check: Function which checks a termination condition. + If true, this method returns early. + """ + while time() - start_time < end_time: + if term_check(): + return + sleep(0.01) + def _squared_distance(a, b): if isinstance(a, Vector): @@ -1001,8 +1012,10 @@ def test_accuracy_for_single_center(self): t = time() self.ssc.start() - self._ssc_wait(t, 10.0, 0.01) - self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) + def termCheck(): + return stkm.latestModel().clusterWeights == [25.0] + self._ssc_wait_checked(t, 20.0, termCheck) + self.assertTrue(termCheck()) realCenters = array_sum(array(centers), axis=0) for i in range(5): modelCenters = stkm.latestModel().centers[0][i] @@ -1041,10 +1054,12 @@ def test_trainOn_model(self): self.ssc.start() # Give enough time to train the model. - self._ssc_wait(t, 6.0, 0.01) - finalModel = stkm.latestModel() - self.assertTrue(all(finalModel.centers == array(initCenters))) - self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) + def termCheck(): + finalModel = stkm.latestModel() + all(finalModel.centers == array(initCenters)) and \ + finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0] + self._ssc_wait_checked(t, 20.0, termCheck) + self.assertTrue(termCheck()) def test_predictOn_model(self): """Test that the model predicts correctly on toy data.""" @@ -1068,7 +1083,7 @@ def update(rdd): predict_val.foreachRDD(update) t = time() self.ssc.start() - self._ssc_wait(t, 6.0, 0.01) + self._ssc_wait(t, 6.0) self.assertEquals(result, [[0], [1], [2], [3]]) def test_trainOn_predictOn(self): @@ -1097,7 +1112,7 @@ def collect(rdd): t = time() self.ssc.start() - self._ssc_wait(t, 6.0, 0.01) + self._ssc_wait(t, 6.0) self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) @@ -1158,7 +1173,7 @@ def test_parameter_accuracy(self): t = time() self.ssc.start() - self._ssc_wait(t, 20.0, 0.01) + self._ssc_wait(t, 20.0) rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 self.assertAlmostEqual(rel, 0.1, 1) @@ -1181,7 +1196,7 @@ def test_convergence(self): t = time() self.ssc.start() - self._ssc_wait(t, 15.0, 0.01) + self._ssc_wait(t, 15.0) t_models = array(models) diff = t_models[1:] - t_models[:-1] @@ -1210,7 +1225,7 @@ def test_predictions(self): predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect())) t = time() self.ssc.start() - self._ssc_wait(t, 5.0, 0.01) + self._ssc_wait(t, 5.0) # Test that the accuracy error is no more than 0.4 on each batch. 
for batch in true_predicted: @@ -1244,7 +1259,7 @@ def collect_errors(rdd): t = time() self.ssc.start() - self._ssc_wait(t, 20.0, 0.01) + self._ssc_wait(t, 20.0) # Test that the improvement in error is atleast 0.3 self.assertTrue(errors[1] - errors[-1] > 0.3) @@ -1277,7 +1292,7 @@ def test_parameter_accuracy(self): t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10, 0.01) + self._ssc_wait(t, 10) self.assertArrayAlmostEqual( slr.latestModel().weights.array, [10., 10.], 1) self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) @@ -1301,7 +1316,7 @@ def test_parameter_convergence(self): t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10, 0.01) + self._ssc_wait(t, 10) model_weights = array(model_weights) diff = model_weights[1:] - model_weights[:-1] @@ -1329,7 +1344,7 @@ def test_prediction(self): output_stream.foreachRDD(lambda x: samples.append(x.collect())) self.ssc.start() - self._ssc_wait(t, 5, 0.01) + self._ssc_wait(t, 5) # Test that mean absolute error on each batch is less than 0.1 for batch in samples: @@ -1364,7 +1379,7 @@ def func(rdd): output_stream = slr.predictOnValues(output_stream) output_stream.foreachRDD(func) self.ssc.start() - self._ssc_wait(t, 10, 0.01) + self._ssc_wait(t, 10) self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2) From 3c171b0aced0b7c18ff406e10b763dc3d4609598 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 10 Aug 2015 16:33:43 -0700 Subject: [PATCH 02/12] reverted small fix to make wip review easier --- python/pyspark/mllib/tests.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index ca43d0fc06ebd..6bb5c0a8ae25d 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -86,7 +86,7 @@ def tearDown(self): self.ssc.stop(False) @staticmethod - def _ssc_wait(start_time, end_time): + def _ssc_wait(start_time, end_time, sleep_time): while time() - start_time < end_time: sleep(0.01) @@ -1083,7 +1083,7 @@ def update(rdd): predict_val.foreachRDD(update) t = time() self.ssc.start() - self._ssc_wait(t, 6.0) + self._ssc_wait(t, 6.0, 0.01) self.assertEquals(result, [[0], [1], [2], [3]]) def test_trainOn_predictOn(self): @@ -1112,7 +1112,7 @@ def collect(rdd): t = time() self.ssc.start() - self._ssc_wait(t, 6.0) + self._ssc_wait(t, 6.0, 0.01) self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) @@ -1173,7 +1173,7 @@ def test_parameter_accuracy(self): t = time() self.ssc.start() - self._ssc_wait(t, 20.0) + self._ssc_wait(t, 20.0, 0.01) rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 self.assertAlmostEqual(rel, 0.1, 1) @@ -1196,7 +1196,7 @@ def test_convergence(self): t = time() self.ssc.start() - self._ssc_wait(t, 15.0) + self._ssc_wait(t, 15.0, 0.01) t_models = array(models) diff = t_models[1:] - t_models[:-1] @@ -1225,7 +1225,7 @@ def test_predictions(self): predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect())) t = time() self.ssc.start() - self._ssc_wait(t, 5.0) + self._ssc_wait(t, 5.0, 0.01) # Test that the accuracy error is no more than 0.4 on each batch. 
for batch in true_predicted: @@ -1259,7 +1259,7 @@ def collect_errors(rdd): t = time() self.ssc.start() - self._ssc_wait(t, 20.0) + self._ssc_wait(t, 20.0, 0.01) # Test that the improvement in error is atleast 0.3 self.assertTrue(errors[1] - errors[-1] > 0.3) @@ -1292,7 +1292,7 @@ def test_parameter_accuracy(self): t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10) + self._ssc_wait(t, 10, 0.01) self.assertArrayAlmostEqual( slr.latestModel().weights.array, [10., 10.], 1) self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) @@ -1316,7 +1316,7 @@ def test_parameter_convergence(self): t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10) + self._ssc_wait(t, 10, 0.01) model_weights = array(model_weights) diff = model_weights[1:] - model_weights[:-1] @@ -1344,7 +1344,7 @@ def test_prediction(self): output_stream.foreachRDD(lambda x: samples.append(x.collect())) self.ssc.start() - self._ssc_wait(t, 5) + self._ssc_wait(t, 5, 0.01) # Test that mean absolute error on each batch is less than 0.1 for batch in samples: @@ -1379,7 +1379,7 @@ def func(rdd): output_stream = slr.predictOnValues(output_stream) output_stream.foreachRDD(func) self.ssc.start() - self._ssc_wait(t, 10) + self._ssc_wait(t, 10, 0.01) self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2) From 3fb7c0c9c8714843c08f1e116cddd78b689e07e9 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 10 Aug 2015 17:01:19 -0700 Subject: [PATCH 03/12] something like this --- python/pyspark/mllib/tests.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 6bb5c0a8ae25d..54fa029d460fb 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -1015,7 +1015,7 @@ def test_accuracy_for_single_center(self): def termCheck(): return stkm.latestModel().clusterWeights == [25.0] self._ssc_wait_checked(t, 20.0, termCheck) - self.assertTrue(termCheck()) + self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) realCenters = array_sum(array(centers), axis=0) for i in range(5): modelCenters = stkm.latestModel().centers[0][i] @@ -1040,7 +1040,7 @@ def test_trainOn_model(self): stkm.setInitialCenters( centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0]) - # Create a toy dataset by setting a tiny offest for each point. + # Create a toy dataset by setting a tiny offset for each point. offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]] batches = [] for offset in offsets: @@ -1060,6 +1060,9 @@ def termCheck(): finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0] self._ssc_wait_checked(t, 20.0, termCheck) self.assertTrue(termCheck()) + finalModel = stkm.latestModel() + self.assertTrue(all(finalModel.centers == array(initCenters))) + self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) def test_predictOn_model(self): """Test that the model predicts correctly on toy data.""" From ef49b2b6e3e95b3490fd38999e023b153272243a Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Mon, 10 Aug 2015 17:16:18 -0700 Subject: [PATCH 04/12] style fix --- python/pyspark/mllib/tests.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 54fa029d460fb..8dcdd54db9ead 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -1012,6 +1012,7 @@ def test_accuracy_for_single_center(self): t = time() self.ssc.start() + def termCheck(): return stkm.latestModel().clusterWeights == [25.0] self._ssc_wait_checked(t, 20.0, termCheck) @@ -1056,8 +1057,8 @@ def test_trainOn_model(self): # Give enough time to train the model. def termCheck(): finalModel = stkm.latestModel() - all(finalModel.centers == array(initCenters)) and \ - finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0] + return (all(finalModel.centers == array(initCenters)) and + finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0]) self._ssc_wait_checked(t, 20.0, termCheck) self.assertTrue(termCheck()) finalModel = stkm.latestModel() From ff1ee1b064cbb7590c123f699409c457084406f9 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Mon, 10 Aug 2015 18:50:27 -0700 Subject: [PATCH 05/12] fix to termCheck --- python/pyspark/mllib/tests.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 8dcdd54db9ead..055c3f04789f2 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -1057,8 +1057,11 @@ def test_trainOn_model(self): # Give enough time to train the model. def termCheck(): finalModel = stkm.latestModel() - return (all(finalModel.centers == array(initCenters)) and - finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0]) + if finalModel is not None: + return (all(finalModel.centers == array(initCenters)) and + finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0]) + else: + return False self._ssc_wait_checked(t, 20.0, termCheck) self.assertTrue(termCheck()) finalModel = stkm.latestModel() From afbe8b1179096bc29bd7ef78985f9f5161b7430a Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 11 Aug 2015 12:44:35 -0700 Subject: [PATCH 06/12] small fix --- python/pyspark/mllib/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 055c3f04789f2..ca17742f4f627 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -1059,7 +1059,7 @@ def termCheck(): finalModel = stkm.latestModel() if finalModel is not None: return (all(finalModel.centers == array(initCenters)) and - finalModel.clusterWeight == [5.0, 5.0, 5.0, 5.0]) + finalModel.clusterWeights == [5.0, 5.0, 5.0, 5.0]) else: return False self._ssc_wait_checked(t, 20.0, termCheck) From 48f43c8ee297a3e3e654dd103271369e9c8eab22 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Tue, 11 Aug 2015 20:03:07 -0700 Subject: [PATCH 07/12] removed ssc_wait and replaced it with eventually --- python/pyspark/mllib/tests.py | 139 +++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 60 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index ca17742f4f627..c6e501eef8008 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -32,6 +32,9 @@ from py4j.protocol import Py4JJavaError +if sys.version > '3': + basestring = str + if sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest @@ -86,20 +89,25 @@ def tearDown(self): self.ssc.stop(False) @staticmethod - def _ssc_wait(start_time, end_time, sleep_time): - while time() - start_time < end_time: - sleep(0.01) - - @staticmethod - def _ssc_wait_checked(start_time, end_time, term_check): + def _eventually(condition, timeout=20.0): """ - :param term_check: Function which checks a termination condition. - If true, this method returns early. + Wait a given amount of time for a condition to be met, else fail with an error. + :param condition: Function that checks for termination conditions and throws an + AssertionError if the conditions are not met. + This is used for both checking termination conditions and + printing an error message upon timeout. + :param timeout: Number of seconds to wait. Default 20 seconds. """ - while time() - start_time < end_time: - if term_check(): + start_time = time() + lastErrorMsg = None + while time() - start_time < timeout: + try: + condition() return + except AssertionError as e: + lastErrorMsg = e sleep(0.01) + raise lastErrorMsg def _squared_distance(a, b): @@ -1010,13 +1018,12 @@ def test_accuracy_for_single_center(self): [self.sc.parallelize(batch, 1) for batch in batches]) stkm.trainOn(input_stream) - t = time() self.ssc.start() - def termCheck(): - return stkm.latestModel().clusterWeights == [25.0] - self._ssc_wait_checked(t, 20.0, termCheck) - self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) + def condition(): + self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) + self._eventually(condition) + realCenters = array_sum(array(centers), axis=0) for i in range(5): modelCenters = stkm.latestModel().centers[0][i] @@ -1051,22 +1058,14 @@ def test_trainOn_model(self): batches = [self.sc.parallelize(batch, 1) for batch in batches] input_stream = self.ssc.queueStream(batches) stkm.trainOn(input_stream) - t = time() self.ssc.start() # Give enough time to train the model. 
- def termCheck(): + def condition(): finalModel = stkm.latestModel() - if finalModel is not None: - return (all(finalModel.centers == array(initCenters)) and - finalModel.clusterWeights == [5.0, 5.0, 5.0, 5.0]) - else: - return False - self._ssc_wait_checked(t, 20.0, termCheck) - self.assertTrue(termCheck()) - finalModel = stkm.latestModel() - self.assertTrue(all(finalModel.centers == array(initCenters))) - self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) + self.assertTrue(all(finalModel.centers == array(initCenters))) + self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) + self._eventually(condition) def test_predictOn_model(self): """Test that the model predicts correctly on toy data.""" @@ -1088,10 +1087,10 @@ def update(rdd): result.append(rdd_collect) predict_val.foreachRDD(update) - t = time() self.ssc.start() - self._ssc_wait(t, 6.0, 0.01) - self.assertEquals(result, [[0], [1], [2], [3]]) + def condition(): + self.assertEquals(result, [[0], [1], [2], [3]]) + self._eventually(condition) def test_trainOn_predictOn(self): """Test that prediction happens on the updated model.""" @@ -1117,10 +1116,11 @@ def collect(rdd): predict_stream = stkm.predictOn(input_stream) predict_stream.foreachRDD(collect) - t = time() self.ssc.start() - self._ssc_wait(t, 6.0, 0.01) - self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) + def condition(): + self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) + + self._eventually(condition) class LinearDataGeneratorTests(MLlibTestCase): @@ -1178,11 +1178,12 @@ def test_parameter_accuracy(self): slr.setInitialWeights([0.0]) slr.trainOn(input_stream) - t = time() self.ssc.start() - self._ssc_wait(t, 20.0, 0.01) - rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 - self.assertAlmostEqual(rel, 0.1, 1) + def condition(): + rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 + self.assertAlmostEqual(rel, 0.1, 1) + + self._eventually(condition) def test_convergence(self): """ @@ -1201,13 +1202,16 @@ def test_convergence(self): input_stream.foreachRDD( lambda x: models.append(slr.latestModel().weights[0])) - t = time() self.ssc.start() - self._ssc_wait(t, 15.0, 0.01) + + def condition(): + self.assertEquals(len(models), len(input_batches)) + + self._eventually(condition, 60.0) + t_models = array(models) diff = t_models[1:] - t_models[:-1] - - # Test that weights improve with a small tolerance, + # Test that weights improve with a small tolerance self.assertTrue(all(diff >= -0.1)) self.assertTrue(array_sum(diff > 0) > 1) @@ -1230,9 +1234,12 @@ def test_predictions(self): predict_stream = slr.predictOnValues(input_stream) true_predicted = [] predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect())) - t = time() self.ssc.start() - self._ssc_wait(t, 5.0, 0.01) + + def condition(): + self.assertEquals(len(true_predicted), len(input_batches)) + + self._eventually(condition) # Test that the accuracy error is no more than 0.4 on each batch. 
for batch in true_predicted: @@ -1264,11 +1271,13 @@ def collect_errors(rdd): ps = slr.predictOnValues(predict_stream) ps.foreachRDD(lambda x: collect_errors(x)) - t = time() self.ssc.start() - self._ssc_wait(t, 20.0, 0.01) - # Test that the improvement in error is atleast 0.3 + def condition(): + self.assertEquals(len(errors), len(predict_batches)) + + self._eventually(condition) + # Test that the improvement in error is > 0.3 self.assertTrue(errors[1] - errors[-1] > 0.3) @@ -1296,13 +1305,15 @@ def test_parameter_accuracy(self): batches.append(sc.parallelize(batch)) input_stream = self.ssc.queueStream(batches) - t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10, 0.01) - self.assertArrayAlmostEqual( - slr.latestModel().weights.array, [10., 10.], 1) - self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) + + def condition(): + self.assertArrayAlmostEqual( + slr.latestModel().weights.array, [10., 10.], 1) + self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) + + self._eventually(condition) def test_parameter_convergence(self): """Test that the model parameters improve with streaming data.""" @@ -1320,13 +1331,16 @@ def test_parameter_convergence(self): input_stream = self.ssc.queueStream(batches) input_stream.foreachRDD( lambda x: model_weights.append(slr.latestModel().weights[0])) - t = time() slr.trainOn(input_stream) self.ssc.start() - self._ssc_wait(t, 10, 0.01) - model_weights = array(model_weights) - diff = model_weights[1:] - model_weights[:-1] + def condition(): + self.assertEquals(len(model_weights), len(batches)) + + self._eventually(condition) + + w = array(model_weights) + diff = w[1:] - w[:-1] self.assertTrue(all(diff >= -0.1)) def test_prediction(self): @@ -1345,13 +1359,16 @@ def test_prediction(self): sc.parallelize(batch).map(lambda lp: (lp.label, lp.features))) input_stream = self.ssc.queueStream(batches) - t = time() output_stream = slr.predictOnValues(input_stream) samples = [] output_stream.foreachRDD(lambda x: samples.append(x.collect())) self.ssc.start() - self._ssc_wait(t, 5, 0.01) + + def condition(): + self.assertEquals(len(samples), len(batches)) + + self._eventually(condition) # Test that mean absolute error on each batch is less than 0.1 for batch in samples: @@ -1378,15 +1395,17 @@ def func(rdd): true, predicted = zip(*rdd.collect()) mean_absolute_errors.append(mean(abs(true) - abs(predicted))) - model_weights = [] input_stream = self.ssc.queueStream(batches) output_stream = self.ssc.queueStream(predict_batches) - t = time() slr.trainOn(input_stream) output_stream = slr.predictOnValues(output_stream) output_stream.foreachRDD(func) self.ssc.start() - self._ssc_wait(t, 10, 0.01) + + def condition(): + self.assertEquals(len(mean_absolute_errors), len(predict_batches)) + + self._eventually(condition) self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2) From 3717fc439be094b74b453d040233b7c11e838bd9 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Tue, 11 Aug 2015 20:29:36 -0700 Subject: [PATCH 08/12] style fixes --- python/pyspark/mllib/tests.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index c6e501eef8008..2b03775f6382a 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -1088,8 +1088,10 @@ def update(rdd): predict_val.foreachRDD(update) self.ssc.start() + def condition(): self.assertEquals(result, [[0], [1], [2], [3]]) + self._eventually(condition) def test_trainOn_predictOn(self): @@ -1117,6 +1119,7 @@ def collect(rdd): predict_stream.foreachRDD(collect) self.ssc.start() + def condition(): self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) @@ -1179,6 +1182,7 @@ def test_parameter_accuracy(self): slr.trainOn(input_stream) self.ssc.start() + def condition(): rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 self.assertAlmostEqual(rel, 0.1, 1) From 5e4932741f2fe3cbff84963effb6a55f1afb2ff9 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 11 Aug 2015 23:26:39 -0700 Subject: [PATCH 09/12] warning about condition --- python/pyspark/mllib/tests.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 2b03775f6382a..424053347cdd2 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -96,6 +96,9 @@ def _eventually(condition, timeout=20.0): AssertionError if the conditions are not met. This is used for both checking termination conditions and printing an error message upon timeout. + Note that this method must work correctly, regardless of when it is + called during the streaming execution (e.g., even before any results + have been created). :param timeout: Number of seconds to wait. Default 20 seconds. """ start_time = time() From 002e83814a6f6396a91471fcdb598f8ef8ea3319 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 12 Aug 2015 15:29:07 -0700 Subject: [PATCH 10/12] probably improved tests --- python/pyspark/mllib/tests.py | 95 +++++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 31 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 424053347cdd2..0641aa6daa94b 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -89,28 +89,42 @@ def tearDown(self): self.ssc.stop(False) @staticmethod - def _eventually(condition, timeout=20.0): + def _eventually(condition, timeout=20.0, catch_assertions=False): """ - Wait a given amount of time for a condition to be met, else fail with an error. - :param condition: Function that checks for termination conditions and throws an - AssertionError if the conditions are not met. - This is used for both checking termination conditions and - printing an error message upon timeout. - Note that this method must work correctly, regardless of when it is - called during the streaming execution (e.g., even before any results + Wait a given amount of time for a condition to pass, else fail with an error. + This is a helper utility for streaming ML tests. + :param condition: Function that checks for termination conditions. + condition() can return: + - True: Conditions met. Return without error. + - other value: Conditions not met yet. Continue. Upon timeout, + include last such value in error message. + Note that this method may be called at any time during + streaming execution (e.g., even before any results have been created). :param timeout: Number of seconds to wait. 
Default 20 seconds. + :param catch_assertions: If False (default), do not catch AssertionErrors. + If True, catch AssertionErrors; continue, but save + error to throw upon timeout. """ start_time = time() - lastErrorMsg = None + lastValue = None while time() - start_time < timeout: - try: - condition() + if catch_assertions: + try: + lastValue = condition() + except AssertionError as e: + lastValue = e + else: + lastValue = condition() + if lastValue == True: # Note: This is NOT the same as "if lastValue:" return - except AssertionError as e: - lastErrorMsg = e sleep(0.01) - raise lastErrorMsg + if isinstance(lastValue, AssertionError): + raise lastValue + else: + raise AssertionError( + "Test failed due to timeout after %g sec, with last condition returning: %s"\ + % (timeout, lastValue)) def _squared_distance(a, b): @@ -1025,7 +1039,8 @@ def test_accuracy_for_single_center(self): def condition(): self.assertEquals(stkm.latestModel().clusterWeights, [25.0]) - self._eventually(condition) + return True + self._eventually(condition, catch_assertions=True) realCenters = array_sum(array(centers), axis=0) for i in range(5): @@ -1068,7 +1083,8 @@ def condition(): finalModel = stkm.latestModel() self.assertTrue(all(finalModel.centers == array(initCenters))) self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0]) - self._eventually(condition) + return True + self._eventually(condition, catch_assertions=True) def test_predictOn_model(self): """Test that the model predicts correctly on toy data.""" @@ -1094,8 +1110,9 @@ def update(rdd): def condition(): self.assertEquals(result, [[0], [1], [2], [3]]) + return True - self._eventually(condition) + self._eventually(condition, catch_assertions=True) def test_trainOn_predictOn(self): """Test that prediction happens on the updated model.""" @@ -1125,8 +1142,9 @@ def collect(rdd): def condition(): self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]]) + return True - self._eventually(condition) + self._eventually(condition, catch_assertions=True) class LinearDataGeneratorTests(MLlibTestCase): @@ -1189,8 +1207,9 @@ def test_parameter_accuracy(self): def condition(): rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5 self.assertAlmostEqual(rel, 0.1, 1) + return True - self._eventually(condition) + self._eventually(condition, catch_assertions=True) def test_convergence(self): """ @@ -1213,8 +1232,10 @@ def test_convergence(self): def condition(): self.assertEquals(len(models), len(input_batches)) + return True - self._eventually(condition, 60.0) + # We want all batches to finish for this test. + self._eventually(condition, 60.0, catch_assertions=True) t_models = array(models) diff = t_models[1:] - t_models[:-1] @@ -1245,8 +1266,9 @@ def test_predictions(self): def condition(): self.assertEquals(len(true_predicted), len(input_batches)) + return True - self._eventually(condition) + self._eventually(condition, catch_assertions=True) # Test that the accuracy error is no more than 0.4 on each batch. 
for batch in true_predicted: @@ -1281,11 +1303,14 @@ def collect_errors(rdd): self.ssc.start() def condition(): - self.assertEquals(len(errors), len(predict_batches)) + # Test that the improvement in error is > 0.3 + if len(errors) == len(predict_batches): + self.assertGreater(errors[1] - errors[-1], 0.3) + if len(errors) >= 3 and errors[1] - errors[-1] > 0.3: + return True + return "Latest errors: " + ", ".join(map(lambda x: str(x), errors)) self._eventually(condition) - # Test that the improvement in error is > 0.3 - self.assertTrue(errors[1] - errors[-1] > 0.3) class StreamingLinearRegressionWithTests(MLLibStreamingTestCase): @@ -1319,8 +1344,9 @@ def condition(): self.assertArrayAlmostEqual( slr.latestModel().weights.array, [10., 10.], 1) self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1) + return True - self._eventually(condition) + self._eventually(condition, catch_assertions=True) def test_parameter_convergence(self): """Test that the model parameters improve with streaming data.""" @@ -1343,8 +1369,10 @@ def test_parameter_convergence(self): def condition(): self.assertEquals(len(model_weights), len(batches)) + return True - self._eventually(condition) + # We want all batches to finish for this test. + self._eventually(condition, catch_assertions=True) w = array(model_weights) diff = w[1:] - w[:-1] @@ -1374,8 +1402,10 @@ def test_prediction(self): def condition(): self.assertEquals(len(samples), len(batches)) + return True - self._eventually(condition) + # We want all batches to finish for this test. + self._eventually(condition, catch_assertions=True) # Test that mean absolute error on each batch is less than 0.1 for batch in samples: @@ -1396,11 +1426,11 @@ def test_train_prediction(self): predict_batches = [ b.map(lambda lp: (lp.label, lp.features)) for b in batches] - mean_absolute_errors = [] + errors = [] def func(rdd): true, predicted = zip(*rdd.collect()) - mean_absolute_errors.append(mean(abs(true) - abs(predicted))) + errors.append(mean(abs(true) - abs(predicted))) input_stream = self.ssc.queueStream(batches) output_stream = self.ssc.queueStream(predict_batches) @@ -1410,10 +1440,13 @@ def func(rdd): self.ssc.start() def condition(): - self.assertEquals(len(mean_absolute_errors), len(predict_batches)) + if len(errors) == len(predict_batches): + self.assertGreater(errors[1] - errors[-1], 2) + if len(errors) >= 3 and errors[1] - errors[-1] > 2: + return True + return "Latest errors: " + ", ".join(map(lambda x: str(x), errors)) self._eventually(condition) - self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2) class MLUtilsTests(MLlibTestCase): From 28978336c9c66341682a1a7209e61c04a48af708 Mon Sep 17 00:00:00 2001 From: "Joseph K. 
Bradley" Date: Wed, 12 Aug 2015 15:58:01 -0700 Subject: [PATCH 11/12] python style fixes --- python/pyspark/mllib/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 0641aa6daa94b..0cb4c8396c64e 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -116,14 +116,14 @@ def _eventually(condition, timeout=20.0, catch_assertions=False): lastValue = e else: lastValue = condition() - if lastValue == True: # Note: This is NOT the same as "if lastValue:" + if lastValue is True: return sleep(0.01) if isinstance(lastValue, AssertionError): raise lastValue else: raise AssertionError( - "Test failed due to timeout after %g sec, with last condition returning: %s"\ + "Test failed due to timeout after %g sec, with last condition returning: %s" % (timeout, lastValue)) From a4c3f1e42319f62ecb9d45b5d3a4cd48d669da10 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Thu, 13 Aug 2015 14:56:03 -0700 Subject: [PATCH 12/12] increased default pyspark ml streaming test eventually timeout to 30 sec from 20 --- python/pyspark/mllib/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 0cb4c8396c64e..5097c5e8ba4cd 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -89,7 +89,7 @@ def tearDown(self): self.ssc.stop(False) @staticmethod - def _eventually(condition, timeout=20.0, catch_assertions=False): + def _eventually(condition, timeout=30.0, catch_assertions=False): """ Wait a given amount of time for a condition to pass, else fail with an error. This is a helper utility for streaming ML tests. @@ -101,7 +101,7 @@ def _eventually(condition, timeout=20.0, catch_assertions=False): Note that this method may be called at any time during streaming execution (e.g., even before any results have been created). - :param timeout: Number of seconds to wait. Default 20 seconds. + :param timeout: Number of seconds to wait. Default 30 seconds. :param catch_assertions: If False (default), do not catch AssertionErrors. If True, catch AssertionErrors; continue, but save error to throw upon timeout.