diff --git a/.buildkite/gen-pipeline.sh b/.buildkite/gen-pipeline.sh index 8e6d37e06f..9c981cdafd 100755 --- a/.buildkite/gen-pipeline.sh +++ b/.buildkite/gen-pipeline.sh @@ -240,6 +240,10 @@ run_mpi_integration() { ":tensorflow: MPI TensorFlow 2.0 Keras MNIST api (${test})" \ "bash -c \"${oneccl_env} python /horovod/examples/tensorflow2/tensorflow2_keras_mnist.py 2 localhost:2 mpi\"" fi + + run_test "${test}" "${queue}" \ + ":tensorflow: MPI TensorFlow 2.0 MNIST Data Service (${test})" \ + "bash -c \"${oneccl_env} horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json\"" fi } @@ -307,6 +311,10 @@ run_gloo_integration() { ":tensorflow: Gloo TensorFlow 2.0 MNIST Elastic api (${test})" \ "python /horovod/examples/elastic/tensorflow2/tensorflow2_mnist_elastic.py 2 2 2 localhost:2,127.0.0.1:2" fi + + run_test "${test}" "${queue}" \ + ":tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (${test})" \ + "bash -c \"horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json\"" else run_test "${test}" "${queue}" \ ":tensorflow: Gloo TensorFlow MNIST (${test})" \ @@ -411,6 +419,12 @@ run_spark_integration() { "bash -c \"OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/keras/keras_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3\"" fi + if [[ ${test} == *"tf2_"* ]] || [[ ${test} == *"tfhead"* ]]; then + run_test "${test}" "${queue}" \ + ":spark: Spark TensorFlow 2.0 MNIST Data Service (${test})" \ + "bash -c \"cd /horovod/examples/spark/tensorflow2; spark-submit --master \\\"local[2]\\\" \\\"/horovod/horovod/spark/tensorflow/compute_worker.py\\\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \\\"local[2]\\\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json\"" + fi + run_test "${test}" "${queue}" \ ":spark: Spark Torch MNIST (${test})" \ "bash -c \"OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3\"" diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2692c4364a..839f74b8df 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -193,6 +193,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -201,6 +202,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -214,6 +216,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -222,6 +225,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + 
Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -235,6 +239,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -243,6 +248,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -258,6 +264,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -266,6 +273,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -281,6 +289,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -289,6 +298,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -301,6 +311,7 @@ jobs: MPI_Single_PyTests: true MPI_TensorFlow_2_0_Keras_MNIST_api: true MPI_TensorFlow_2_0_Keras_MNIST_horovodrun: true + MPI_TensorFlow_2_0_MNIST_Data_Service: true MPI_TensorFlow_2_0_MNIST_api: true MPI_TensorFlow_2_0_MNIST_horovodrun: true Single_MXNet_MNIST: true @@ -317,6 +328,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -329,6 +341,7 @@ jobs: MPI_Single_PyTests: true MPI_TensorFlow_2_0_Keras_MNIST_api: true MPI_TensorFlow_2_0_Keras_MNIST_horovodrun: true + MPI_TensorFlow_2_0_MNIST_Data_Service: true MPI_TensorFlow_2_0_MNIST_api: true MPI_TensorFlow_2_0_MNIST_horovodrun: true Run_PyTests_test_interactiverun: true @@ -336,6 +349,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -348,6 +362,7 @@ jobs: MPI_Single_PyTests: true MPI_TensorFlow_2_0_Keras_MNIST_api: true MPI_TensorFlow_2_0_Keras_MNIST_horovodrun: true + MPI_TensorFlow_2_0_MNIST_Data_Service: true MPI_TensorFlow_2_0_MNIST_api: true MPI_TensorFlow_2_0_MNIST_horovodrun: true Run_PyTests_test_interactiverun: true @@ -355,6 +370,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -897,6 +913,33 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m horovodrun -np 2 -H localhost:2 --gloo python 
/horovod/examples/tensorflow2/tensorflow2_keras_mnist.py shell: bash + - name: "Gloo TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Gloo TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && steps.Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Gloo TensorFlow 2.0 MNIST Data Service [attempt 3 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && steps.Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "Gloo TensorFlow 2.0 MNIST Elastic api [attempt 1 of 3]" id: Gloo_TensorFlow_2_0_MNIST_Elastic_api_run_1 continue-on-error: true @@ -1680,6 +1723,87 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_Keras_MNIST_horovodrun_ONECCL_OFI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && \$(cat /mpirun_command) python /horovod/examples/tensorflow2/tensorflow2_keras_mnist.py" shell: bash + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e 
GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && true + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume 
"$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && true + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && 
matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "MPI TensorFlow 2.0 MNIST api [attempt 1 of 3]" id: MPI_TensorFlow_2_0_MNIST_api_run_1 continue-on-error: true @@ -2220,6 +2344,33 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_PyTests_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 30m bash -c "cd /horovod/test/integration && (ls -1 test_spark*.py | xargs -n 1 /bin/bash /pytest_standalone.sh spark)" shell: bash + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && steps.Spark_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 3 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && 
matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && steps.Spark_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "Spark Torch MNIST [attempt 1 of 3]" id: Spark_Torch_MNIST_run_1 continue-on-error: true @@ -2293,6 +2444,7 @@ jobs: Gloo_Single_PyTests: true Gloo_TensorFlow_2_0_Keras_MNIST_api: true Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun: true + Gloo_TensorFlow_2_0_MNIST_Data_Service: true Gloo_TensorFlow_2_0_MNIST_Elastic_api: true Gloo_TensorFlow_2_0_MNIST_Elastic_horovodrun: true Gloo_TensorFlow_2_0_MNIST_api: true @@ -2301,6 +2453,7 @@ jobs: Single_PyTorch_MNIST: true Spark_Lightning_MNIST: true Spark_PyTests: true + Spark_TensorFlow_2_0_MNIST_Data_Service: true Spark_Torch_MNIST: true build_timeout: 30 @@ -2831,6 +2984,33 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_Keras_MNIST_horovodrun_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/tensorflow2/tensorflow2_keras_mnist.py shell: bash + - name: "Gloo TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Gloo TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && steps.Gloo_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Gloo TensorFlow 2.0 MNIST Data Service 
[attempt 3 of 3]" + id: Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.Gloo_TensorFlow_2_0_MNIST_Data_Service && steps.Gloo_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Gloo_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "Gloo TensorFlow 2.0 MNIST Elastic api [attempt 1 of 3]" id: Gloo_TensorFlow_2_0_MNIST_Elastic_api_run_1 continue-on-error: true @@ -3614,6 +3794,87 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_Keras_MNIST_horovodrun_ONECCL_OFI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && \$(cat /mpirun_command) python /horovod/examples/tensorflow2/tensorflow2_keras_mnist.py" shell: bash + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image 
}}/MPI_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && true + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL MPI] [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_MPI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_mpi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 1 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && true + run: | + mkdir -p artifacts/${{ matrix.image 
}}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 2 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "MPI TensorFlow 2.0 MNIST Data Service [ONECCL OFI] [attempt 3 of 3]" + id: MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI && steps.MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/MPI_TensorFlow_2_0_MNIST_Data_Service_ONECCL_OFI_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "\$(cat /oneccl_env) && echo '/mpirun_command_ofi' > /mpirun_command && horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "MPI TensorFlow 2.0 MNIST api [attempt 1 of 3]" id: MPI_TensorFlow_2_0_MNIST_api_run_1 continue-on-error: true @@ -4154,6 +4415,33 @@ jobs: docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_PyTests_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 30m bash -c "cd /horovod/test/integration && (ls -1 test_spark*.py | xargs -n 1 /bin/bash /pytest_standalone.sh spark)" shell: bash + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 1 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_1 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && true + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_1 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_1:/artifacts" ${{ 
matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 2 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_2 + continue-on-error: true + if: always() && steps.build.outcome == 'success' && matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && steps.Spark_TensorFlow_2_0_MNIST_Data_Service_run_1.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_2 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_2:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + + - name: "Spark TensorFlow 2.0 MNIST Data Service [attempt 3 of 3]" + id: Spark_TensorFlow_2_0_MNIST_Data_Service_run_3 + continue-on-error: false + if: always() && steps.build.outcome == 'success' && matrix.Spark_TensorFlow_2_0_MNIST_Data_Service && steps.Spark_TensorFlow_2_0_MNIST_Data_Service_run_2.outcome == 'failure' + run: | + mkdir -p artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_3 + docker-compose -f docker-compose.test.yml run -e GITHUB_ACTIONS --rm --volume "$(pwd)/artifacts/${{ matrix.image }}/Spark_TensorFlow_2_0_MNIST_Data_Service_run_3:/artifacts" ${{ matrix.image }} /usr/bin/timeout 10m bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + shell: bash + - name: "Spark Torch MNIST [attempt 1 of 3]" id: Spark_Torch_MNIST_run_1 continue-on-error: true diff --git a/CHANGELOG.md b/CHANGELOG.md index a44cb3e608..ef578e19a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Spark: expose random seed as an optional parameter. ([#3517](https://github.com/horovod/horovod/pull/3517)) +- Added Horovod job to spin up distributed TensorFlow Data Service. ([#3525](https://github.com/horovod/horovod/pull/3525)) + ### Changed - MXNet: Updated allreduce functions to newer `op` API. 
([#3299](https://github.com/horovod/horovod/pull/3299)) diff --git a/Dockerfile.test.cpu b/Dockerfile.test.cpu index e9aad4f729..9f537d89a6 100755 --- a/Dockerfile.test.cpu +++ b/Dockerfile.test.cpu @@ -250,6 +250,9 @@ RUN sed -i "s/dataset.take(20000/dataset.take(100/" /horovod/examples/tensorflow # Hack TensorFlow 2.0 example to be smaller. RUN sed -i "s/dataset.take(10000/dataset.take(100/" /horovod/examples/tensorflow2/tensorflow2_mnist.py +# Hack TensorFlow 2.0 Data Service example to be smaller. +RUN sed -i "s/ epochs=24/ epochs=4/" /horovod/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_*_side_dispatcher.py + # Hack Keras MNIST advanced example to be smaller. RUN sed -i "s/'--epochs', type=int, default=24,/'--epochs', type=int, default=9,/" /horovod/examples/keras/keras_mnist_advanced.py RUN sed -i "s/model.add(Conv2D(32, kernel_size=(3, 3),/model.add(Conv2D(1, kernel_size=(3, 3),/" /horovod/examples/keras/keras_mnist_advanced.py diff --git a/Dockerfile.test.gpu b/Dockerfile.test.gpu index 88fab2ed2e..9a037844a1 100644 --- a/Dockerfile.test.gpu +++ b/Dockerfile.test.gpu @@ -224,6 +224,9 @@ RUN sed -i "s/dataset.take(20000/dataset.take(100/" /horovod/examples/tensorflow # Hack TensorFlow 2.0 example to be smaller. RUN sed -i "s/dataset.take(10000/dataset.take(100/" /horovod/examples/tensorflow2/tensorflow2_mnist.py +# Hack TensorFlow 2.0 Data Service example to be smaller. +RUN sed -i "s/ epochs=24/ epochs=4/" /horovod/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_*_side_dispatcher.py + # Hack Keras MNIST advanced example to be smaller. RUN sed -i "s/'--epochs', type=int, default=24,/'--epochs', type=int, default=9,/" /horovod/examples/keras/keras_mnist_advanced.py diff --git a/docs/tensorflow.rst b/docs/tensorflow.rst index 9297115281..4af573c55d 100644 --- a/docs/tensorflow.rst +++ b/docs/tensorflow.rst @@ -184,3 +184,152 @@ TensorFlow v2 Example (from the `MNIST `_ + +TensorFlow Data Service +----------------------- + +A `TensorFlow Data Service `_ +allows you to move CPU-intensive processing of your dataset from your training process to a cluster of +CPU-rich processes. + +With Horovod, it is easy to spin up a TensorFlow Data Service on your Horovod cluster and to connect +your Horovod training job to it. + +Run the following command to start a TensorFlow Data Service via Horovod: + +.. code-block:: bash + + horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json + +This starts a TensorFlow Data Service (here called the compute job) with one dispatcher and four workers. + +.. note:: The config file is written by the compute job and has to be located on a path that is accessible + to all nodes that run the compute job, e.g. a distributed file system. + +Your training job can then move CPU-intensive dataset operations to this data service by +calling ``.send_to_data_service(…)`` on the TensorFlow dataset: + +.. code-block:: python + + from horovod.tensorflow.data.compute_service import TfDataServiceConfig + + hvd.init() + rank = hvd.rank() + size = hvd.size() + + compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True) + + dataset = dataset.repeat() \ .shuffle(10000) \ .batch(128) \ .send_to_data_service(compute_config, rank, size) \ .prefetch(tf.data.experimental.AUTOTUNE) + +All transformations before calling ``send_to_data_service`` will be executed by the data service, +while all transformations after it are executed locally by the training script. + +You can find the `tensorflow2_mnist_data_service.py `_ +example in the examples directory.
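+To illustrate the split: a CPU-heavy ``map`` placed before ``send_to_data_service`` runs on the compute +workers, while the final ``prefetch`` still runs inside the training process. A minimal sketch, assuming a +hypothetical user-defined ``augment`` function: + +.. code-block:: python + + # everything before send_to_data_service is executed by the data service workers + dataset = dataset.map(augment, num_parallel_calls=tf.data.experimental.AUTOTUNE) \ .repeat() \ .shuffle(10000) \ .batch(128) \ .send_to_data_service(compute_config, rank, size) \ .prefetch(tf.data.experimental.AUTOTUNE)  # executed locally by the training script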
+ +First start the data service as shown above. While the data service is running, start the example training script: + +.. code-block:: bash + + horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json + +The compute job normally runs on CPU nodes while the training job runs on GPU nodes. This allows you to run CPU-intensive +dataset transformations on CPU nodes while running GPU-intensive training on GPU nodes. There can be multiple CPU workers +dedicated to one GPU task. + +Use the ``--hosts`` argument to run the compute job on CPU nodes (here ``cpu-node-1`` and ``cpu-node-2``) +and the training job on GPU nodes (here ``gpu-node-1`` and ``gpu-node-2``): + +.. code-block:: bash + + horovodrun -np 4 --hosts cpu-node-1:2,cpu-node-2:2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json + horovodrun -np 2 --hosts gpu-node-1:1,gpu-node-2:1 python tensorflow2_mnist_data_service.py /tmp/compute.json + +.. note:: + + Please make sure you understand how TensorFlow Data Service distributes dataset transformations: see + the `distribute `_ transformation. + +Multiple Dispatchers +~~~~~~~~~~~~~~~~~~~~ + +The data service allows for multiple dispatchers, one per training task. Each dispatcher gets the same number of workers. +Since each worker serves a single dispatcher, workers are effectively dedicated to a single training task. +The size of your compute job (``-np 4``) has to be a multiple of the number of dispatchers (``--dispatchers 2``): + +.. code-block:: bash + + horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatchers 2 /tmp/compute.json + +This requires the number of dispatchers (``--dispatchers 2``) to match the size of your training job (``-np 2``): + +.. code-block:: bash + + horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json + +Single Dispatcher +~~~~~~~~~~~~~~~~~ + +With a single dispatcher, TensorFlow allows the dataset to be reused across all training tasks. This is done either on a +first-come-first-served basis or round-robin. The only supported processing mode is ``"distributed_epoch"``. + +Training-side dispatchers +~~~~~~~~~~~~~~~~~~~~~~~~~ + +By default, the dispatchers run inside the compute job. You can, however, also run them inside the training job. +Add ``--dispatcher-side training`` to tell the compute job that the dispatchers are started by the training job. + +.. code-block:: bash + + horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatcher-side training /tmp/compute.json + +The training script then starts the dispatchers via ``with tf_data_service(…)`` and distributes the dataset itself: + +.. code-block:: python + + hvd.init() + rank = hvd.rank() + size = hvd.size() + + compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True) + + with tf_data_service(compute_config, rank) as dispatcher_address: + + dataset = dataset.repeat() \ .shuffle(10000) \ .batch(128) \ .apply(tf.data.experimental.service.distribute( processing_mode="distributed_epoch", service=dispatcher_address, job_name='job' if reuse_dataset else None, consumer_index=rank if round_robin else None, num_consumers=size if round_robin else None)) \ .prefetch(tf.data.experimental.AUTOTUNE) + +To see the specific changes needed to make the training job run dispatchers, +simply diff the training-side example with the compute-side example: + +..
code-block:: bash + + diff -w examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_* + +Compute job on Spark cluster +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The compute job can be started on a Spark cluster using ``spark-submit``: + +.. code-block:: bash + + worker_py=$(python -c "import horovod.spark.tensorflow.compute_worker as worker; print(worker.__file__)") + spark-submit --master "local[4]" "$worker_py" /tmp/compute.json + +While the compute job is running, start the training job: + +.. code-block:: bash + + cd examples/spark/tensorflow2 + spark-submit --master "local[2]" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json + +As usual, the config file has to be located on a path that is accessible to all nodes that run the compute job. diff --git a/examples/spark/tensorflow2/tensorflow2_mnist_data_service.py b/examples/spark/tensorflow2/tensorflow2_mnist_data_service.py new file mode 100644 index 0000000000..4a0b55459f --- /dev/null +++ b/examples/spark/tensorflow2/tensorflow2_mnist_data_service.py @@ -0,0 +1,84 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import argparse +import os +import sys + +from pyspark import SparkConf +from pyspark.sql import SparkSession + +from horovod.spark import run +from horovod.tensorflow.data.compute_service import TfDataServiceConfig +from tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher import train_fn as train_fn_compute_side +from tensorflow2_mnist_data_service_train_fn_training_side_dispatcher import train_fn as train_fn_training_side + +os.environ["CUDA_VISIBLE_DEVICES"] = "-1" + + +# This exemplifies how to use the TensorFlow Compute Service with Horovod. +# The TensorFlow Dispatcher can reside with the training script, or the compute service. +# If you use only one of these options, you can ignore the respective code of the other option in this example.
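+# +# A typical invocation, assuming the paths used by the CI jobs above (start the compute job first, then the training job): +# +#   spark-submit --master "local[2]" "/horovod/horovod/spark/tensorflow/compute_worker.py" /tmp/compute.json & +#   spark-submit --master "local[2]" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json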
+if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument("configfile", type=str, + help=f"The path to the compute service config file.") + + parser.add_argument("--reuse-dataset", required=False, action="store_true", default=False, + help=f"Reusing the dataset allows the training tasks to read from a single dataset " + f"in a first-come-first-served manner.", + dest="reuse_dataset") + + parser.add_argument("--round-robin", required=False, action="store_true", default=False, + help=f"Reusing the dataset can be done round-robin instead of first-come-first-served.", + dest="round_robin") + + parsed_args = parser.parse_args() + + compute_config = TfDataServiceConfig.read(parsed_args.configfile, wait_for_file_creation=True) + + conf = SparkConf() + spark = SparkSession.builder.config(conf=conf).getOrCreate() + spark_context = spark.sparkContext + training_tasks = spark_context.defaultParallelism + + if compute_config.dispatchers > 1 and training_tasks != compute_config.dispatchers: + print(f'The number of training tasks ({training_tasks}) must match ' + f'the number of dispatchers ({compute_config.dispatchers}) configured in the ' + f'data service config file ({parsed_args.configfile}).', file=sys.stderr) + sys.exit(1) + + # pick the right train_fn depending on the dispatcher side + if compute_config.dispatcher_side == 'training': + train_fn = train_fn_training_side + elif compute_config.dispatcher_side == 'compute': + train_fn = train_fn_compute_side + else: + raise ValueError(f'Unsupported dispatcher side: {compute_config.dispatcher_side}') + + # run the distributed training + run(train_fn, + args=(compute_config,), + kwargs={ + 'reuse_dataset': parsed_args.reuse_dataset, + 'round_robin': parsed_args.round_robin + }, + num_proc=training_tasks, + stdout=sys.stdout, + stderr=sys.stderr) + + compute = compute_config.compute_client(verbose=2) + compute.shutdown() diff --git a/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py b/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py new file mode 120000 index 0000000000..3a34b09432 --- /dev/null +++ b/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py @@ -0,0 +1 @@ +../../tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py \ No newline at end of file diff --git a/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py b/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py new file mode 120000 index 0000000000..46e10361c6 --- /dev/null +++ b/examples/spark/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py @@ -0,0 +1 @@ +../../tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py \ No newline at end of file diff --git a/examples/tensorflow2/tensorflow2_mnist_data_service.py b/examples/tensorflow2/tensorflow2_mnist_data_service.py new file mode 100644 index 0000000000..fb724a5e2a --- /dev/null +++ b/examples/tensorflow2/tensorflow2_mnist_data_service.py @@ -0,0 +1,76 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import argparse +import os +import sys + +from horovod.runner.common.util import env +from horovod.tensorflow.data.compute_service import TfDataServiceConfig +from tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher import train_fn as train_fn_compute_side +from tensorflow2_mnist_data_service_train_fn_training_side_dispatcher import train_fn as train_fn_training_side + +os.environ["CUDA_VISIBLE_DEVICES"] = "-1" + + +# This exemplifies how to use the TensorFlow Compute Service with Horovod. +# The TensorFlow Dispatcher can reside with the training script, or the compute service. +# If you use only one of these options, you can ignore the respective code of the other option in this example. +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument("configfile", type=str, + help=f"The path to the compute service config file.") + + parser.add_argument("--training-tasks", required=False, type=int, + help=f"The number of training tasks when there is only one dispatcher. " + f"Otherwise there are as many training tasks as there are dispatchers.", + dest="training_tasks") + + parser.add_argument("--reuse-dataset", required=False, action="store_true", default=False, + help=f"Reusing the dataset allows the training tasks to read from a single dispatcher " + f"in a first-come-first-served manner.", + dest="reuse_dataset") + + parser.add_argument("--round-robin", required=False, action="store_true", default=False, + help=f"Reusing the dataset can be done round-robin instead of first-come-first-served.", + dest="round_robin") + + parsed_args = parser.parse_args() + + compute_config = TfDataServiceConfig.read(parsed_args.configfile, wait_for_file_creation=True) + + rank, size = env.get_env_rank_and_size() + + if compute_config.dispatchers > 1 and compute_config.dispatchers != size: + print(f'Unless there is only one dispatcher, the number of training tasks ({size}) must match ' + f'the number of dispatchers ({compute_config.dispatchers}) configured in the ' + f'data service config file ({parsed_args.configfile}).', file=sys.stderr) + sys.exit(1) + + # pick the right train_fn depending on the dispatcher side + if compute_config.dispatcher_side == 'training': + train_fn = train_fn_training_side + elif compute_config.dispatcher_side == 'compute': + train_fn = train_fn_compute_side + else: + raise ValueError(f'Unsupported dispatcher side: {compute_config.dispatcher_side}') + + # run the distributed training + train_fn(compute_config, reuse_dataset=parsed_args.reuse_dataset, round_robin=parsed_args.round_robin) + + if rank == 0: + compute = compute_config.compute_client(verbose=2) + compute.shutdown() diff --git a/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py b/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py new file mode 100644 index 0000000000..524487f752 --- /dev/null +++ b/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py @@ -0,0 +1,94 @@
+# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import os + +import tensorflow as tf +from filelock import FileLock + +import horovod.tensorflow.keras as hvd +from horovod.tensorflow.data.compute_service import TfDataServiceConfig + + +# arguments reuse_dataset and round_robin only used when single dispatcher is present +def train_fn(compute_config: TfDataServiceConfig, reuse_dataset: bool = False, round_robin: bool = False): + # Horovod: initialize Horovod. + hvd.init() + + # Horovod: pin GPU to be used to process local rank (one GPU per process) + gpus = tf.config.experimental.list_physical_devices('GPU') + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU') + + # this lock guarantees only one training task downloads the dataset + with FileLock(os.path.expanduser("~/.horovod_lock")): + (mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data() + + dataset = tf.data.Dataset.from_tensor_slices( + (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32), + tf.cast(mnist_labels, tf.int64)) + ) + + # Allow tf.data service to pre-process the pipeline + dataset = dataset.repeat() \ + .shuffle(10000) \ + .batch(128) \ + .send_to_data_service(compute_config, hvd.rank(), hvd.size(), + reuse_dataset=reuse_dataset, round_robin=round_robin) \ + .prefetch(tf.data.experimental.AUTOTUNE) + + mnist_model = tf.keras.Sequential([ + tf.keras.layers.Conv2D(32, [3, 3], activation='relu'), + tf.keras.layers.Conv2D(64, [3, 3], activation='relu'), + tf.keras.layers.MaxPooling2D(pool_size=(2, 2)), + tf.keras.layers.Dropout(0.25), + tf.keras.layers.Flatten(), + tf.keras.layers.Dense(128, activation='relu'), + tf.keras.layers.Dropout(0.5), + tf.keras.layers.Dense(10, activation='softmax') + ]) + + # Horovod: adjust learning rate based on number of GPUs. + scaled_lr = 0.001 * hvd.size() + opt = tf.optimizers.Adam(scaled_lr) + + # Horovod: add Horovod DistributedOptimizer. + opt = hvd.DistributedOptimizer( + opt, backward_passes_per_step=1, average_aggregated_gradients=True) + + # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow + # uses hvd.DistributedOptimizer() to compute gradients. + mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(), + optimizer=opt, + metrics=['accuracy'], + experimental_run_tf_function=False) + + callbacks = [ + hvd.callbacks.BroadcastGlobalVariablesCallback(0), + hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1), + ] + + # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. + if hvd.rank() == 0: + callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5')) + + # Horovod: write logs on worker 0. + verbose = 1 if hvd.rank() == 0 else 0 + + # Train the model. 
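+    # Note: steps_per_epoch below is deliberately tiny so this example finishes quickly; a real job would derive it from the dataset size and the global batch size.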
+ # Horovod: adjust number of steps based on number of GPUs. + mnist_model.fit(dataset, steps_per_epoch=32 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose) diff --git a/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py b/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py new file mode 100644 index 0000000000..4a86e12807 --- /dev/null +++ b/examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py @@ -0,0 +1,99 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import os + +import tensorflow as tf +from filelock import FileLock + +import horovod.tensorflow.keras as hvd +from horovod.tensorflow.data.compute_service import TfDataServiceConfig, tf_data_service + + +# arguments reuse_dataset and round_robin only used when single dispatcher is present +def train_fn(compute_config: TfDataServiceConfig, reuse_dataset: bool = False, round_robin: bool = False): + # Horovod: initialize Horovod. + hvd.init() + + # Horovod: pin GPU to be used to process local rank (one GPU per process) + gpus = tf.config.experimental.list_physical_devices('GPU') + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU') + + with tf_data_service(compute_config, hvd.rank()) as dispatcher_address: + # this lock guarantees only one training task downloads the dataset + with FileLock(os.path.expanduser("~/.horovod_lock")): + (mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data() + + dataset = tf.data.Dataset.from_tensor_slices( + (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32), + tf.cast(mnist_labels, tf.int64)) + ) + + # Allow tf.data service to pre-process the pipeline + dataset = dataset.repeat() \ + .shuffle(10000) \ + .batch(128) \ + .apply(tf.data.experimental.service.distribute( + service=dispatcher_address, + processing_mode="distributed_epoch", + job_name='job' if reuse_dataset else None, + consumer_index=hvd.rank() if round_robin else None, + num_consumers=hvd.size() if round_robin else None)) \ + .prefetch(tf.data.experimental.AUTOTUNE) + + mnist_model = tf.keras.Sequential([ + tf.keras.layers.Conv2D(32, [3, 3], activation='relu'), + tf.keras.layers.Conv2D(64, [3, 3], activation='relu'), + tf.keras.layers.MaxPooling2D(pool_size=(2, 2)), + tf.keras.layers.Dropout(0.25), + tf.keras.layers.Flatten(), + tf.keras.layers.Dense(128, activation='relu'), + tf.keras.layers.Dropout(0.5), + tf.keras.layers.Dense(10, activation='softmax') + ]) + + # Horovod: adjust learning rate based on number of GPUs. + scaled_lr = 0.001 * hvd.size() + opt = tf.optimizers.Adam(scaled_lr) + + # Horovod: add Horovod DistributedOptimizer. 
+ opt = hvd.DistributedOptimizer( + opt, backward_passes_per_step=1, average_aggregated_gradients=True) + + # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow + # uses hvd.DistributedOptimizer() to compute gradients. + mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(), + optimizer=opt, + metrics=['accuracy'], + experimental_run_tf_function=False) + + callbacks = [ + hvd.callbacks.BroadcastGlobalVariablesCallback(0), + hvd.callbacks.LearningRateWarmupCallback(initial_lr=scaled_lr, warmup_epochs=3, verbose=1), + ] + + # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. + if hvd.rank() == 0: + callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5')) + + # Horovod: write logs on worker 0. + verbose = 1 if hvd.rank() == 0 else 0 + + # Train the model. + # Horovod: adjust number of steps based on number of GPUs. + mnist_model.fit(dataset, steps_per_epoch=32 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose) diff --git a/horovod/runner/common/service/compute_service.py b/horovod/runner/common/service/compute_service.py new file mode 100644 index 0000000000..64d2e73833 --- /dev/null +++ b/horovod/runner/common/service/compute_service.py @@ -0,0 +1,248 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import threading + +from horovod.runner.common.util import timeout, network +from horovod.runner.common.util.network import AckResponse +from horovod.runner.common.util.timeout import TimeoutException +from horovod.runner.util.threads import in_thread + +""" +Items sent over the Wire between Compute Client and Service. +""" + + +class RegisterDispatcherRequest(object): + """Registers a dispatcher server address along with a dispatcher id.""" + + def __init__(self, dispatcher_id, dispatcher_address): + self.dispatcher_id = dispatcher_id + """Id of the dispatcher server (0 indexed)""" + + self.dispatcher_address = dispatcher_address + """Address of the dispatcher""" + + +class WaitForDispatcherRegistrationRequest(object): + """Wait for the given dispatcher to register. 
Blocks until the dispatcher registers or the timeout occurs.""" + + def __init__(self, dispatcher_id, timeout): + self.dispatcher_id = dispatcher_id + """Dispatcher id""" + + self.timeout = timeout + """Wait timeout in seconds""" + + +class WaitForDispatcherRegistrationResponse(object): + """Response that the dispatcher has registered, providing its address.""" + def __init__(self, dispatcher_address): + self.dispatcher_address = dispatcher_address + """Address of the requested dispatcher.""" + + +class RegisterDispatcherWorkerRequest(object): + """Registers a worker server for a dispatcher server.""" + + def __init__(self, dispatcher_id, worker_id): + self.dispatcher_id = dispatcher_id + """Id of the dispatcher server (0 indexed)""" + + self.worker_id = worker_id + """Id of the worker server (0 indexed)""" + + +class WaitForDispatcherWorkerRegistrationRequest(object): + """Wait for all workers of the given dispatcher to register. + Blocks until all workers register or the timeout occurs.""" + + def __init__(self, dispatcher_id, timeout): + self.dispatcher_id = dispatcher_id + """Dispatcher id""" + + self.timeout = timeout + """Wait timeout in seconds""" + + +class ShutdownRequest(object): + """Initiate the shutdown of the compute service as it is no longer needed.""" + + +class WaitForShutdownRequest(object): + """Wait for the compute service to shut down. Blocks until the shutdown is initiated.""" + + +""" +ComputeService is used to communicate between training driver, training tasks, compute driver and compute tasks. +It is ML framework agnostic, though currently only used for Tensorflow data service (tf.data.experimental.service). +It orchestrates synchronization between tf.DispatchServer, tf.WorkerServer, training and compute tasks. + +ComputeClient is used to query and change the internal state of the ComputeService.
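+ +A typical lifecycle: the driver starts a ComputeService and shares its addresses and secret key with all tasks (e.g. via a config file), compute tasks start dispatcher and worker servers and register them through a ComputeClient, training tasks wait for those registrations and then consume the data service, and finally a ComputeClient initiates the shutdown.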
+""" + + +class ComputeService(network.BasicService): + NAME = "Compute service" + + def __init__(self, dispatchers, workers_per_dispatcher, key, nics=None): + if dispatchers <= 0: + raise ValueError(f'The number of dispatchers must be larger than 0: {dispatchers}') + if workers_per_dispatcher <= 0: + raise ValueError(f'The number of workers per dispatcher must be larger than 0: {workers_per_dispatcher}') + + self._max_dispatcher_id = dispatchers - 1 + self._dispatcher_addresses = [None] * dispatchers + self._workers_per_dispatcher = workers_per_dispatcher + self._dispatcher_worker_ids = [set()] * dispatchers + self._shutdown = False + self._wait_cond = threading.Condition() + + super().__init__(ComputeService.NAME, key, nics) + + def _handle(self, req, client_address): + if isinstance(req, RegisterDispatcherRequest): + self._wait_cond.acquire() + try: + if not 0 <= req.dispatcher_id <= self._max_dispatcher_id: + return IndexError(f'Dispatcher id must be within [0..{self._max_dispatcher_id}]: ' + f'{req.dispatcher_id}') + + if self._dispatcher_addresses[req.dispatcher_id] is not None and \ + self._dispatcher_addresses[req.dispatcher_id] != req.dispatcher_address: + return ValueError(f'Dispatcher with id {req.dispatcher_id} has already been registered under ' + f'different address {self._dispatcher_addresses[req.dispatcher_id]}: ' + f'{req.dispatcher_address}') + + self._dispatcher_addresses[req.dispatcher_id] = req.dispatcher_address + self._wait_cond.notify_all() + finally: + self._wait_cond.release() + return network.AckResponse() + + if isinstance(req, WaitForDispatcherRegistrationRequest): + self._wait_cond.acquire() + try: + if not 0 <= req.dispatcher_id <= self._max_dispatcher_id: + return IndexError(f'Dispatcher id must be within [0..{self._max_dispatcher_id}]: ' + f'{req.dispatcher_id}') + + tmout = timeout.Timeout(timeout=req.timeout, + message='Timed out waiting for {activity}. Try to find out what takes ' + 'the dispatcher so long to register or increase timeout.') + + while self._dispatcher_addresses[req.dispatcher_id] is None: + self._wait_cond.wait(tmout.remaining()) + tmout.check_time_out_for(f'dispatcher {req.dispatcher_id} to register') + except TimeoutException as e: + return e + finally: + self._wait_cond.release() + return WaitForDispatcherRegistrationResponse(self._dispatcher_addresses[req.dispatcher_id]) + + if isinstance(req, RegisterDispatcherWorkerRequest): + self._wait_cond.acquire() + try: + if not 0 <= req.dispatcher_id <= self._max_dispatcher_id: + return IndexError(f'Dispatcher id must be within [0..{self._max_dispatcher_id}]: ' + f'{req.dispatcher_id}') + + self._dispatcher_worker_ids[req.dispatcher_id].update({req.worker_id}) + self._wait_cond.notify_all() + finally: + self._wait_cond.release() + return network.AckResponse() + + if isinstance(req, WaitForDispatcherWorkerRegistrationRequest): + # if there is only a single dispatcher, wait for that one instead of the requested one + dispatcher_id = req.dispatcher_id if self._max_dispatcher_id > 0 else 0 + + self._wait_cond.acquire() + try: + if not 0 <= req.dispatcher_id <= self._max_dispatcher_id: + return IndexError(f'Dispatcher id must be within [0..{self._max_dispatcher_id}]: ' + f'{req.dispatcher_id}') + + tmout = timeout.Timeout(timeout=req.timeout, + message='Timed out waiting for {activity}. 
Try to find out what takes ' + 'the workers so long to register or increase timeout.') + + while len(self._dispatcher_worker_ids[dispatcher_id]) < self._workers_per_dispatcher: + self._wait_cond.wait(tmout.remaining()) + tmout.check_time_out_for(f'workers for dispatcher {dispatcher_id} to register') + except TimeoutException as e: + return e + finally: + self._wait_cond.release() + return network.AckResponse() + + if isinstance(req, ShutdownRequest): + in_thread(self.shutdown) + return network.AckResponse() + + if isinstance(req, WaitForShutdownRequest): + self._wait_cond.acquire() + try: + while not self._shutdown: + self._wait_cond.wait() + finally: + self._wait_cond.release() + return network.AckResponse() + + return super()._handle(req, client_address) + + def shutdown(self): + self._wait_cond.acquire() + try: + # notify all requests that are waiting for shutdown + self._shutdown = True + self._wait_cond.notify_all() + finally: + self._wait_cond.release() + + # this will wait until all requests have been completed + super(ComputeService, self).shutdown() + + +class ComputeClient(network.BasicClient): + + def __init__(self, compute_addresses, key, verbose=1): + super().__init__(ComputeService.NAME, compute_addresses, key, verbose) + + def register_dispatcher(self, dispatcher_id, dispatcher_address): + self._send(RegisterDispatcherRequest(dispatcher_id, dispatcher_address)) + + def wait_for_dispatcher_registration(self, dispatcher_id, timeout) -> str: + resp = self._send(WaitForDispatcherRegistrationRequest(dispatcher_id, timeout)) + return resp.dispatcher_address + + def register_worker_for_dispatcher(self, dispatcher_id, worker_id): + self._send(RegisterDispatcherWorkerRequest(dispatcher_id, worker_id)) + + def wait_for_dispatcher_worker_registration(self, dispatcher_id, timeout): + self._send(WaitForDispatcherWorkerRegistrationRequest(dispatcher_id, timeout)) + + def shutdown(self): + self._send(ShutdownRequest()) + + def wait_for_shutdown(self): + self._send(WaitForShutdownRequest()) + + def _send(self, req, stream=None): + """Raise any exception that the service returns in response to a request.""" + resp = super(ComputeClient, self)._send(req, stream) + if isinstance(resp, Exception): + raise resp + return resp diff --git a/horovod/runner/common/util/timeout.py b/horovod/runner/common/util/timeout.py index 2e5ad77d09..8d9a5da0b5 100644 --- a/horovod/runner/common/util/timeout.py +++ b/horovod/runner/common/util/timeout.py @@ -16,6 +16,12 @@ import time +class TimeoutException(Exception): + def __init__(self, message): + self.message = message + super(TimeoutException, self).__init__(message) + + class Timeout(object): def __init__(self, timeout, message): self._timeout = timeout @@ -30,7 +36,7 @@ def timed_out(self): def check_time_out_for(self, activity): if self.timed_out(): - raise Exception( + raise TimeoutException( '{}{} Timeout after {} seconds.'.format( self._message.format(activity=activity), '.' if not self._message.rstrip().endswith('.') else '', diff --git a/horovod/spark/tensorflow/__init__.py b/horovod/spark/tensorflow/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/horovod/spark/tensorflow/compute_worker.py b/horovod/spark/tensorflow/compute_worker.py new file mode 100644 index 0000000000..8d7772a3ee --- /dev/null +++ b/horovod/spark/tensorflow/compute_worker.py @@ -0,0 +1,81 @@ +# Copyright 2022 G-Research. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import argparse +import logging +import signal +import sys + +from pyspark.sql import SparkSession + +from horovod.runner.common.service.compute_service import ComputeService +from horovod.runner.common.util import secret +from horovod.spark import run +from horovod.tensorflow.data.compute_service import TfDataServiceConfig, compute_worker_fn + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument("configfile", type=str, + help=f"The path to store the compute service config file.") + + parser.add_argument("--dispatchers", required=False, default=1, type=int, + help=f"The number of dispatchers to support.", + dest="dispatchers") + + parser.add_argument("--dispatcher-side", required=False, default='compute', type=str, + help=f"Where do the dispatchers run? On the 'compute' side or the 'training' side.", + dest="dispatcher_side") + + parsed_args = parser.parse_args() + + spark = SparkSession.builder.getOrCreate() + spark_context = spark.sparkContext + workers = spark_context.defaultParallelism + + if workers % parsed_args.dispatchers: + raise ValueError(f'Number of processes ({workers}) must be ' + f'a multiple of number of dispatchers ({parsed_args.dispatchers}).') + workers_per_dispatcher = workers // parsed_args.dispatchers + + key = secret.make_secret_key() + compute = ComputeService(parsed_args.dispatchers, workers_per_dispatcher, key=key) + + compute_config = TfDataServiceConfig( + dispatchers=parsed_args.dispatchers, + workers_per_dispatcher=workers_per_dispatcher, + dispatcher_side=parsed_args.dispatcher_side, + addresses=compute.addresses(), + key=key + ) + compute_config.write(parsed_args.configfile) + + # signal handlers receive the signal number and the current stack frame + def _exit_gracefully(signum, frame): + logging.info('Spark driver received SIGTERM. Exiting gracefully') + spark_context.stop() + + signal.signal(signal.SIGTERM, _exit_gracefully) + + ret = run(compute_worker_fn, + args=(compute_config,), + stdout=sys.stdout, + stderr=sys.stderr, + num_proc=workers, + verbose=2) + + compute.shutdown() + spark.stop() + + sys.exit(ret) diff --git a/horovod/tensorflow/data/__init__.py b/horovod/tensorflow/data/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/horovod/tensorflow/data/compute_service.py b/horovod/tensorflow/data/compute_service.py new file mode 100644 index 0000000000..0e88cfe3cf --- /dev/null +++ b/horovod/tensorflow/data/compute_service.py @@ -0,0 +1,207 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import binascii +import dataclasses +import json +import logging +import os +import time +from contextlib import contextmanager +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Mapping, Sequence, Tuple, Any, Optional + +import tensorflow as tf + +import horovod.tensorflow as hvd +from horovod.runner.common.service.compute_service import ComputeClient + + +@dataclasses.dataclass(frozen=True) +class TfDataServiceConfig: + dispatchers: int + workers_per_dispatcher: int + dispatcher_side: str + addresses: Mapping[str, Sequence[Tuple[str, int]]] + key: bytes + timeout: int = 60 + + def compute_client(self, verbose=1) -> ComputeClient: + return ComputeClient(self.addresses, self.key, verbose=verbose) + + def to_dict(self) -> Mapping[str, Any]: + config = self.__dict__.copy() + config['key'] = binascii.hexlify(config.get('key')).decode() + return config + + @staticmethod + def from_dict(config: Mapping[str, Any]) -> 'TfDataServiceConfig': + config = dict(**config) + config['key'] = binascii.unhexlify(config.get('key')) + config['addresses'] = {intf: [(addr[0], addr[1]) for addr in addrs] + for intf, addrs in config.get('addresses').items()} + + return TfDataServiceConfig( + dispatchers=config.get('dispatchers'), + workers_per_dispatcher=config.get('workers_per_dispatcher'), + dispatcher_side=config.get('dispatcher_side'), + addresses=config.get('addresses'), + key=config.get('key'), + timeout=config.get('timeout') + ) + + def write(self, filename: str): + path = Path(filename) + with NamedTemporaryFile('w', dir=str(path.parent), prefix=str(path.name), delete=False) as w: + # write the complete config into a different file first + logging.info(f'Writing config to {w.name}') + w.write(json.dumps(self.to_dict())) + + # move the finished config file into place; this happens inside the same directory, so it should be quick + logging.info(f'Renaming config from {w.name} to {filename}') + os.rename(w.name, filename) + + @staticmethod + def read(filename: str, wait_for_file_creation: bool = False) -> 'TfDataServiceConfig': + while wait_for_file_creation: + if os.path.exists(filename): + break + time.sleep(1) + + with open(filename, 'r') as r: + return TfDataServiceConfig.from_dict(json.load(r)) + + +@contextmanager +def tf_data_service(compute_config: TfDataServiceConfig, rank: int) -> str: + """ + Provides the address of the TF Dispatcher. 
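+ + When the dispatcher side is 'training', this also starts a tf.data DispatchServer on this rank + (only on rank 0 if there is a single dispatcher) and registers it with the compute service; + the server is stopped again when the context exits.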
+ """ + + compute = compute_config.compute_client(verbose=2) + + dispatcher_server = None + if compute_config.dispatcher_side == 'training': + if compute_config.dispatchers > 1 or compute_config.dispatchers == 1 and rank == 0: + if compute_config.dispatchers == 1: + logging.info(f"Setting up Dispatcher for all tasks") + else: + logging.info(f"Setting up Dispatcher for task {rank}") + + dispatcher_server = tf.data.experimental.service.DispatchServer() + logging.debug(f"Registering Dispatcher {rank} at {dispatcher_server.target}") + compute.register_dispatcher(rank, dispatcher_server.target) + logging.info(f"Registered Dispatcher {rank} at {dispatcher_server.target}") + + dispatcher_id = rank if compute_config.dispatchers > 1 else 0 + dispatcher_address = compute.wait_for_dispatcher_registration(dispatcher_id, compute_config.timeout) + compute.wait_for_dispatcher_worker_registration(dispatcher_id, compute_config.timeout) + + # let the caller use the dispatcher + try: + yield dispatcher_address + finally: + if dispatcher_server: + # there is currently no other way to stop the dispatch server + logging.debug(f"Shuting down dispatcher") + dispatcher_server._stop() + dispatcher_server.join() + logging.info(f"Dispatcher shut down") + + +def send_to_data_service(dataset: tf.data.Dataset, + compute_config: TfDataServiceConfig, + rank: int, + size: Optional[int] = None, + processing_mode: str = 'distributed_epoch', + reuse_dataset: bool = False, + round_robin: bool = False) -> tf.data.Dataset: + if compute_config.dispatcher_side == 'training': + raise RuntimeError('training side dispatcher not supported, use tf_data_service context manager instead') + + with tf_data_service(compute_config, rank) as dispatcher_address: + return dataset.apply(tf.data.experimental.service.distribute( + processing_mode=processing_mode, + service=dispatcher_address, + job_name='job' if reuse_dataset else None, + consumer_index=rank if reuse_dataset and round_robin else None, + num_consumers=size if reuse_dataset and round_robin else None)) + + +tf.data.Dataset.send_to_data_service = send_to_data_service + + +def compute_worker_fn(compute_config: TfDataServiceConfig): + """ Function run on the compute tasks providing tf dispatcher and worker server. 
""" + hvd.init() + index, size = hvd.rank(), hvd.size() + dispatcher_index = index // compute_config.workers_per_dispatcher + + compute = compute_config.compute_client(verbose=2) + + import tensorflow as tf + + # Create dispatcher for train task + dispatcher_server = None + if compute_config.dispatcher_side == 'compute' and index % compute_config.workers_per_dispatcher == 0: + if compute_config.dispatchers == 1: + logging.info(f"Setting up Dispatcher for all tasks") + else: + logging.info(f"Setting up Dispatcher for task {dispatcher_index}") + + dispatcher_server = tf.data.experimental.service.DispatchServer() + logging.debug(f"Registering Dispatcher {dispatcher_index} at {dispatcher_server.target}") + compute.register_dispatcher(dispatcher_index, dispatcher_server.target) + logging.info(f"Registered Dispatcher {dispatcher_index} at {dispatcher_server.target}") + + # Get dispatcher for the worker + logging.debug(f'Waiting for dispatcher {dispatcher_index} for worker {index}') + dispatcher_address = compute.wait_for_dispatcher_registration(dispatcher_index, compute_config.timeout) + logging.debug(f'Dispatcher {dispatcher_index} for worker {index} available') + + # Create worker + logging.debug(f"Setting up worker for dispatcher {dispatcher_index}") + worker_config = tf.data.experimental.service.WorkerConfig( + dispatcher_address=dispatcher_address.split("://")[1], + heartbeat_interval_ms=1000, + dispatcher_timeout_ms=compute_config.timeout * 1000) + worker_server = tf.data.experimental.service.WorkerServer(worker_config) + logging.debug(f"Starting worker for dispatcher {dispatcher_index}") + worker_server.start() + logging.debug(f"Started worker for dispatcher {dispatcher_index}") + + # Tell the compute service that we are ready + logging.debug(f"Registering worker for dispatcher {dispatcher_index}") + compute.register_worker_for_dispatcher(dispatcher_index, index) + logging.info(f"Worker for dispatcher {dispatcher_index} registered") + + # Wait until the compute service shuts down + logging.debug(f"Waiting for shutdown request") + compute.wait_for_shutdown() + logging.debug(f"Shutdown requested") + + # stop the servers + # there is currently no other way to stop the worker server + logging.debug(f"Shuting down worker") + worker_server._stop() + worker_server.join() + logging.info(f"Worker shut down") + if dispatcher_server: + # there is currently no other way to stop the dispatch server + logging.debug(f"Shuting down dispatcher") + dispatcher_server._stop() + dispatcher_server.join() + logging.info(f"Dispatcher shut down") diff --git a/horovod/tensorflow/data/compute_worker.py b/horovod/tensorflow/data/compute_worker.py new file mode 100644 index 0000000000..4a1383558e --- /dev/null +++ b/horovod/tensorflow/data/compute_worker.py @@ -0,0 +1,83 @@ +# Copyright 2022 G-Research. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +import argparse + +import tensorflow as tf + +import horovod.tensorflow as hvd +from horovod.runner.common.service.compute_service import ComputeService +from horovod.runner.common.util import secret +from horovod.tensorflow.data.compute_service import TfDataServiceConfig, compute_worker_fn + + +def main(dispatchers: int, dispatcher_side: str, configfile: str, timeout: int): + hvd.init() + rank, size = hvd.rank(), hvd.size() + + if size % dispatchers: + raise ValueError(f'Number of processes ({size}) must be a multiple of number of dispatchers ({dispatchers}).') + workers_per_dispatcher = size // dispatchers + + # start the compute service on rank 0 + compute = None + try: + compute_config = None + + if rank == 0: + key = secret.make_secret_key() + compute = ComputeService(dispatchers, workers_per_dispatcher, key=key) + + compute_config = TfDataServiceConfig( + dispatchers=dispatchers, + workers_per_dispatcher=workers_per_dispatcher, + dispatcher_side=dispatcher_side, + addresses=compute.addresses(), + key=key, + timeout=timeout + ) + compute_config.write(configfile) + + # broadcast this config to all ranks via CPU ops + with tf.device('/cpu:0'): + compute_config = hvd.broadcast_object(compute_config, name='TfDataServiceConfig') + + # start all compute workers + compute_worker_fn(compute_config) + finally: + if compute is not None: + compute.shutdown() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + + parser.add_argument("configfile", type=str, + help=f"The path to store the compute service config file.") + + parser.add_argument("--dispatchers", required=False, default=1, type=int, + help=f"The number of dispatchers to support.", + dest="dispatchers") + + parser.add_argument("--dispatcher-side", required=False, default='compute', type=str, + help=f"Where do the dispatchers run? On the 'compute' side or the 'training' side.", + dest="dispatcher_side") + + parser.add_argument("--timeout", required=False, default=60, type=int, + help=f"Timeout to set up workers and connect everything.", + dest="timeout") + + parsed_args = parser.parse_args() + main(parsed_args.dispatchers, parsed_args.dispatcher_side, parsed_args.configfile, parsed_args.timeout) diff --git a/setup.py b/setup.py index 415cf5eecc..8df5b71900 100644 --- a/setup.py +++ b/setup.py @@ -196,7 +196,7 @@ def build_extensions(self): # torchvision 0.5.0 depends on torch==1.4.0 # python packages required only to run tests -test_require_list = ['mock', 'pytest', 'pytest-forked', 'parameterized'] +test_require_list = ['mock', 'pytest', 'pytest-forked', 'pytest-subtests', 'parameterized'] # Skip cffi if pytorch extension explicitly disabled if not os.environ.get('HOROVOD_WITHOUT_PYTORCH'): diff --git a/test/parallel/test_compute_worker.py b/test/parallel/test_compute_worker.py new file mode 100644 index 0000000000..82ae4e7f42 --- /dev/null +++ b/test/parallel/test_compute_worker.py @@ -0,0 +1,256 @@ +# Copyright 2022 G-Research, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import logging +import os +import unittest +from distutils.version import LooseVersion +from itertools import islice + +import tensorflow as tf + +import horovod.tensorflow as hvd +from horovod.runner.util.threads import in_thread +from horovod.tensorflow.data.compute_service import TfDataServiceConfig, tf_data_service +from horovod.tensorflow.data.compute_worker import main + +_PRE_TF_2_0_0 = LooseVersion(tf.__version__) < LooseVersion("2.0.0") + + +# this test is to be run via horovodrun -np 2; all processes have to run on the same machine +@unittest.skipIf(_PRE_TF_2_0_0, 'Compute service not supported pre 2.0.0') +class ComputeWorkerTest(unittest.TestCase): + # general timeout in this test + timeout = 3 + + # rank and size of this test + hvd.init() + rank, size = hvd.rank(), hvd.size() + + # Horovod: pin GPU to be used to process local rank (one GPU per process) + gpus = tf.config.experimental.list_physical_devices('GPU') + for gpu in gpus: + tf.config.experimental.set_memory_growth(gpu, True) + if gpus: + tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU') + + @property + def expected_cluster_shape(self): + return [(r, self.size) for r in range(self.size)] + + def test_single_dispatcher(self): + self.do_test_worker(1, reuse_dataset=False, round_robin=False) + + def test_single_dispatcher_reuse_dataset_fcfs(self): + self.do_test_worker(1, reuse_dataset=True, round_robin=False) + + @unittest.skip('Not fully consuming the dataset upsets the dispatcher on termination, even without stopping it. ' + 'Round robin requires an infinite dataset, so it cannot be fully consumed and the test would idle a long time.') + # https://github.com/tensorflow/tensorflow/issues/56490 + def test_single_dispatcher_reuse_dataset_round_robin(self): + self.do_test_worker(1, reuse_dataset=True, round_robin=True) + + def test_two_dispatchers(self): + self.do_test_worker(2, reuse_dataset=False, round_robin=False) + + def do_test_worker(self, + dispatchers: int, + reuse_dataset: bool, + round_robin: bool): + for processing_mode in ['distributed_epoch', 'parallel_epochs']: + with self.subTest(processing_mode=processing_mode, dispatcher_side='compute'): + self.do_test_worker_compute_side(dispatchers, processing_mode=processing_mode, reuse_dataset=reuse_dataset, round_robin=round_robin) + with self.subTest(processing_mode=processing_mode, dispatcher_side='training'): + self.do_test_worker_training_side(dispatchers, processing_mode=processing_mode, reuse_dataset=reuse_dataset, round_robin=round_robin) + + # keep this in-sync with do_test_worker_training_side + def do_test_worker_compute_side(self, + dispatchers: int, + processing_mode: str, + reuse_dataset: bool, + round_robin: bool): + # the config file for this worker + configfile = __file__ + '.config' + if self.rank == 0 and os.path.exists(configfile): + raise RuntimeError(f'Config file exists already, please delete first: {configfile}') + + # synchronize with all processes + self.assertTrue(self.size > 1) + logging.debug('waiting for all processes to get started') + cluster_shape = hvd.allgather_object((self.rank, self.size), name='test_start') + self.assertEqual(self.expected_cluster_shape, cluster_shape) + logging.debug('all processes started') + + try: + # start the worker + logging.debug('starting worker process') + worker = in_thread(main, (dispatchers, 
'compute', configfile, self.timeout), daemon=True) + # this runs 'main' as a separate process + #command = f'{sys.executable} -m horovod.tensorflow.data.compute_worker --dispatchers {dispatchers} --dispatcher-side compute {configfile}' + #worker = in_thread(safe_shell_exec.execute, (command, None, sys.stdout, sys.stderr), daemon=True) + logging.debug('worker process started') + + # read the config file + compute_config = TfDataServiceConfig.read(configfile, wait_for_file_creation=True) + + try: + # Allow tf.data service to pre-process the pipeline + dataset = tf.data.Dataset.range(1024) + if reuse_dataset and round_robin: + dataset = dataset.repeat() + dataset = dataset.batch(128) \ + .send_to_data_service(compute_config, self.rank, self.size, + processing_mode=processing_mode, + reuse_dataset=reuse_dataset, + round_robin=round_robin) + + # fetch the batches + it = islice(dataset.as_numpy_iterator(), 8) + actual = list([batch.tolist() for batch in it]) + + # synchronize with all processes + logging.debug('waiting for all processes to finish') + actuals = hvd.allgather_object(actual) + logging.debug('all processes finished') + + # assert the provided batches + # the batches are not deterministic, so we cannot assert them too thoroughly here; + # that would test the tf.data service anyway. All we assert here is that worker and send_to_data_service + # work together nicely and produce a consumable dataset + self.assertEqual(self.size, len(actuals), msg="one 'actual batches' from each process") + + # in reuse_dataset and fcfs mode it might happen that one process gets all the data and one does not get any + if reuse_dataset and not round_robin: + self.assertTrue(any([len(actual) > 0 for actual in actuals]), msg='at least one process has at least one batch') + else: + self.assertEqual([True] * self.size, [len(actual) > 0 for actual in actuals], msg='each process has at least one batch') + + for actual in actuals: + self.assertEqual([True] * len(actual), [0 < len(batch) <= 128 for batch in actual], msg=f'all batches are at most 128 in size: {[len(batch) for batch in actual]}') + for batch in actual: + self.assertEqual([True] * len(batch), [0 <= i < 1024 for i in batch], msg=f'values in batch must be within [0..1024): {batch}') + + finally: + # shutdown compute service + if self.rank == 0: + logging.debug('sending shutdown request') + compute = compute_config.compute_client(verbose=2) + compute.shutdown() + logging.debug('shutdown request sent') + + # in round robin mode, the worker process does not terminate once stopped until some high timeout + if not (reuse_dataset and round_robin): + # wait for the worker to terminate + logging.debug('waiting for worker to terminate') + worker.join(self.timeout) + + self.assertFalse(worker.is_alive()) + logging.debug('worker terminated') + + finally: + # remove the configfile as it will interfere with subsequent runs of this test + if self.rank == 0 and os.path.exists(configfile): + os.unlink(configfile) + + # keep this in-sync with do_test_worker_compute_side + def do_test_worker_training_side(self, + dispatchers: int, + processing_mode: str, + reuse_dataset: bool, + round_robin: bool): + # the config file for this worker + configfile = __file__ + '.config' + if self.rank == 0 and os.path.exists(configfile): + raise RuntimeError(f'Config file exists already, please delete first: {configfile}') + + # synchronize with all processes + self.assertTrue(self.size > 1) + logging.debug('waiting for all processes to get started') + cluster_shape = 
hvd.allgather_object((self.rank, self.size), name='test_start') + self.assertEqual(self.expected_cluster_shape, cluster_shape) + logging.debug('all processes started') + + try: + # start the worker + logging.debug('starting worker process') + worker = in_thread(main, (dispatchers, 'training', configfile, self.timeout), daemon=True) + # this runs 'main' as a separate process + #command = f'{sys.executable} -m horovod.tensorflow.data.compute_worker --dispatchers {dispatchers} --dispatcher-side compute {configfile}' + #worker = in_thread(safe_shell_exec.execute, (command, None, sys.stdout, sys.stderr), daemon=True) + logging.debug('worker process started') + + # read the config file + compute_config = TfDataServiceConfig.read(configfile, wait_for_file_creation=True) + + try: + with tf_data_service(compute_config, hvd.rank()) as dispatcher_address: + # Allow tf.data service to pre-process the pipeline + dataset = tf.data.Dataset.range(1024) + if reuse_dataset and round_robin: + dataset = dataset.repeat() + dataset = dataset.batch(128) \ + .apply(tf.data.experimental.service.distribute( + service=dispatcher_address, + processing_mode=processing_mode, + job_name='job' if reuse_dataset else None, + consumer_index=hvd.rank() if round_robin else None, + num_consumers=hvd.size() if round_robin else None)) + + # fetch the batches + it = islice(dataset.as_numpy_iterator(), 8) + actual = list([batch.tolist() for batch in it]) + + # synchronize with all processes + logging.debug('waiting for all processes to finish') + actuals = hvd.allgather_object(actual) + logging.debug('all processes finished') + + # assert the provided batches + # the batches are not deterministic, so we cannot assert them too thoroughly here; + # that would test the tf.data service anyway. All we assert here is that worker and send_to_data_service + # work together nicely and produce a consumable dataset + self.assertEqual(self.size, len(actuals), msg="one 'actual batches' from each process") + + # in reuse_dataset and fcfs mode it might happen that one process gets all the data and one does not get any + if reuse_dataset and not round_robin: + self.assertTrue(any([len(actual) > 0 for actual in actuals]), msg='at least one process has at least one batch') + else: + self.assertEqual([True] * self.size, [len(actual) > 0 for actual in actuals], msg='each process has at least one batch') + + for actual in actuals: + self.assertEqual([True] * len(actual), [0 < len(batch) <= 128 for batch in actual], msg=f'all batches are at most 128 in size: {[len(batch) for batch in actual]}') + for batch in actual: + self.assertEqual([True] * len(batch), [0 <= i < 1024 for i in batch], msg=f'values in batch must be within [0..1024): {batch}') + + finally: + # shutdown compute service + if self.rank == 0: + logging.debug('sending shutdown request') + compute = compute_config.compute_client(verbose=2) + compute.shutdown() + logging.debug('shutdown request sent') + + # in round robin mode, the worker process does not terminate once stopped until some high timeout + if not (reuse_dataset and round_robin): + # wait for the worker to terminate + logging.debug('waiting for worker to terminate') + worker.join(self.timeout) + + self.assertFalse(worker.is_alive()) + logging.debug('worker terminated') + + finally: + # remove the configfile as it will interfere with subsequent runs of this test + if self.rank == 0 and os.path.exists(configfile): + os.unlink(configfile) diff --git a/test/single/data/expected_buildkite_gpu_heads_pipeline.yaml 
b/test/single/data/expected_buildkite_gpu_heads_pipeline.yaml index 072a0a4e3f..1424735dcd 100644 --- a/test/single/data/expected_buildkite_gpu_heads_pipeline.yaml +++ b/test/single/data/expected_buildkite_gpu_heads_pipeline.yaml @@ -127,6 +127,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1)' + command: bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':fire: Gloo PyTorch MNIST horovodrun (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1)' command: horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/pytorch/pytorch_mnist.py --data-dir /data/pytorch_datasets artifact_paths: "artifacts/**" @@ -181,6 +199,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':spark: Spark TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1)' + command: bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':spark: Spark Torch MNIST (test-gpu-gloo-py3_8-tfhead-keras_none-torchhead-mxnethead-pyspark3_2_1)' command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3" artifact_paths: "artifacts/**" diff --git a/test/single/data/expected_buildkite_gpu_non_heads_pipeline.yaml b/test/single/data/expected_buildkite_gpu_non_heads_pipeline.yaml index 2ce0ae3c55..32643302a2 100644 --- a/test/single/data/expected_buildkite_gpu_non_heads_pipeline.yaml +++ b/test/single/data/expected_buildkite_gpu_non_heads_pipeline.yaml @@ -716,6 +716,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1)' + command: bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: 
test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':fire: Gloo PyTorch MNIST horovodrun (test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1)' command: horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/pytorch/pytorch_mnist.py --data-dir /data/pytorch_datasets artifact_paths: "artifacts/**" @@ -770,6 +788,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':spark: Spark TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1)' + command: bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':spark: Spark Torch MNIST (test-gpu-gloo-py3_8-tf2_6_5-keras2_6_0-torch1_9_1-mxnet1_7_0_p1-pyspark3_2_1)' command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3" artifact_paths: "artifacts/**" @@ -860,6 +896,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1)' + command: bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':fire: Gloo PyTorch MNIST horovodrun (test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1)' command: horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/pytorch/pytorch_mnist.py --data-dir /data/pytorch_datasets artifact_paths: "artifacts/**" @@ -914,6 +968,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':spark: Spark TensorFlow 2.0 MNIST Data Service (test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1)' + command: bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files 
tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':spark: Spark Torch MNIST (test-gpu-gloo-py3_8-tf2_7_3-keras2_7_0-torch1_10_2-mxnet1_8_0_p0-pyspark3_2_1)' command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3" artifact_paths: "artifacts/**" @@ -1004,6 +1076,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':fire: Gloo PyTorch MNIST horovodrun (test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' command: horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/pytorch/pytorch_mnist.py --data-dir /data/pytorch_datasets artifact_paths: "artifacts/**" @@ -1148,6 +1238,42 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: MPI TensorFlow 2.0 MNIST Data Service (test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 +- label: ':spark: Spark TensorFlow 2.0 MNIST Data Service (test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - 
docker-compose#v3.5.0: + run: test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':spark: Spark Torch MNIST (test-gpu-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3" artifact_paths: "artifacts/**" @@ -1238,6 +1364,24 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: Gloo TensorFlow 2.0 MNIST Data Service (test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c "horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --gloo python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 - label: ':fire: Gloo PyTorch MNIST horovodrun (test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' command: horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/pytorch/pytorch_mnist.py --data-dir /data/pytorch_datasets artifact_paths: "artifacts/**" @@ -1382,6 +1526,42 @@ steps: automatic: true agents: queue: 2x-gpu-v572 +- label: ':tensorflow: MPI TensorFlow 2.0 MNIST Data Service (test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c " horovodrun -np 2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json & horovodrun -np 2 --mpi python /horovod/examples/tensorflow2/tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: + automatic: true + agents: + queue: 2x-gpu-v572 +- label: ':spark: Spark TensorFlow 2.0 MNIST Data Service (test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' + command: bash -c "cd /horovod/examples/spark/tensorflow2; spark-submit --master \"local[2]\" \"/horovod/horovod/spark/tensorflow/compute_worker.py\" /tmp/compute.json & OMP_NUM_THREADS=1 /spark_env.sh spark-submit --master \"local[2]\" --py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json" + artifact_paths: "artifacts/**" + env: + COMPOSE_HTTP_TIMEOUT: 300 + plugins: + - docker-compose#v3.5.0: + run: test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1 + volumes: "./artifacts:/artifacts" + config: docker-compose.test.yml + pull-retries: 3 + - ecr#v1.2.0: + login: true + timeout_in_minutes: 10 + retry: 
+ automatic: true + agents: + queue: 2x-gpu-v572 - label: ':spark: Spark Torch MNIST (test-mixed-openmpi-gloo-py3_8-tf2_8_2-keras2_8_0-torch1_11_0-mxnet1_9_1-pyspark3_2_1)' command: bash -c "OMP_NUM_THREADS=1 /spark_env.sh python /horovod/examples/spark/pytorch/pytorch_spark_mnist.py --num-proc 2 --work-dir /work --data-dir /data --epochs 3" artifact_paths: "artifacts/**" diff --git a/test/single/test_compute_service.py b/test/single/test_compute_service.py new file mode 100644 index 0000000000..69c6caa5c4 --- /dev/null +++ b/test/single/test_compute_service.py @@ -0,0 +1,223 @@ +# Copyright 2022 G-Research, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import unittest +from distutils.version import LooseVersion +from queue import Queue + +import tensorflow as tf + +from horovod.runner.common.service.compute_service import ComputeService, ComputeClient +from horovod.runner.common.util import secret +from horovod.runner.common.util.timeout import TimeoutException +from horovod.runner.util.threads import in_thread + +_PRE_TF_2_0_0 = LooseVersion(tf.__version__) < LooseVersion("2.0.0") + + +class ComputeServiceTest(unittest.TestCase): + + @staticmethod + def wait_for_dispatcher(client, dispatcher_id, queue): + def _wait(): + queue.put((dispatcher_id, client.wait_for_dispatcher_registration(dispatcher_id, 10))) + return in_thread(_wait, daemon=True) + + @staticmethod + def wait_for_dispatcher_workers(client, dispatcher_id, queue): + def _wait(): + client.wait_for_dispatcher_worker_registration(dispatcher_id, 10) + queue.put(dispatcher_id) + return in_thread(_wait, daemon=True) + + @staticmethod + def wait_for_shutdown(client, queue): + def _wait(): + client.wait_for_shutdown() + queue.put(True) + return in_thread(_wait, daemon=True) + + @staticmethod + def get_all(queue): + while not queue.empty(): + yield queue.get_nowait() + + def test_good_path(self): + for dispatchers_num, workers_per_dispatcher in [ + (1, 1), (1, 2), (1, 4), + (2, 1), (2, 2), (2, 4), + (32, 16), (1, 512) + ]: + with self.subTest(dispatchers=dispatchers_num, workers_per_dispatcher=workers_per_dispatcher): + key = secret.make_secret_key() + service = ComputeService(dispatchers_num, workers_per_dispatcher, key, nics=None) + try: + client = ComputeClient(service.addresses(), key, verbose=2) + + # create thread waiting for shutdown + shutdown = Queue() + shutdown_thread = self.wait_for_shutdown(client, shutdown) + + # dispatcher registration + # start threads that wait for dispatchers + threads = [] + dispatchers = Queue() + for id in range(dispatchers_num): + threads.append(self.wait_for_dispatcher(client, id, dispatchers)) + + # register dispatchers + for id in range(dispatchers_num): + client.register_dispatcher(id, f'grpc://localhost:{10000+id}') + + # check threads terminate + for thread in threads: + thread.join(10) + self.assertFalse(thread.is_alive(), msg="threads waiting for dispatchers did not terminate") + + # 
+    def test_good_path(self):
+        for dispatchers_num, workers_per_dispatcher in [
+            (1, 1), (1, 2), (1, 4),
+            (2, 1), (2, 2), (2, 4),
+            (32, 16), (1, 512)
+        ]:
+            with self.subTest(dispatchers=dispatchers_num, workers_per_dispatcher=workers_per_dispatcher):
+                key = secret.make_secret_key()
+                service = ComputeService(dispatchers_num, workers_per_dispatcher, key, nics=None)
+                try:
+                    client = ComputeClient(service.addresses(), key, verbose=2)
+
+                    # create thread waiting for shutdown
+                    shutdown = Queue()
+                    shutdown_thread = self.wait_for_shutdown(client, shutdown)
+
+                    # dispatcher registration
+                    # start threads that wait for dispatchers
+                    threads = []
+                    dispatchers = Queue()
+                    for id in range(dispatchers_num):
+                        threads.append(self.wait_for_dispatcher(client, id, dispatchers))
+
+                    # register dispatchers
+                    for id in range(dispatchers_num):
+                        client.register_dispatcher(id, f'grpc://localhost:{10000+id}')
+
+                    # check threads terminate
+                    for thread in threads:
+                        thread.join(10)
+                        self.assertFalse(thread.is_alive(), msg="threads waiting for dispatchers did not terminate")
+
+                    # check reported dispatcher addresses
+                    self.assertEqual([(id, f'grpc://localhost:{10000+id}') for id in range(dispatchers_num)],
+                                     sorted(self.get_all(dispatchers)))
+
+                    # worker registration
+                    # start threads to wait for dispatcher worker registration
+                    threads = []
+                    dispatchers = Queue()
+                    for id in range(dispatchers_num):
+                        threads.append(self.wait_for_dispatcher_workers(client, id, dispatchers))
+
+                    # register dispatcher workers
+                    for id in range(dispatchers_num * workers_per_dispatcher):
+                        client.register_worker_for_dispatcher(dispatcher_id=id // workers_per_dispatcher, worker_id=id)
+
+                    # check threads terminate
+                    for thread in threads:
+                        thread.join(10)
+                        self.assertFalse(thread.is_alive(), msg="threads waiting for dispatchers' workers did not terminate")
+
+                    # check that every dispatcher reported complete worker registration
+                    self.assertEqual(sorted(range(dispatchers_num)), sorted(self.get_all(dispatchers)))
+
+                    # shutdown and wait for shutdown
+                    self.assertTrue(shutdown_thread.is_alive(), msg="thread waiting for shutdown terminated early")
+                    client.shutdown()
+                    shutdown_thread.join(10)
+                    self.assertFalse(shutdown_thread.is_alive(), msg="thread waiting for shutdown did not terminate")
+                    self.assertEqual([True], list(self.get_all(shutdown)))
+                finally:
+                    service.shutdown()
+
+    def test_invalid_dispatcher_ids(self):
+        key = secret.make_secret_key()
+        service = ComputeService(2, 4, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+
+            with self.assertRaises(IndexError):
+                client.register_dispatcher(-1, 'grpc://localhost:10000')
+            with self.assertRaises(IndexError):
+                client.register_dispatcher(2, 'grpc://localhost:10000')
+
+            with self.assertRaises(IndexError):
+                client.wait_for_dispatcher_registration(-1, 0.1)
+            with self.assertRaises(IndexError):
+                client.wait_for_dispatcher_registration(2, 0.1)
+
+            with self.assertRaises(IndexError):
+                client.register_worker_for_dispatcher(-1, 0)
+            with self.assertRaises(IndexError):
+                client.register_worker_for_dispatcher(2, 0)
+
+            with self.assertRaises(IndexError):
+                client.wait_for_dispatcher_worker_registration(-1, 0.1)
+            with self.assertRaises(IndexError):
+                client.wait_for_dispatcher_worker_registration(2, 0.1)
+        finally:
+            service.shutdown()
+
+    def test_register_dispatcher_duplicate(self):
+        key = secret.make_secret_key()
+        service = ComputeService(2, 1, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+            client.register_dispatcher(0, 'grpc://localhost:10000')
+            with self.assertRaisesRegex(ValueError, 'Dispatcher with id 0 has already been registered under '
+                                                    'different address grpc://localhost:10000: grpc://localhost:10001'):
+                client.register_dispatcher(0, 'grpc://localhost:10001')
+        finally:
+            service.shutdown()
+
+    def test_register_dispatcher_replay(self):
+        key = secret.make_secret_key()
+        service = ComputeService(2, 1, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+            client.register_dispatcher(0, 'grpc://localhost:10000')
+            client.wait_for_dispatcher_registration(0, timeout=2)
+
+            # registering the same dispatcher again should not interfere with the registration of the second dispatcher
+            client.register_dispatcher(0, 'grpc://localhost:10000')
+            with self.assertRaises(TimeoutException):
+                client.wait_for_dispatcher_registration(1, timeout=2)
+
+            # registering the second dispatcher completes registration
+            client.register_dispatcher(1, 'grpc://localhost:10001')
+            client.wait_for_dispatcher_registration(0, timeout=2)
+            client.wait_for_dispatcher_registration(1, timeout=2)
+        finally:
+            service.shutdown()
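+
+    # Re-registering an already-known worker is treated as a replay: it must not
+    # count towards the number of distinct workers the dispatcher waits for.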
+    def test_register_dispatcher_worker_replay(self):
+        key = secret.make_secret_key()
+        service = ComputeService(1, 2, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+            client.register_dispatcher(0, 'grpc://localhost:10000')
+            client.register_worker_for_dispatcher(0, 0)
+
+            # registering the same worker again should not complete the registration
+            client.register_worker_for_dispatcher(0, 0)
+            with self.assertRaises(TimeoutException):
+                client.wait_for_dispatcher_worker_registration(0, timeout=2)
+
+            # registering the second worker completes the registration
+            client.register_worker_for_dispatcher(0, 1)
+            client.wait_for_dispatcher_worker_registration(0, timeout=2)
+        finally:
+            service.shutdown()
+
+    def test_register_dispatcher_timeout(self):
+        key = secret.make_secret_key()
+        service = ComputeService(1, 1, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+            with self.assertRaisesRegex(TimeoutException,
+                                        expected_regex='Timed out waiting for dispatcher 0 to register. '
+                                                       'Try to find out what takes the dispatcher so long '
+                                                       'to register or increase timeout. Timeout after 0.1 seconds.'):
+                client.wait_for_dispatcher_registration(0, timeout=0.1)
+        finally:
+            service.shutdown()
+
+    def test_register_dispatcher_worker_timeout(self):
+        key = secret.make_secret_key()
+        service = ComputeService(1, 1, key, nics=None)
+        try:
+            client = ComputeClient(service.addresses(), key, verbose=2)
+            with self.assertRaisesRegex(TimeoutException,
+                                        expected_regex='Timed out waiting for workers for dispatcher 0 to register. '
+                                                       'Try to find out what takes the workers so long '
+                                                       'to register or increase timeout. Timeout after 0.1 seconds.'):
+                client.wait_for_dispatcher_worker_registration(0, timeout=0.1)
+        finally:
+            service.shutdown()
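
For orientation, the good-path lifecycle these tests exercise can be sketched outside the test harness. This is a minimal sketch, not part of the diff, using only the ComputeService/ComputeClient calls that appear above; the gRPC address is a placeholder.

# Minimal sketch (assumption: same API surface as in the tests above) of the
# registration lifecycle for one dispatcher with one worker.
from horovod.runner.common.service.compute_service import ComputeService, ComputeClient
from horovod.runner.common.util import secret

key = secret.make_secret_key()
service = ComputeService(1, 1, key, nics=None)  # 1 dispatcher, 1 worker per dispatcher
try:
    client = ComputeClient(service.addresses(), key, verbose=2)

    # compute side: announce the dispatcher's address, then its worker;
    # 'grpc://localhost:10000' is a placeholder address
    client.register_dispatcher(0, 'grpc://localhost:10000')
    client.register_worker_for_dispatcher(dispatcher_id=0, worker_id=0)

    # training side: block until registration completes; the dispatcher
    # wait returns the registered address
    address = client.wait_for_dispatcher_registration(0, timeout=10)
    client.wait_for_dispatcher_worker_registration(0, timeout=10)

    # unblock any wait_for_shutdown() callers and end the session
    client.shutdown()
finally:
    service.shutdown()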