Skip to content

Commit

Permalink
Resolved merge conflicts with master.
Browse files Browse the repository at this point in the history
  • Loading branch information
markkness committed Mar 7, 2014
2 parents 2dc610b + 19e43f3 commit 67ad79e
Show file tree
Hide file tree
Showing 42 changed files with 1,458 additions and 142 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Expand Up @@ -6,11 +6,13 @@ before_install:
- sudo apt-get update
- sudo apt-get install libopenmpi-dev
- sudo apt-get install openmpi-bin
- sudo apt-get install libhdf5-openmpi-dev
- pip install numpy
- pip install pyzmq --install-option="--zmq=bundled"
- pip install ipython
- pip install mpi4py --allow-all-external --allow-unverified mpi4py
- pip install six
- pip install cython
- CC=mpicc pip install git+https://github.com/h5py/h5py.git --install-option="--mpi"
- pip install sphinx
- pip install sphinxcontrib-napoleon
- pip install coverage
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -15,11 +15,11 @@ setup_cluster:
.PHONY: setup_cluster

test_client:
${PYTHON} -m unittest discover
${PYTHON} -m unittest discover -v
.PHONY: test_client

test_client_with_coverage:
${COVERAGE} run -pm unittest discover
${COVERAGE} run -pm unittest discover -v
.PHONY: test_client_with_coverage

test_engines:
Expand Down
34 changes: 26 additions & 8 deletions distarray/client.py
Expand Up @@ -13,16 +13,14 @@
# Imports
#----------------------------------------------------------------------------

import uuid
from itertools import product

import numpy as np
from six import next
from distarray.externals.six import next

from itertools import product

from IPython.parallel import Client
from distarray.utils import has_exactly_one

__all__ = ['DistArray', 'Context']
__all__ = ['DistArray']


#----------------------------------------------------------------------------
Expand Down Expand Up @@ -138,7 +136,7 @@ def __setitem__(self, index, value):

@property
def shape(self):
return self._get_attribute('shape')
return self._get_attribute('global_shape')

@property
def size(self):
Expand Down Expand Up @@ -226,12 +224,32 @@ def std(self, axis=None, dtype=None, out=None):
result = self.context._pull0(result_key)
return result

def get_localarrays(self):
def get_ndarrays(self):
"""Pull the local ndarrays from the engines.
Returns
-------
list of ndarrays
one ndarray per process
"""
key = self.context._generate_key()
self.context._execute('%s = %s.get_localarray()' % (key, self.key))
result = self.context._pull(key)
return result

def get_localarrays(self):
"""Pull the LocalArray objects from the engines.
Returns
-------
list of localarrays
one localarray per process
"""
result = self.context._pull(self.key)
return result

def get_localshapes(self):
key = self.context._generate_key()
self.context._execute('%s = %s.local_shape' % (key, self.key))
Expand Down
140 changes: 125 additions & 15 deletions distarray/context.py
Expand Up @@ -4,6 +4,8 @@
__docformat__ = "restructuredtext en"

import uuid
from distarray.externals import six
import collections

from IPython.parallel import Client
import numpy
Expand Down Expand Up @@ -145,33 +147,54 @@ def empty(self, shape, dtype=float, dist={0:'b'}, grid_shape=None):
)
return DistArray(da_key, self)

def save(self, filename, da):
def save(self, name, da):
"""
Save a distributed array to files in the ``.dnpy`` format.
Parameters
----------
filename : str
Prefix for filename used by each engine. Each engine will save a
file named ``<filename>_<comm_rank>.dnpy``.
name : str or list of str
If a str, this is used as the prefix for the filename used by each
engine. Each engine will save a file named
``<name>_<rank>.dnpy``.
If a list of str, each engine will use the name at the index
corresponding to its rank. An exception is raised if the length of
this list is not the same as the communicator's size.
da : DistArray
Array to save to files.
"""
subs = self._key_and_push(filename) + (da.key,)
self._execute(
'distarray.local.save(%s, %s)' % subs
)
if isinstance(name, six.string_types):
subs = self._key_and_push(name) + (da.key, da.key)
self._execute(
'distarray.local.save(%s + "_" + str(%s.comm_rank) + ".dnpy", %s)' % subs
)
elif isinstance(name, collections.Iterable):
if len(name) != len(self.targets):
errmsg = "`name` must be the same length as `self.targets`."
raise TypeError(errmsg)
subs = self._key_and_push(name) + (da.key, da.key)
self._execute(
'distarray.local.save(%s[%s.comm_rank], %s)' % subs
)
else:
errmsg = "`name` must be a string or a list."
raise TypeError(errmsg)


