Skip to content

Commit

Permalink
better parallelization for matter power spectrum
Browse files Browse the repository at this point in the history
  • Loading branch information
apetri committed May 11, 2016
1 parent 1aa82bf commit b35f8bb
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
27 changes: 23 additions & 4 deletions lenstools/scripts/nbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def powerSpectrumExecution():
################Snapshot power spectrum#########################
################################################################

def matterPowerSpectrum(pool,batch,settings,id,**kwargs):
def matterPowerSpectrum(pool,batch,settings,node_id,**kwargs):

assert "fmt" in kwargs.keys()
fmt = kwargs["fmt"]
Expand All @@ -55,7 +55,7 @@ def matterPowerSpectrum(pool,batch,settings,id,**kwargs):
assert isinstance(settings,PowerSpectrumSettings)

#Split the id into the model,collection and realization parts
cosmo_id,geometry_id = id.split("|")
cosmo_id,geometry_id = node_id.split("|")

#Get a handle on the simulation model
model = batch.getModel(cosmo_id)
Expand Down Expand Up @@ -88,6 +88,14 @@ def matterPowerSpectrum(pool,batch,settings,id,**kwargs):
#Construct the array of bin edges
k_egdes = np.linspace(settings.kmin,settings.kmax,settings.num_k_bins+1).to(model.Mpc_over_h**-1)

#Placeholder for the density MPI communications
density_placeholder = np.empty((settings.fft_grid_size,)*3,dtype=np.float32)
if pool is not None:
pool.openWindow(density_placeholder)

if pool.is_master():
logdriver.debug("Opened density window of type {0}".format(pool._window_type))

#Cycle over snapshots
for n in range(settings.first_snapshot,settings.last_snapshot+1):

Expand Down Expand Up @@ -115,7 +123,7 @@ def matterPowerSpectrum(pool,batch,settings,id,**kwargs):
sys.exit(1)

snap = fmt.open(realization.snapshotPath(n,sub=None),pool=pool)
k,power_ensemble[r],hits = snap.powerSpectrum(k_egdes,resolution=settings.fft_grid_size,return_num_modes=True)
k,power_ensemble[r],hits = snap.powerSpectrum(k_egdes,resolution=settings.fft_grid_size,return_num_modes=True,density_placeholder=density_placeholder)
snap.close()

#Safety barrier sync
Expand Down Expand Up @@ -145,8 +153,19 @@ def matterPowerSpectrum(pool,batch,settings,id,**kwargs):
if pool is not None:
pool.comm.Barrier()

###########
#Completed#
###########

#Close the RMA window
if pool is not None:
pool.comm.Barrier()
pool.closeWindow()

if pool.is_master():
logdriver.debug("Closed density window of type {0}".format(pool._window_type))


#Completed
if pool is None or pool.is_master():
logdriver.info("DONE!!")

Expand Down
28 changes: 21 additions & 7 deletions lenstools/simulations/nbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def setVelocities(self,velocities):
self.velocities = velocities


def massDensity(self,resolution=0.5*Mpc,smooth=None,left_corner=None,save=False):
def massDensity(self,resolution=0.5*Mpc,smooth=None,left_corner=None,save=False,density_placeholder=None):

"""
Uses a C backend gridding function to compute the matter mass density fluctutation for the current snapshot: the density is evaluated using a nearest neighbor search
Expand All @@ -413,6 +413,9 @@ def massDensity(self,resolution=0.5*Mpc,smooth=None,left_corner=None,save=False)
:param save: if True saves the density histogram and resolution as instance attributes
:type save: bool.
:param density placeholder: if not None, it is used as a fixed memory chunk for MPI communications of the density
:type density_placeholder: array
:returns: tuple(numpy 3D array with the (unsmoothed) matter density fluctuation on a grid,bin resolution along the axes)
"""
Expand Down Expand Up @@ -473,10 +476,18 @@ def massDensity(self,resolution=0.5*Mpc,smooth=None,left_corner=None,save=False)

