Skip to content

Commit

Permalink
* One possible bug remains that I have been unable to track down but …
Browse files Browse the repository at this point in the history
…have concluded is unrelated to the modification in this PR.

Steps to reproduce:
srun -n 4 -c 2 --gpu-bind=map_gpu:3,2,1,0 rrdesi_mpi --gpu --max-gpuprocs 4 -n_nearest 4 --archetypes new-archetypes/ -i $CFS/desi/spectro/redux/fuji/tiles/cumulative/100/20210505/coadd-0-100-thru20210505.fits     -o $SCRATCH/abhijeet.fits
srun -n 64 -c 2 rrdesi_mpi -n_nearest 4 --archetypes new-archetypes/ -i $CFS/desi/spectro/redux/fuji/tiles/cumulative/100/20210505/coadd-0-100-thru20210505.fits     -o $SCRATCH/abhijeet2.fits
These two results will be np.allclose() but not equal which is as expected.  The CPU version will be equal to Abhijeet's original code, also as expected.

Now rerun with -n_nearest 5 and the CPU version is still equal to Abhijeet's original code, but there are a small ~10 number of zzchi2 values and zzcoeff values that are different more than np.allclose (as much as 3.0 difference) between GPU and CPU.  Going back to Abhijeet's original nearest_neighbor_model code, and running on the CPU for that method with GPU get_best_archetype and there is still the same small differences, despite the fact that the output is np.allclose when not doing -n_nearest (or even for doing -n_nearest 4).

It does look like the differences seem to be most? all? in QSO templates.  But the trans array shows no difference - as far as I've been able to tell, we are sending the same input tdata to calc_zchi2_one on the CPU and getting a slightly different chi2.  But only for -n_nearest == 5.

However since this is independent of the code changes in the current PR (I get the exact same differences in this version versus rolling back to the previous one), it makes sense to proceed with this PR.

----------------------

- Added legendre function to Target class so that legendre of a certain degree
can be calculated (and optionally copied to GPU) once without additional
overhead.

- Added default value of 15 for fitz()

- In fitz() use Target.legendre to calculate legendre.  Pass target object instead of spectra to get_best_archetype, which allows for simplification as spectra, gpuweights, gpuflux, etc are all members of Target class.  Store trans dict and pass that to get_best_archetype to eliminate need to re-calculate transmission for the same wavelength regime.

- In archetypes, added properties gpuwave and gpuflux to copy and cache data on the GPU.  These are then used in rebin_template_batch.

- In get_best_archetype, pass target instead of spectra, which allows for elimination of dedges and legendre as args.  Get spectra, gpuweights, gpuflux, gpuwflux, dedges, and legendre directly from the target object.  Copying trans and using Target.legendre reduce runtime by about 1s on 4 GPUs.

- Vectorized and GPUized nearest_neighbor_model.  Instead of just passing trans to it, get_best_archetype now passes the binned dict, which already has rebinned flux multiplied by trans.  Using the Target.legendre also saves time.  Since the size of the tdata arrays is small (nbasis of a few), it is faster to keep operations on CPU for calc_zchi2_batch similar to in fitz().

Timing notes: adding nearest_neighbor_model is a bigger hit to the GPU than CPU but since get_best_archetypes is so much faster on the GPU, the combined time is still a good speed-up.
  • Loading branch information
craigwarner-ufastro committed Sep 26, 2023
1 parent 0397d18 commit 6ece174
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 66 deletions.
155 changes: 100 additions & 55 deletions py/redrock/archetypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,28 @@ def __init__(self, filename):

h.close()

#It is much more efficient to calculate edges once if rebinning
#and copy to GPU and store here rather than doing this every time
self._gpuwave = None
self._gpuflux = None
return

@property
def gpuwave(self):
if (self._gpuwave is None):
#Copy to GPU once
import cupy as cp
self._gpuwave = cp.asarray(self.wave)
return self._gpuwave

