Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactivate tests removed in #3259 #3263

Merged
merged 4 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .buildkite/gen-pipeline.sh
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ run_gloo_integration() {
run_test "${test}" "${queue}" \
":factory: Elastic Spark TensorFlow Tests (${test})" \
"bash -c \"cd /horovod/test/integration && /spark_env.sh HOROVOD_LOG_LEVEL=DEBUG pytest --forked -v --log-cli-level 10 --log-cli-format '[%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(funcName)s()] %(message)s' --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.gloo.elastic.spark.tf.xml ${elastic_spark_tensorflow}\"" \
40
30
fi

# Elastic Horovod on Spark tests are very expensive (high timeout)
Expand All @@ -309,7 +309,7 @@ run_gloo_integration() {
run_test "${test}" "${queue}" \
":factory: Elastic Spark Torch Tests (${test})" \
"bash -c \"cd /horovod/test/integration && /spark_env.sh HOROVOD_LOG_LEVEL=DEBUG pytest --forked -v --log-cli-level 10 --log-cli-format '[%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(funcName)s()] %(message)s' --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.gloo.elastic.spark.torch.xml test_elastic_spark_torch.py\"" \
40
30
fi

}
Expand All @@ -332,7 +332,7 @@ run_spark_integration() {
run_test "${test}" "${queue}" \
":spark: Spark PyTests (${test})" \
"bash -c \"cd /horovod/test/integration && (ls -1 test_spark*.py | xargs -n 1 /bin/bash /pytest_standalone.sh spark)\"" \
40
30
fi

if [[ ${test} != *"tf2"* && ${test} != *"tfhead"* ]]; then
Expand Down Expand Up @@ -420,7 +420,7 @@ for test in ${tests[@]-}; do
run_mpi ${test} "cpu" ${oneccl_cmd_ofi}

# always run spark tests which use MPI and Gloo
#run_spark_integration ${test} "cpu"
run_spark_integration ${test} "cpu"

# no runner application, world size = 1
run_single_integration ${test} "cpu" ${oneccl_cmd_mpi}
Expand All @@ -432,7 +432,7 @@ for test in ${tests[@]-}; do
fi

# always run spark tests which use MPI and Gloo
#run_spark_integration ${test} "cpu"
run_spark_integration ${test} "cpu"

# no runner application, world size = 1
run_single_integration ${test} "cpu"
Expand Down Expand Up @@ -474,6 +474,6 @@ for test in ${tests[@]-}; do
run_mpi_integration ${test} "2x-gpu-v510"
fi

#run_spark_integration ${test} "2x-gpu-v510"
run_spark_integration ${test} "2x-gpu-v510"
fi
done
387 changes: 369 additions & 18 deletions .github/workflows/ci.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Dockerfile.test.cpu
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ RUN pip install --no-cache-dir -U --force pip setuptools requests pytest mock py
# Add launch helper scripts
RUN echo "env SPARK_HOME=/spark SPARK_DRIVER_MEM=512m PYSPARK_PYTHON=/usr/bin/python${PYTHON_VERSION} PYSPARK_DRIVER_PYTHON=/usr/bin/python${PYTHON_VERSION} \"\$@\"" > /spark_env.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.\${HOROVOD_RANK:-\${OMPI_COMM_WORLD_RANK:-\${PMI_RANK}}}.\$2.xml \${@:2} > /pytest.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.standalone.\$2.xml \${@:2} > /pytest_standalone.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.standalone.\$2.xml --forked \${@:2} > /pytest_standalone.sh
RUN chmod a+x /spark_env.sh
RUN chmod a+x /pytest.sh
RUN chmod a+x /pytest_standalone.sh
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.test.gpu
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ RUN pip install --no-cache-dir -U --force pip setuptools requests pytest mock py
# Add launch helper scripts
RUN echo "env SPARK_HOME=/spark SPARK_DRIVER_MEM=512m PYSPARK_PYTHON=/usr/bin/python${PYTHON_VERSION} PYSPARK_DRIVER_PYTHON=/usr/bin/python${PYTHON_VERSION} \"\$@\"" > /spark_env.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.\${HOROVOD_RANK:-\${OMPI_COMM_WORLD_RANK:-\${PMI_RANK}}}.\$2.xml \${@:2} > /pytest.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.standalone.\$2.xml \${@:2} > /pytest_standalone.sh
RUN echo /spark_env.sh pytest -v --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.\$1.standalone.\$2.xml --forked \${@:2} > /pytest_standalone.sh
RUN chmod a+x /spark_env.sh
RUN chmod a+x /pytest.sh
RUN chmod a+x /pytest_standalone.sh
Expand Down
11 changes: 11 additions & 0 deletions examples/spark/pytorch/pytorch_lightning_spark_mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: https://github.com/horovod/horovod/pull/3263
try:
# tensorflow has to be imported BEFORE pytorch_lightning, otherwise we see the segfault right away
import tensorflow as tf
from distutils.version import LooseVersion
if LooseVersion('2.5.0') <= LooseVersion(tf.__version__) < LooseVersion('2.6.0'):
print('Skipping test as Pytorch Lightning conflicts with present Tensorflow 2.5.x', file=sys.stderr)
sys.exit(0)
except ImportError:
pass

from pytorch_lightning import LightningModule

import torch
Expand Down
69 changes: 66 additions & 3 deletions test/integration/test_spark_lightning.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import FloatType, IntegerType

# Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: https://github.com/horovod/horovod/pull/3263
skip_lightning_tests = False
try:
# tensorflow has to be imported BEFORE pytorch_lightning, otherwise we see the segfault right away
import tensorflow as tf
from distutils.version import LooseVersion
if LooseVersion('2.5.0') <= LooseVersion(tf.__version__) < LooseVersion('2.6.0'):
skip_lightning_tests = True
except ImportError:
pass

import pytorch_lightning as pl

import horovod
Expand Down Expand Up @@ -119,6 +130,10 @@ def __init__(self, *args, **kwargs):
warnings.simplefilter('module')

def test_fit_model(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

maxhgerlach marked this conversation as resolved.
Show resolved Hide resolved
model = create_xor_model()

with spark_session('test_fit_model') as spark:
Expand Down Expand Up @@ -167,6 +182,10 @@ def test_terminate_on_nan_flag(self):
assert torch_estimator.getTerminateOnNan() == True

def test_legacy_fit_model(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

model = create_legacy_xor_model()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loss = F.binary_cross_entropy
Expand Down Expand Up @@ -374,6 +393,10 @@ def test_torch_param_serialize(self):
assert serialized_dummy_param is None

def test_direct_parquet_train(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

with spark_session('test_direct_parquet_train') as spark:
df = create_noisy_xor_data_with_val(spark)

Expand Down Expand Up @@ -413,7 +436,11 @@ def test_direct_parquet_train(self):
assert predictions.count() == df.count()

def test_direct_parquet_train_with_no_val_column(self):
with spark_session('test_direct_parquet_train') as spark:
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

with spark_session('test_direct_parquet_train_with_no_val_column') as spark:
df_train = create_noisy_xor_data(spark)
df_val = create_noisy_xor_data(spark)

Expand Down Expand Up @@ -541,6 +568,10 @@ def fn_add(output, label, reduction=None):
Test that horovod.spark.run_elastic works properly in a simple setup.
"""
def test_happy_run_elastic(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

if not gloo_built():
self.skipTest("Gloo is not available")

Expand All @@ -555,6 +586,10 @@ def test_happy_run_elastic(self):
Test that horovod.spark.run_elastic works properly in a fault-tolerant situation.
"""
def test_happy_run_elastic_fault_tolerant(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

if not gloo_built():
self.skipTest("Gloo is not available")

Expand Down Expand Up @@ -601,6 +636,10 @@ def test_happy_run_elastic_fault_tolerant_fails(self):
Test dummy callback function from pytorch lightning trainer.
"""
def test_dummy_callback(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from pytorch_lightning.callbacks import Callback
model = create_xor_model()

Expand Down Expand Up @@ -659,9 +698,13 @@ def on_train_end(self, trainer, model):
assert pred.dtype == torch.float32

"""
Test callback function for learning rate schedualer and monitor.
Test callback function for learning rate scheduler and monitor.
"""
def test_lr_schedualler_callback(self):
def test_lr_scheduler_callback(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from pytorch_lightning.callbacks import LearningRateMonitor

class LRTestingModel(XOR):
Expand Down Expand Up @@ -711,6 +754,10 @@ def lambda_func(epoch):
Test callback function for model checkpoint.
"""
def test_model_checkpoint_callback(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint

with spark_session('test_fit_model') as spark:
Expand Down Expand Up @@ -747,6 +794,10 @@ def test_model_checkpoint_callback(self):
Test callback function for early stop.
"""
def test_early_stop_callback(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from pytorch_lightning.callbacks.early_stopping import EarlyStopping

with spark_session('test_fit_model') as spark:
Expand Down Expand Up @@ -786,6 +837,10 @@ def test_early_stop_callback(self):
Test train model with inmemory_cache_all
"""
def test_train_with_inmemory_cache_all(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

with spark_session('test_fit_model') as spark:
df = create_noisy_xor_data(spark)
model = create_xor_model()
Expand Down Expand Up @@ -816,6 +871,10 @@ def test_train_with_inmemory_cache_all(self):
Test train model with custom data module (using PytorchAsyncDataLoader)
"""
def test_train_with_custom_data_module(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from horovod.spark.data_loaders.pytorch_data_loaders import PytorchAsyncDataLoader
class CustomDataModule(pl.LightningDataModule):
"""Custom DataModule for Lightning Estimator, using PytorchAsyncDataLoader"""
Expand Down Expand Up @@ -917,6 +976,10 @@ def val_dataloader(self):
Test override trainer args.
"""
def test_model_override_trainer_args(self):
if skip_lightning_tests:
self.skipTest('Spark PyTorch Lightning tests conflict with Tensorflow 2.5.x: '
'https://github.com/horovod/horovod/pull/3263')

from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint

with spark_session('test_fit_model') as spark:
Expand Down
32 changes: 32 additions & 0 deletions test/single/data/expected_buildkite_gpu_heads_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,35 @@ steps:
automatic: true
agents:
queue: 2x-gpu-v510
- label: ':spark: Spark Torch MNIST (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_1_2)'
command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3"
artifact_paths: "artifacts/**"
plugins:
- docker-compose#v3.5.0:
run: test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_1_2
volumes: "./artifacts:/artifacts"
config: docker-compose.test.yml
pull-retries: 3
- ecr#v1.2.0:
login: true
timeout_in_minutes: 10
retry:
automatic: true
agents:
queue: 2x-gpu-v510
- label: ':spark: Spark Lightning MNIST (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_1_2)'
command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_lightning_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3"
artifact_paths: "artifacts/**"
plugins:
- docker-compose#v3.5.0:
run: test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_1_2
volumes: "./artifacts:/artifacts"
config: docker-compose.test.yml
pull-retries: 3
- ecr#v1.2.0:
login: true
timeout_in_minutes: 10
retry:
automatic: true
agents:
queue: 2x-gpu-v510
Loading