#Accumulate from the other processors
if self.pool is not None:

if density_placeholder is not None:

density_placeholder[:] = density
self.pool.comm.Barrier()
self.pool.accumulate()

else:

self.pool.openWindow(density)
self.pool.accumulate()
self.pool.closeWindow()
self.pool.openWindow(density)
self.pool.accumulate()
self.pool.closeWindow()

#Recompute resolution to make sure it represents the bin size correctly
bin_resolution = ((xi[1:]-xi[:-1]).mean() * positions.unit,(yi[1:]-yi[:-1]).mean() * positions.unit,(zi[1:]-zi[:-1]).mean() * positions.unit)
Expand Down Expand Up @@ -1234,7 +1245,7 @@ def lensMaxSize(self):
#############################################################################################################################################


def powerSpectrum(self,k_edges,resolution=None,return_num_modes=False):
def powerSpectrum(self,k_edges,resolution=None,return_num_modes=False,density_placeholder=None):

"""
Computes the power spectrum of the relative density fluctuations in the snapshot at the wavenumbers specified by k_edges; a discrete particle number density is computed before hand to prepare the FFT grid
Expand All @@ -1248,6 +1259,9 @@ def powerSpectrum(self,k_edges,resolution=None,return_num_modes=False):
:param return_num_modes: if True returns the mode counting for each k bin as the last element in the return tuple
:type return_num_modes: bool.
:param density placeholder: if not None, it is used as a fixed memory chunk for MPI communications in the density calculations
:type density_placeholder: array
:returns: tuple(k_values(bin centers),power spectrum at the specified k_values)
"""
Expand All @@ -1264,7 +1278,7 @@ def powerSpectrum(self,k_edges,resolution=None,return_num_modes=False):

#Compute the gridded number density
if not hasattr(self,"density"):
density,bin_resolution = self.massDensity(resolution=resolution)
density,bin_resolution = self.massDensity(resolution=resolution,density_placeholder=density_placeholder)
else:
assert resolution is None,"The spatial resolution is already specified in the attributes of this instance! Call massDensity() to modify!"
density,bin_resolution = self.density,self.resolution
Expand All @@ -1280,7 +1294,7 @@ def powerSpectrum(self,k_edges,resolution=None,return_num_modes=False):

#Sanity check on maximum k: maximum is limited by the grid resolution
if k_edges.max() > k_max:
print("Your grid resolution is too low to compute accurately the power on {0} (maximum recommended {1}, distortions might start to appear already at {2}): results might be inaccurate".format(k_edges.max(),k_max,k_max_recommended))
logstderr.warning("Your grid resolution is too low to compute accurately the power on {0} (maximum recommended {1}, distortions might start to appear already at {2}): results might be inaccurate".format(k_edges.max(),k_max,k_max_recommended))

#Perform the FFT
density_ft = fftengine.rfftn(density)
Expand Down
4 changes: 2 additions & 2 deletions scripts/lenstools.execute-mpi
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,5 @@ batch = SimulationBatch(environment_settings)
settings = settings_handler.read(cmd_args.config_file)

#Cycle over ids to produce the planes
for batch_id in cmd_args.id:
script_to_execute(pool=pool,batch=batch,settings=settings,id=batch_id,**kwargs)
for node_id in cmd_args.id:
script_to_execute(pool=pool,batch=batch,settings=settings,node_id=node_id,**kwargs)
4 changes: 2 additions & 2 deletions scripts/lenstools.matter_power_spectrum
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ batch = SimulationBatch(environment_settings)
settings = settings_handler.read(cmd_args.config_file)

#Cycle over ids to produce the planes
for batch_id in cmd_args.id:
script_to_execute(pool=pool,batch=batch,settings=settings,id=batch_id,**kwargs)
for node_id in cmd_args.id:
script_to_execute(pool=pool,batch=batch,settings=settings,node_id=node_id,**kwargs)

0 comments on commit b35f8bb

Please sign in to comment.