@property
def gpuflux(self):
if (self._gpuflux is None):
#Copy to GPU once
import cupy as cp
self._gpuflux = cp.asarray(self.flux)
return self._gpuflux

def rebin_template(self,index,z,dwave,trapz=True):
"""
"""
Expand All @@ -65,16 +86,21 @@ def rebin_template(self,index,z,dwave,trapz=True):
else:
return {hs:self._archetype['INTERP'][index](wave/(1.+z)) for hs, wave in dwave.items()}

def rebin_template_batch(self,z,dwave,trapz=True,dedges=None,use_gpu=False):
def rebin_template_batch(self,z,dwave,trapz=True,dedges=None,indx=None,use_gpu=False):
"""
"""
if (use_gpu):
import cupy as cp
xmin = self.minwave
xmax = self.maxwave
self.flux = cp.asarray(self.flux)
self.wave = cp.asarray(self.wave)
wave = self.gpuwave
flux = self.gpuflux
else:
wave = self.wave
flux = self.flux

if (indx is not None):
flux = flux[indx]
minedge = None
maxedge = None
result = dict()
Expand All @@ -84,16 +110,16 @@ def rebin_template_batch(self,z,dwave,trapz=True,dedges=None,use_gpu=False):
#Check if edges is a 1-d array or a tuple also containing scalar min/max values
if type(edges) is tuple:
(edges, minedge, maxedge) = edges
result[hs] = trapz_rebin(self.wave, self.flux, edges=edges, use_gpu=use_gpu, myz=cp.array([z]), xmin=xmin, xmax=xmax, edge_min=minedge, edge_max=maxedge)[0,:,:]
result[hs] = trapz_rebin(wave, flux, edges=edges, use_gpu=use_gpu, myz=cp.array([z]), xmin=xmin, xmax=xmax, edge_min=minedge, edge_max=maxedge)[0,:,:]
return result
if trapz:
#Use batch mode of trapz_rebin
return {hs:trapz_rebin((1.+z)*self.wave, self.flux, wave, use_gpu=use_gpu) for hs, wave in dwave.items()}
return {hs:trapz_rebin((1.+z)*wave, flux, w, use_gpu=use_gpu) for hs, w in dwave.items()}
else:
for hs, wave in dwave.items():
result[hs] = np.empty((len(wave), self._narch))
for hs, w in dwave.items():
result[hs] = np.empty((len(w), self._narch))
for i in range(self._narch):
result[hs][:,i] = self._archetype['INTERP'][i](wave/(1.+z))
result[hs][:,i] = self._archetype['INTERP'][i](w/(1.+z))
if (use_gpu):
result[hs] = cp.asarray(result[hs])
#return {hs:self._archetype['INTERP'](wave/(1.+z)) for hs, wave in dwave.items()}
Expand All @@ -117,23 +143,26 @@ def eval(self, subtype, dwave, coeff, wave, z):

return flux

def nearest_neighbour_model(self, spectra,weights,flux,wflux,dwave,z, legendre, n_nearest, zzchi2, trans, per_camera):
def nearest_neighbour_model(self, target,weights,flux,wflux,dwave,z, n_nearest, zzchi2, trans, per_camera, dedges=None, binned=None, use_gpu=False):

"""
Parameters:
------------------
spectra (list): list of Spectrum objects.
target (Target): the target object that contains spectra
weights (array): concatenated spectral weights (ivar).
flux (array): concatenated flux values.
wflux (array): concatenated weighted flux values.
dwave (dict): dictionary of wavelength grids
z (float): best redshift
legendre (dict): legendre polynomial
n_nearest (int): number of nearest neighbours to be used in chi2 space (including best archetype)
zchi2 (array); chi2 array for all archetypes
trans (dict); dictionary of transmission Ly-a arrays
per_camera (bool): If True model will be solved in each camera
dedges (dict): in GPU mode, use pre-computed dict of wavelength bin edges, already on GPU
binned (dict): already computed dictionary of rebinned fluxes
use_gpu (bool): use GPU or not
Returns:
-----------------
Expand All @@ -144,39 +173,48 @@ def nearest_neighbour_model(self, spectra,weights,flux,wflux,dwave,z, legendre,
"""

