
Commit

Merge pull request #74 from SKA-ScienceDataProcessor/cluster_test
Cluster_tests
timcornwell committed Nov 12, 2019
2 parents 1bbb8ef + 014641e commit 2df0ddf
Showing 16 changed files with 119 additions and 36 deletions.
13 changes: 13 additions & 0 deletions cluster_tests/Makefile
@@ -0,0 +1,13 @@

.PHONY: clean
clean:
	make -C dask_test clean
	make -C test_image clean
	make -C ritoy clean

.PHONY: test
test:
	make -C dask_test test
	make -C test_image test
	make -C ritoy test

15 changes: 15 additions & 0 deletions cluster_tests/README.md
@@ -0,0 +1,15 @@

These tests check that Dask can be run in various contexts. They are intended to be run during an initial port and
whenever the cluster configuration has changed. The tests should be runnable from the command line and via SLURM
scripts (see the launch-pattern sketch after this README).

- cluster_dask_test is python + Dask
- cluster_image_test is python + Dask + ARL
- ritoy is python + Dask
- ritoy_numba is python + Dask + numba

Known exceptions

- ritoy_numba fails on P3

Tim Cornwell 12/11/2019
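
A minimal sketch of the launch pattern the README describes, assuming only that dask.distributed is installed; the tcp://... address is a placeholder for whatever the SLURM script prints as the scheduler location:

    # Sketch only: accept an optional scheduler address (SLURM case) or
    # start a local Dask cluster (command-line case), as the tests below do.
    import sys
    from dask.distributed import Client

    if len(sys.argv) > 1:
        client = Client(sys.argv[1])   # e.g. "tcp://10.143.1.25:8786" from the SLURM script
    else:
        client = Client()              # local workers for an interactive run

    print(client)
    client.close()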
15 changes: 15 additions & 0 deletions cluster_tests/dask_test/Makefile
@@ -0,0 +1,15 @@

PYFILE = cluster_dask_test.py

.PHONY: clean
clean:
	\rm -rf *.log slurm*.out dask-worker-space hostfile.*

.PHONY: test
test: ${PYFILE}
	python ${PYFILE}





14 changes: 7 additions & 7 deletions cluster_tests/dask_test/cluster_dask_test.py
@@ -1,5 +1,6 @@
import random
import time
import sys
import os

from dask.distributed import Client
@@ -21,14 +22,13 @@ def add(x, y):
return x + y

print("Starting cluster_dask_test")
scheduler = os.getenv('ARL_DASK_SCHEDULER', None)
if scheduler is not None:
print("Creating Dask Client using externally defined scheduler")
# We pass in the scheduler from the invoking script
if len(sys.argv) > 1:
scheduler = sys.argv[1]
client = Client(scheduler)
else:
print("Using Dask on this computer")
client = Client(threads_per_worker=1, n_workers=8)

client = Client()

import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
@@ -52,7 +52,7 @@ def add(x, y):

result = dask.compute(L, sync=True)
assert result[0][0] == 65536
print("Finished cluster_dask_test correctly")
print("Successfully finished cluster_dask_test")

client.close()
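
For orientation, the construction of the graph that cluster_dask_test builds from the delayed inc, dec and add functions is collapsed in the diff above; a generic sketch in the same style (not the test's actual graph, whose result is asserted to be 65536) might look like:

    # Generic dask.delayed sketch; names mirror the test but the graph is illustrative.
    import dask
    from dask.distributed import Client

    def inc(x):
        return x + 1

    def dec(x):
        return x - 1

    def add(x, y):
        return x + y

    client = Client()                       # local cluster, as in the fallback branch
    inc, dec, add = map(dask.delayed, (inc, dec, add))

    total = add(inc(1), dec(4))             # builds a lazy task graph, runs nothing yet
    (result,) = dask.compute(total)         # executes on the cluster
    assert result == 5

    client.close()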

6 changes: 2 additions & 4 deletions cluster_tests/dask_test/cluster_dask_test_csd3.slurm
@@ -75,11 +75,9 @@ for host in `cat hostfile.$JOBID`; do
done
echo "Scheduler and workers now running"

#! We need to tell dask Client (inside python) where the scheduler is running
export ARL_DASK_SCHEDULER=${scheduler}:8786
echo "Scheduler is running at ${scheduler}"
echo "Scheduler is running at ${scheduler}:8786"

CMD="python cluster_dask_test.py | tee cluster_dask_test.log"
CMD="python cluster_dask_test.py ${scheduler}:8786 | tee cluster_dask_test.log"

eval $CMD

3 changes: 1 addition & 2 deletions cluster_tests/dask_test/cluster_dask_test_p3.slurm
@@ -72,10 +72,9 @@ done
echo "Scheduler and workers now running"

#! We need to tell dask Client (inside python) where the scheduler is running
export ARL_DASK_SCHEDULER=${scheduler}:8786
echo "Scheduler is running at ${scheduler}"

CMD="python cluster_dask_test.py | tee cluster_dask_test.log"
CMD="python cluster_dask_test.py ${scheduler}:8786 | tee cluster_dask_test.log"

eval $CMD
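
Both SLURM scripts now pass ${scheduler}:8786 on the command line rather than exporting ARL_DASK_SCHEDULER. A hedged way to sanity-check such an address from a login node before submitting the test, assuming dask.distributed is available and substituting the real scheduler host for the placeholder:

    # Hypothetical connectivity check; replace the address with the one the
    # job prints ("Scheduler is running at ...:8786").
    from dask.distributed import Client

    client = Client("tcp://10.143.1.25:8786", timeout=10)   # placeholder address
    info = client.scheduler_info()
    print("Connected to", info["address"], "with", len(info["workers"]), "workers")
    client.close()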

15 changes: 15 additions & 0 deletions cluster_tests/ritoy-numba/Makefile
@@ -0,0 +1,15 @@

PYFILE = cluster_test_ritoy_numba.py

.PHONY: clean
clean:
	\rm -rf *.log slurm*.out dask-worker-space hostfile.*

.PHONY: test
test: ${PYFILE}
	python ${PYFILE}





File renamed without changes.
@@ -76,7 +76,7 @@ for host in `cat hostfile.$JOBID`; do
done
echo "Scheduler and workers now running"

CMD="python ./ritoy_numba.py ${scheduler}:8786 > ritoy_numba_${JOBID}.out"
CMD="python ./cluster_test_ritoy_numba.py ${scheduler}:8786 | tee cluster_test_ritoy_numba.log"
echo "About to execute $CMD"

eval $CMD
15 changes: 15 additions & 0 deletions cluster_tests/ritoy/Makefile
@@ -0,0 +1,15 @@

PYFILE = cluster_test_ritoy.py

.PHONY: clean
clean:
	\rm -rf *.log slurm*.out dask-worker-space hostfile.*

.PHONY: test
test: ${PYFILE}
	python ${PYFILE}





File renamed without changes.
2 changes: 1 addition & 1 deletion cluster_tests/ritoy/submit_ritoy_P3.slurm
@@ -76,7 +76,7 @@ for host in `cat hostfile.$JOBID`; do
done
echo "Scheduler and workers now running"

CMD="python ./ritoy.py ${scheduler}:8786 > ritoy_${JOBID}.out"
CMD="python ./cluster_test_ritoy.py ${scheduler}:8786 | tee ritoy.log"
echo "About to execute $CMD"

eval $CMD
15 changes: 15 additions & 0 deletions cluster_tests/test_image/Makefile
@@ -0,0 +1,15 @@

PYFILE = cluster_test_image.py

.PHONY: clean
clean:
	\rm -rf *.log slurm*.out dask-worker-space hostfile.*

.PHONY: test
test: ${PYFILE}
	python ${PYFILE}





35 changes: 16 additions & 19 deletions cluster_tests/test_image/cluster_test_image.py
@@ -8,6 +8,7 @@
import numpy
from astropy import units as u
from astropy.coordinates import SkyCoord
from distributed import Client

from data_models.polarisation import PolarisationFrame
from processing_components.imaging.base import create_image_from_visibility
@@ -17,23 +18,21 @@
from processing_components.visibility.coalesce import convert_blockvisibility_to_visibility
from workflows.arlexecute.image.image_arlexecute import image_arlexecute_map_workflow
from wrappers.arlexecute.execution_support.arlexecute import arlexecute
from wrappers.arlexecute.execution_support.dask_init import get_dask_Client
from wrappers.arlexecute.image.operations import export_image_to_fits

log = logging.getLogger()
log.setLevel(logging.DEBUG)
#log.addHandler(logging.StreamHandler(sys.stdout))
#log.addHandler(logging.StreamHandler(sys.stderr))
import asyncio

logging.getLogger('asyncio').setLevel(logging.WARNING)

if __name__ == '__main__':
client = get_dask_Client(threads_per_worker=1,
processes=True,
memory_limit=32 * 1024 * 1024 * 1024,
n_workers=8)

print("Starting cluster_test_image")
# We pass in the scheduler from the invoking script
if len(sys.argv) > 1:
scheduler = sys.argv[1]
client = Client(scheduler)
else:
client = Client()
arlexecute.set_client(client=client)

from data_models.parameters import arl_path
@@ -43,7 +42,6 @@
frequency = numpy.linspace(1e8, 1.5e8, 3)
channel_bandwidth = numpy.array([2.5e7, 2.5e7, 2.5e7])
flux = numpy.array([[100.0], [100.0], [100.0]])
phasecentre = SkyCoord(ra=+15.0 * u.deg, dec=-35.0 * u.deg, frame='icrs', equinox='J2000')
config = create_named_configuration('LOWBD2-CORE')
times = numpy.linspace(-300.0, 300.0, 3) * numpy.pi / 43200.0
nants = config.xyz.shape[0]
@@ -55,19 +53,18 @@
phasecentre = SkyCoord(ra=+15 * u.deg, dec=-45.0 * u.deg, frame='icrs', equinox='J2000')

bvis_graph = arlexecute.execute(create_blockvisibility)(config, times, frequency,
channel_bandwidth=channel_bandwidth,
phasecentre=phasecentre, weight=1.0,
polarisation_frame=PolarisationFrame('stokesI'))
channel_bandwidth=channel_bandwidth,
phasecentre=phasecentre, weight=1.0,
polarisation_frame=PolarisationFrame('stokesI'))
vis_graph = arlexecute.execute(convert_blockvisibility_to_visibility)(bvis_graph)

model_graph = arlexecute.execute(create_image_from_visibility)(vis_graph, npixel=4096, cellsize=0.001, override_cellsize=False)
model_graph = arlexecute.execute(create_image_from_visibility)(vis_graph, npixel=4096, cellsize=0.001,
override_cellsize=False)
beam = image_arlexecute_map_workflow(model_graph, create_pb, facets=16, pointingcentre=phasecentre,
telescope='MID')
beam = arlexecute.compute(beam, sync=True)
from time import sleep

sleep(10)
exit()

assert numpy.max(beam.data) > 0.0
export_image_to_fits(beam, "cluster_test_image.fits")

print("Successfully finished test_image")
exit(0)
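
cluster_test_image now asserts that the computed beam has positive values, exports it to cluster_test_image.fits and exits with status 0. A small follow-up check of that file, assuming the run completed, that the image lands in the primary HDU, and that astropy is installed (it already is, since the test imports it):

    # Sketch: inspect the exported beam image after a successful run.
    import numpy
    from astropy.io import fits

    with fits.open("cluster_test_image.fits") as hdul:
        beam = hdul[0].data
        assert numpy.max(beam) > 0.0, "beam image should contain positive values"
        print("Beam shape", beam.shape, "max", numpy.max(beam))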
4 changes: 2 additions & 2 deletions cluster_tests/test_image/cluster_test_image_csd3.slurm
@@ -17,8 +17,8 @@
#SBATCH --nodes=4
#! How many (MPI) tasks will there be in total? (<= nodes*16)
#SBATCH --ntasks=16
#! Memory limit: P3 has roughly 107GB per node
#SBATCH --mem 100000
#! Memory limit
#SBATCH --mem 10000
#! How much wallclock time will be required?
#SBATCH --time=01:00:00
#! What types of email messages do you wish to receive?
1 change: 1 addition & 0 deletions wrappers/arlexecute/execution_support/arlexecutebase.py
@@ -81,6 +81,7 @@ def set_client(self, client=None, use_dask=True, use_dlg=False, verbose=False, o
raise ValueError('use_dask and use_dlg cannot be specified together')

if isinstance(self._client, Client):
print("Removing existing client")
self.client.close()

if use_dask:
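
The arlexecutebase change only announces when an existing Dask client is being replaced. A rough illustration of the behaviour it documents, assuming arlexecute.set_client accepts a dask.distributed Client as shown in cluster_test_image above:

    # Hypothetical illustration: the second set_client call should print
    # "Removing existing client" and close the first Client.
    from distributed import Client
    from wrappers.arlexecute.execution_support.arlexecute import arlexecute

    first = Client()
    arlexecute.set_client(client=first)

    second = Client()
    arlexecute.set_client(client=second)   # first is closed here
    second.close()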