def load(self, filename):
def load(self, name):
"""
Load a distributed array from ``.dnpy`` files.
Parameters
----------
filename : str
Prefix used for the file saved by each engine. Each engine will
load a file named ``<filename>_<comm_rank>.dnpy``.
name : str or list of str
If a str, this is used as the prefix for the filename used by each
engine. Each engine will load a file named
``<name>_<rank>.dnpy``.
If a list of str, each engine will use the name at the index
corresponding to its rank. An exception is raised if the length of
this list is not the same as the communicator's size.
Returns
-------
Expand All @@ -180,11 +203,98 @@ def load(self, filename):
"""
da_key = self._generate_key()
subs = (da_key, filename, self._comm_key)
subs = (da_key, name, self._comm_key)

if isinstance(name, six.string_types):
subs = (da_key,) + self._key_and_push(name) + (self._comm_key,
self._comm_key)
self._execute(
'%s = distarray.local.load(%s + "_" + str(%s.Get_rank()) + ".dnpy", %s)' % subs
)
elif isinstance(name, collections.Iterable):
if len(name) != len(self.targets):
errmsg = "`name` must be the same length as `self.targets`."
raise TypeError(errmsg)
subs = (da_key,) + self._key_and_push(name) + (self._comm_key,
self._comm_key)
self._execute(
'%s = distarray.local.load(%s[%s.Get_rank()], %s)' % subs
)
else:
errmsg = "`name` must be a string or a list."
raise TypeError(errmsg)

return DistArray(da_key, self)

def save_hdf5(self, filename, da, key='buffer', mode='a'):
"""
Save a DistArray to a dataset in an ``.hdf5`` file.
Parameters
----------
filename : str
Name of file to write to.
da : DistArray
Array to save to a file.
key : str, optional
The identifier for the group to save the DistArray to (the default
is 'buffer').
mode : optional, {'w', 'w-', 'a'}, default 'a'
``'w'``
Create file, truncate if exists
``'w-'``
Create file, fail if exists
``'a'``
Read/write if exists, create otherwise (default)
"""
try:
# this is just an early check,
# h5py isn't necessary until the local call on the engines
import h5py
except ImportError:
errmsg = "An MPI-enabled h5py must be available to use save_hdf5."
raise ImportError(errmsg)

subs = (self._key_and_push(filename) + (da.key,) +
self._key_and_push(key, mode))
self._execute(
'%s = distarray.local.load("%s", comm=%s)' % subs
'distarray.local.save_hdf5(%s, %s, %s, %s)' % subs
)
return DistArray(da_key, self)

def load_hdf5(self, filename, key='buffer', dist={0: 'b'},
grid_shape=None):
"""
Load a DistArray from a dataset in an ``.hdf5`` file.
Parameters
----------
filename : str
Filename to load.
key : str, optional
The identifier for the group to load the DistArray from (the
default is 'buffer').
dist : dict of int->str, optional
Distribution of loaded DistArray.
grid_shape : tuple of int, optional
Shape of process grid.
Returns
-------
result : DistArray
A DistArray encapsulating the file loaded.
"""
try:
import h5py
except ImportError:
errmsg = "An MPI-enabled h5py must be available to use load_hdf5."
raise ImportError(errmsg)

with h5py.File(filename, "r") as fp:
da = self.fromndarray(fp[key], dist=dist, grid_shape=grid_shape)

return da

def fromndarray(self, arr, dist={0: 'b'}, grid_shape=None):
"""Convert an ndarray to a distarray."""
Expand Down
10 changes: 10 additions & 0 deletions distarray/externals/__init__.py
@@ -0,0 +1,10 @@
# encoding: utf-8

__docformat__ = "restructuredtext en"

#----------------------------------------------------------------------------
# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc.
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#----------------------------------------------------------------------------

0 comments on commit 67ad79e

Please sign in to comment.