#trans and dedges only need to be passed if binned is not as binned already
#is multiplied by trans in get_best_archetype
spectra = target.spectra
nleg = target.nleg
legendre = target.legendre(nleg=nleg, use_gpu=False) #Get previously calculated legendre

nleg = legendre[list(legendre.keys())[0]].shape[0]
iBest = np.argsort(zzchi2)[0:n_nearest]
binned_best = self.rebin_template(iBest[0], z, dwave,trapz=True)
for hs, w in dwave.items():
if (trans[hs] is not None):
binned_best[hs] *= trans[hs]
#binned_best = { hs:trans[hs]*binned_best[hs] for hs, w in dwave.items() }
tdata = {hs: binned_best[hs][:,None] for hs, wave in dwave.items()}
if len(iBest)>1:
for ib in iBest[1:]:
#print ("IB", ib)
binned = self.rebin_template(ib, z, dwave,trapz=True)
for hs, w in dwave.items():
if (trans[hs] is not None):
binned[hs] *= trans[hs]
#binned = { hs:trans[hs]*binned[hs] for hs, w in dwave.items() }
tdata = { hs:np.append(tdata[hs], binned[hs][:,None], axis=1) for hs, wave in dwave.items()}
if nleg>0:
tdata = { hs:np.append(tdata[hs],legendre[hs].transpose(), axis=1 ) for hs, wave in dwave.items() }

if per_camera:
zzchi2, zzcoeff= per_camera_coeff_with_least_square(spectra, tdata, nleg, method='bvls', n_nbh=n_nearest)
tdata = dict()
if (binned is not None):
if (use_gpu):
binned = { hs:binned[hs][:,iBest].get() for hs in binned }
else:
zzchi2, zzcoeff= calc_zchi2_one(spectra, weights, flux, wflux, tdata)
binned = { hs:binned[hs][:,iBest] for hs in binned }
else:
binned = self.rebin_template_batch(z, dwave, trapz=True, dedges=dedges, indx=iBest, use_gpu=use_gpu)
for hs, w in dwave.items():
if (use_gpu):
binned[hs] = binned[hs].get()
if (trans[hs] is not None):
#Only multiply if trans[hs] is not None
#Both arrays are on CPU so no need to wrap with asarray
binned[hs] *= trans[hs][:,None]
for hs, w in dwave.items():
tdata[hs] = binned[hs][None,:,:]
if (nleg > 0):
tdata[hs] = np.append(tdata[hs], legendre[hs].transpose()[None,:,:], axis=2)
nbasis = tdata[hs].shape[2]
if per_camera:
#Batch placeholder that right now loops over each arch but will be GPU accelerated
(zzchi2, zzcoeff) = per_camera_coeff_with_least_square_batch(spectra, tdata, nleg, method='bvls', n_nbh=1, use_gpu=use_gpu)
else:
#Use CPU mode for calc_zchi2 since small tdata
(zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, 1, nbasis, use_gpu=False)

sstype = ['%s'%(self._subtype[k]) for k in iBest] # subtypes of best archetypes
fsstype = '_'.join(sstype)
#print(sstype)
#print(z, zzchi2, zzcoeff, fsstype)
return zzchi2, zzcoeff, self._rrtype+':::%s'%(fsstype)
sstype = ['%s'%(self._subtype[k]) for k in iBest] # subtypes of best archetypes
fsstype = '_'.join(sstype)
#print(sstype)
#print(z, zzchi2, zzcoeff, fsstype)
return zzchi2[0], zzcoeff[0], self._rrtype+':::%s'%(fsstype)


def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_camera, n_nearest, dedges=None, trans=None, use_gpu=False):
def get_best_archetype(self,target,weights,flux,wflux,dwave,z, per_camera, n_nearest, trans=None, use_gpu=False):

"""Get the best archetype for the given redshift and spectype.
Expand All @@ -187,10 +225,8 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam
wflux (array): concatenated weighted flux values.
dwave (dict): dictionary of wavelength grids
z (float): best redshift
legendre (dict): legendre polynomial
per_camera (bool): True if fitting needs to be done in each camera
n_nearest (int): number of nearest neighbours to be used in chi2 space (including best archetype)
dedges (dict): in GPU mode, use pre-computed dict of wavelength bin edges, already on GPU
trans (dict): pass previously calcualated Lyman transmission instead of recalculating
use_gpu (bool): use GPU or not
Expand All @@ -200,6 +236,24 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam
fulltype (str): fulltype of best archetype
"""
spectra = target.spectra
nleg = target.nleg
legendre = target.legendre(nleg=nleg, use_gpu=use_gpu) #Get previously calculated legendre

#Select np or cp for operations as arrtype
if (use_gpu):
import cupy as cp
arrtype = cp
#Get CuPy arrays of weights, flux, wflux
#These are created on the first call of gpu_spectral_data() for a
#target and stored. They are retrieved on subsequent calls.
(gpuweights, gpuflux, gpuwflux) = target.gpu_spectral_data()
# Build dictionaries of wavelength bin edges, min/max, and centers
dedges = { s.wavehash:(s.gpuedges, s.minedge, s.maxedge) for s in spectra }
else:
arrtype = np
dedges = None

if per_camera:
ncam=3 # b, r, z cameras
else:
Expand All @@ -226,12 +280,6 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam
if (trans[hs] is not None):
trans[hs] = trans[hs][0,:]

#Select np or cp for operations as arrtype
if (use_gpu):
import cupy as cp
arrtype = cp
else:
arrtype = np
#Rebin in batch
binned = self.rebin_template_batch(z, dwave, trapz=True, dedges=dedges, use_gpu=use_gpu)

Expand All @@ -251,16 +299,13 @@ def get_best_archetype(self,spectra,weights,flux,wflux,dwave,z,legendre, per_cam
#Batch placeholder that right now loops over each arch but will be GPU accelerated
(zzchi2, zzcoeff) = per_camera_coeff_with_least_square_batch(spectra, tdata, nleg, self._narch, method='bvls', n_nbh=1, use_gpu=use_gpu)
else:
(zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, self._narch, nbasis, use_gpu=use_gpu)

if n_nearest is not None:
if (use_gpu):
##PLACEHOLDER - copy flux and wave back to CPU so that nearest neighbor will work until that algorithm is GPU-ized
self.flux = self.flux.get()
self.wave = self.wave.get()
best_chi2, best_coeff, best_fulltype = self.nearest_neighbour_model(spectra,weights.get(),flux.get(),wflux.get(),dwave,z, legendre, n_nearest, zzchi2, trans, per_camera)
(zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, gpuweights, gpuflux, gpuwflux, self._narch, nbasis, use_gpu=use_gpu)
else:
best_chi2, best_coeff, best_fulltype = self.nearest_neighbour_model(spectra,weights,flux,wflux,dwave,z, legendre, n_nearest, zzchi2, trans, per_camera)
(zzchi2, zzcoeff) = calc_zchi2_batch(spectra, tdata, weights, flux, wflux, self._narch, nbasis, use_gpu=use_gpu)

if n_nearest is not None:
best_chi2, best_coeff, best_fulltype = self.nearest_neighbour_model(target,weights,flux,wflux,dwave,z, n_nearest, zzchi2, trans, per_camera, dedges=dedges, binned=binned, use_gpu=use_gpu)
return best_chi2, best_coeff, best_fulltype
else:
iBest = np.argmin(zzchi2)
Expand Down
18 changes: 7 additions & 11 deletions py/redrock/fitz.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def legendre_calculate(nleg, dwave):

return legendre

def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=False, deg_legendre=None, nz=None, per_camera=False, n_nearest=None):
def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=False, deg_legendre=None, nz=15, per_camera=False, n_nearest=None):
"""Refines redshift measurement around up to nminima minima.
TODO:
Expand All @@ -131,7 +131,7 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=
nminima (int): the number of minima to consider.
use_gpu (bool): use GPU or not
deg_legendre (int): in archetype mode polynomials upto deg_legendre-1 will be used
nz (int): number of finer redshift pixels to search for final redshift
nz (int): number of finer redshift pixels to search for final redshift - default 15
per_camera: (bool): True if fitting needs to be done in each camera for archetype mode
n_nearest: (int): number of nearest neighbours to be used in chi2 space (including best archetype)
Expand Down Expand Up @@ -162,12 +162,11 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=
gpudwave = { s.wavehash:s.gpuwave for s in spectra }

if not archetype is None:
legendre = legendre_calculate(deg_legendre, dwave=dwave)
#legendre = legendre_calculate(deg_legendre, dwave=dwave)
legendre = target.legendre(deg_legendre)

results = list()
#Define nz here instead of hard-coding length 15 and then defining nz as
#length of zz list
nz = 15
#Moved default nz to arg list

for imin in find_minima(zchi2):
if len(results) == nminima:
Expand Down Expand Up @@ -290,11 +289,8 @@ def fitz(zchi2, redshifts, target, template, nminima=3, archetype=None, use_gpu=
chi2=chi2min, zz=zz, zzchi2=zzchi2,
coeff=coeff))
else:
if (use_gpu):
chi2min, coeff, fulltype = archetype.get_best_archetype(spectra,gpuweights,gpuflux,gpuwflux,dwave,zbest,legendre, per_camera, n_nearest, dedges=gpuedges, trans=trans, use_gpu=use_gpu)
del trans
else:
chi2min, coeff, fulltype = archetype.get_best_archetype(spectra,weights,flux,wflux,dwave,zbest,legendre, per_camera, n_nearest, use_gpu=use_gpu)
chi2min, coeff, fulltype = archetype.get_best_archetype(target,weights,flux,wflux,dwave,zbest, per_camera, n_nearest, trans=trans, use_gpu=use_gpu)
del trans

results.append(dict(z=zbest, zerr=zerr, zwarn=zwarn,
chi2=chi2min, zz=zz, zzchi2=zzchi2,
Expand Down
25 changes: 25 additions & 0 deletions py/redrock/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,31 @@ def __init__(self, targetid, spectra, coadd=False, cosmics_nsig=0., meta=None):
self.gpuweights = None
self.gpuflux = None
self.gpuwflux = None
self._legendre = None
self._gpulegendre = None
self.nleg = 0

def legendre(self, nleg, use_gpu=False):
if (use_gpu and self.nleg == nleg):
if (self._gpulegendre is not None):
return self._gpulegendre
elif (self._legendre is not None):
import cupy as cp
self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre }
return self._gpulegendre
if (self._legendre is not None and self.nleg == nleg):
return self._legendre
dwave = { s.wavehash:s.wave for s in self.spectra }
wave = np.concatenate([ w for w in dwave.values() ])
wmin = wave.min()
wmax = wave.max()
self._legendre = { hs:np.array([scipy.special.legendre(i)( (w-wmin)/(wmax-wmin)*2.) for i in range(nleg)]) for hs, w in dwave.items() }
self.nleg = nleg
if (use_gpu):
import cupy as cp
self._gpulegendre = { hs:cp.asarray(self._legendre[hs]) for hs in self._legendre }
return self._gpulegendre
return self._legendre

def gpu_spectral_data(self):
if self.gpuweights is None:
Expand Down

0 comments on commit 6ece174

Please sign in to comment.