-
Notifications
You must be signed in to change notification settings - Fork 27
/
simulation.py
3355 lines (2280 loc) · 110 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import division,with_statement
from abc import ABCMeta,abstractproperty,abstractmethod
import sys,os
import re
import tarfile
import json
import itertools
if sys.version_info.major>=3:
from io import StringIO
else:
from StringIO import StringIO
import numpy as np
import astropy.units as u
from astropy.cosmology import z_at_value
from .. import configuration
from ..utils.configuration import LensToolsCosmology
from .remote import SystemHandler,LocalGit
from .settings import *
from .deploy import JobHandler
from ..simulations.camb import CAMBTransferFromPower
from ..simulations import Gadget2SnapshotDE
from ..simulations.raytracing import PotentialPlane
#####################################################
############Parse cosmology from string##############
#####################################################
def string2cosmo(s, name2attr):
    """
    Parse a cosmological model out of a cosmo_id string of the form "Om0.26_Ol0.74_h0.72"

    :param s: string to parse, parameters separated by "_", each parameter being a letter code immediately followed by its numerical value
    :type s: str.
    :param name2attr: translation table between parameter letter codes and LensToolsCosmology attribute names
    :type name2attr: dict.
    :returns: (cosmological model, list of parsed parameter codes), or None if the string cannot be parsed
    :rtype: tuple. or None
    """

    #One letter-code/value pair per token
    token_matcher = re.compile(r"([a-zA-Z]+)([0-9.-]+)")

    attributes = dict()
    parsed_names = list()
    nu_masses = list()

    for token in s.split("_"):

        #Each token must split cleanly into a code and a value
        matched = token_matcher.match(token)
        if matched is None:
            return None
        par, val = matched.groups()
        parsed_names.append(par)

        try:
            if par == "h":
                #The Hubble parameter needs particular attention
                attributes["H0"] = 100.0 * float(val)
            elif par.startswith("mv"):
                #Neutrino masses are accumulated and set at the end
                nu_masses.append(float(val))
            else:
                attributes[name2attr[par]] = float(val)
        except (ValueError, KeyError):
            #Unknown code or non-numeric value: the string is not a cosmo_id
            return None

    #Fill in neutrino parameters
    if nu_masses:
        attributes["m_nu"] = np.array(nu_masses) * u.eV

    try:
        model = LensToolsCosmology(**attributes)
    except TypeError:
        return None

    return model, parsed_names
###################################################################
############CAMB --> NGenIC power spectrum conversion##############
###################################################################
def _camb2ngenic(k,P):
lgk = np.log(k) / np.log(10)
lgP = np.log((k**3)*P/(2.0*(np.pi**2))) / np.log(10)
return lgk,lgP
##############################################
##############InfoDict class##################
##############################################
class InfoDict(object):

    """
    Context-manager wrapper around the summary-info dictionary of a SimulationBatch;
    mutate self.dictionary, then call update() to persist it to the batch JSON tree file.
    """

    def __init__(self, batch, **kwargs):

        #Accept either a ready SimulationBatch or an EnvironmentSettings to build one from
        if isinstance(batch, SimulationBatch):
            self.batch = batch
        elif isinstance(batch, EnvironmentSettings):
            self.batch = SimulationBatch(batch, **kwargs)
        else:
            raise TypeError("batch type not recognized!")

        self.dictionary = self.batch.info

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        #Nothing to release: update() must be called explicitly
        pass

    def update(self):
        #Serialize the dictionary to the batch JSON tree file in the home directory
        target = os.path.join(self.batch.home_subdir, self.batch.environment.json_tree_file)
        with self.batch.syshandler.open(target, "w") as fp:
            fp.write(json.dumps(self.dictionary))
#############################################################
##############Convenient resource retrieval##################
#############################################################
def _retrieve(self,s):
#Break down search string into single components
search = "mcrpMCS"
nsteps = len(filter(lambda c:c in search,s))
if not nsteps:
return None
s_pieces = re.match(r"([{0}][0-9]+)".format(search)*nsteps,s).groups()
#Walk to the pieces and return the corresponding resource
current = self
for p in s_pieces:
#Split into resoure and index number
resource,index = re.match(r"([{0}])([0-9]+)".format(search),p).groups()
#model
if resource=="m":
current = current.models[int(index)]
#collection
if resource=="c":
current = current.collections[int(index)]
#realization
if resource=="r":
current = current.realizations[int(index)]
#plane set
if resource=="p":
current = current.planesets[int(index)]
#map set
if resource=="M":
current = current.mapsets[int(index)]
#catalog
if resource=="C":
current = current.catalogs[int(index)]
#sub-catalog
if resource=="S":
current = current.subcatalogs[int(index)]
#Return resource to user
return current
#####################################################
##############SimulationBatch class##################
#####################################################
class SimulationBatch(object):

    """
    Class handler of a batch of weak lensing simulations that share the same environment settings
    """

    #Keep track of which batches are already loaded in memory (give the class a singleton--like aspect)
    #Maps home directory path -> SimulationBatch instance; consulted by __new__ and __init__
    _in_memory = dict()
@classmethod
def current(cls, name="environment.ini", syshandler=configuration.syshandler, indicize=False):

    """
    This method looks in the current directory and looks for a configuration file named "environment.ini"; if it finds one, it returns a SimulationBatch instance that corresponds to the one pointed to by "environment.ini" (default)

    :param name: name of the INI file with the environment settings, defaults to 'environment.ini'
    :type name: str.

    :param syshandler: system handler that allows to override the methods used to create directories and do I/O from files, must implement the abstract type SystemHandler
    :type syshandler: SystemHandler

    :returns: Simulation batch pointed to by "environment.ini", or None
    :rtype: SimulationBatch
    """

    #No settings file in the current directory: nothing to point to
    if not os.path.exists(name):
        return None

    return cls(EnvironmentSettings.read(name), syshandler, indicize)
@property
def home_subdir(self):
    #Home (metadata) root directory of the batch
    return self.environment.home

@property
def storage_subdir(self):
    #Storage (simulation products) root directory of the batch
    return self.environment.storage

@property
def home(self):
    #Alias of home_subdir
    return self.home_subdir

@property
def storage(self):
    #Alias of storage_subdir
    return self.storage_subdir

############
#Tree edges#
############

@property
def edges(self):
    #Children of the batch in the simulation directory tree: its models
    return self.models
#############################################
def __init__(self, environment, syshandler=configuration.syshandler, indicize=False):

    """
    Gets the handler instance of a batch of simulations residing in the provided environment

    :param environment: environment settings
    :type environment: EnvironmentSettings

    :param syshandler: system handler that allows to override the methods used to create directories and do I/O from files, must implement the abstract type SystemHandler
    :type syshandler: SystemHandler
    """

    #Type check
    assert isinstance(environment, EnvironmentSettings)
    assert isinstance(syshandler, SystemHandler)

    #If __new__ handed back an already-cached instance, skip re-initialization entirely
    if environment.home in self.__class__._in_memory.keys():
        return

    self.environment = environment
    self.syshandler = syshandler

    #Create directories if they do not exist yet
    if not self.syshandler.isbatch(environment.home):

        self.syshandler.init(environment.home)
        print("[+] {0} created on {1}".format(environment.home, self.syshandler.name))

        #Create also an "environment.ini" file that provides easy access to the current simulation batch from Home
        with self.syshandler.open(os.path.join(environment.home, "environment.ini"), "w") as envfile:
            envfile.write("[EnvironmentSettings]\n\n")
            envfile.write("home = {0}\n".format(os.path.abspath(environment.home)))
            envfile.write("storage = {0}\n\n".format(os.path.abspath(environment.storage)))

    if not self.syshandler.exists(environment.storage):
        self.syshandler.mkdir(environment.storage)
        print("[+] {0} created on {1}".format(environment.storage, self.syshandler.name))

    #Indicize the simulation products (rebuild the JSON tree file)
    if indicize:
        self.update_changes()

    #Keep track of this simulation batch (singleton cache consulted by __new__)
    self.__class__._in_memory[self.home_subdir] = self
def __new__(cls, environment, *args, **kwargs):
    #Singleton-like behavior: reuse the cached instance for this home directory when present
    try:
        return cls._in_memory[environment.home]
    except KeyError:
        return object.__new__(cls)
def update_changes(self):
    #Re-index the batch: rebuild the summary info and persist it to the JSON tree file
    with InfoDict(self) as info:
        info.update()

##############################################################################################################################

#Convenient resource retrieval
def __getitem__(self, s):
    #Dictionary-style access with a search string such as "m0c1r2"; delegates to the module-level _retrieve
    return _retrieve(self, s)
##############################################################################################################################
def commit(self, message):

    """
    If the Simulation Batch is put under version control in a git repository, this method commits the newly added models,collections,realizations or map/plane sets

    :param message: commit message
    :type message: str.
    """

    #Guard clause: version control is only available through a LocalGit handler
    if not isinstance(self.syshandler, LocalGit):
        raise TypeError("The system handler must be an instance of LocalGit to use this method!")

    self.syshandler.repository.index.commit(message)
##############################################################################################################################
@property
def available(self):

    """
    Lists all currently available models in the home and storage directories

    :returns: list.
    :rtype: SimulationModel
    """

    #Candidate directory names: the JSON index if present, otherwise a glob of the home directory
    if self.syshandler.exists(self.infofile):
        dirnames = self.info.keys()
    else:
        dirnames = [os.path.basename(n) for n in self.syshandler.glob(os.path.join(self.environment.home, "*"))]

    #Keep only the directories that parse into a valid model
    candidates = (self.getModel(d) for d in dirnames)
    return [model for model in candidates if model is not None]
#Alias for available models
@property
def models(self):
    #Alias of the available property
    return self.available

##############################################################################################################################

@property
def infofile(self):
    #Full path of the JSON file that indexes the content of the batch
    return os.path.join(self.home_subdir, self.environment.json_tree_file)
@property
def info(self):

    """
    Returns summary info of the simulation batch corresponding to the current environment

    :returns: info in dictionary format
    """

    #See if the info is already available (cached from a previous access)
    if hasattr(self, "_info"):
        return self._info

    #Load info from tree file if available; this result IS cached in self._info
    tree_file_path = os.path.join(self.home_subdir, self.environment.json_tree_file)
    if self.syshandler.exists(tree_file_path):
        with self.syshandler.open(tree_file_path, "r") as fp:
            self._info = json.loads(fp.read())
        return self._info

    #Otherwise rebuild the dictionary by walking the batch directory tree
    #NOTE(review): the freshly computed dictionary is NOT stored in self._info, so it is recomputed on every access
    info_dict = dict()

    #Start with the available models
    available_models = self.available
    for model in available_models:
        info_dict[model.cosmo_id] = dict()

        #Follow with the collections
        for collection in model.collections:
            info_dict[model.cosmo_id][collection.geometry_id] = dict()
            info_dict[model.cosmo_id][collection.geometry_id]["nbody"] = dict()
            info_dict[model.cosmo_id][collection.geometry_id]["map_sets"] = dict()
            info_dict[model.cosmo_id][collection.geometry_id]["catalogs"] = dict()

            #Check if there are any map sets or catalogs present; a missing "sets.txt" is expected and simply skipped
            try:
                with self.syshandler.open(os.path.join(collection.home_subdir, "sets.txt"), "r") as setsfile:
                    for line in setsfile.readlines():
                        if line == "":
                            continue
                        map_set = collection.getMapSet(line.strip("\n"))
                        info_dict[model.cosmo_id][collection.geometry_id]["map_sets"][map_set.settings.directory_name] = dict()
                        info_dict[model.cosmo_id][collection.geometry_id]["map_sets"][map_set.settings.directory_name]["settings"] = map_set.settings.to_dict()
            except IOError:
                pass

            #Same for catalogs, indexed by "catalogs.txt"
            try:
                with self.syshandler.open(os.path.join(collection.home_subdir, "catalogs.txt"), "r") as setsfile:
                    for line in setsfile.readlines():
                        if line == "":
                            continue
                        catalog = collection.getCatalog(line.strip("\n"))
                        info_dict[model.cosmo_id][collection.geometry_id]["catalogs"][catalog.settings.directory_name] = dict()
                        info_dict[model.cosmo_id][collection.geometry_id]["catalogs"][catalog.settings.directory_name]["settings"] = catalog.settings.to_dict()
            except IOError:
                pass

            #Follow with the realizations
            for r in collection.realizations:
                info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index] = dict()

                #Settings may be missing on a realization; tolerate that
                try:
                    info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index]["ngenic_settings"] = r.ngenic_settings.to_dict()
                    info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index]["gadget_settings"] = r.gadget_settings.to_dict()
                except AttributeError:
                    pass

                info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index]["plane_sets"] = dict()

                #Check if there are any plane sets present
                try:
                    with self.syshandler.open(os.path.join(r.home_subdir, "sets.txt"), "r") as setsfile:
                        for line in setsfile.readlines():
                            if line == "":
                                continue
                            plane_set = r.getPlaneSet(line.strip("\n"))
                            info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index]["plane_sets"][plane_set.settings.directory_name] = dict()
                            info_dict[model.cosmo_id][collection.geometry_id]["nbody"][r.ic_index]["plane_sets"][plane_set.settings.directory_name]["settings"] = plane_set.settings.to_dict()
                except IOError:
                    pass

    #Return to user
    return info_dict
##############################################################################################################################################
def list(self, resource=None, which=None, chunk_size=10, **kwargs):

    """
    Lists the available resources in the simulation batch (collections,mapsets,etc...)

    :param resource: custom function to call on each batch.models element, must return a string. If None the list of Storage model directories is returned
    :type resource: None or callable

    :param which: extremes of the model numbers to get (if None all models are processed); if callable, filter(which,self.models) gives the models to archive
    :type which: tuple. or callable

    :param chunk_size: size of output chunk
    :type chunk_size: int.

    :param kwargs: the keyword arguments are passed to resource
    :type kwargs: dict.

    :returns: requested resources, joined in newline-separated chunks of chunk_size
    :rtype: list.
    """

    #Available models
    if which is None:
        models = self.models
    elif isinstance(which, tuple):
        #Fix: list.__getslice__ was removed in Python 3; use a plain slice object instead
        models = self.models[slice(*which)]
    else:
        #Fix: filter() is lazy on Python 3; materialize it so it can be iterated as a list
        models = list(filter(which, self.models))

    #Build newline-joined chunks of at most chunk_size resources each
    chunks = list()
    local_chunk = list()

    for model in models:

        #Extract the resource (custom callable or the storage directory by default)
        if resource is not None:
            local_chunk.append(resource(model, **kwargs))
        else:
            local_chunk.append(model.storage_subdir)

        #If we reached the chunk size dump and reset
        if len(local_chunk) == chunk_size:
            chunks.append("\n".join(local_chunk))
            local_chunk = list()

    #Flush the last, possibly partial, chunk
    if len(local_chunk):
        chunks.append("\n".join(local_chunk))

    #Return to user
    return chunks
#################
####Compress#####
#################
@staticmethod
def _archive(name, chunk, mode):
    #Compress every file listed in chunk (newline-separated paths) into a single tar archive named name
    members = chunk.split("\n")
    print("[+] Compressing {0} into {1}".format("-".join(members), name))
    with tarfile.open(name, mode) as tar:
        for member in members:
            tar.add(member)
##############################################################################################################################################
def archive(self, name, pool=None, chunk_size=1, **kwargs):

    """
    Archives a batch available resource to a tar gzipped archive; the resource file/directory names are retrieved with the list method. The archives will be written to the simulation batch storage directory

    :param name: name of the archive: a single string (a chunk number is inserted before ".tar" for each chunk) or a list with one archive name per chunk
    :type name: str. or list.

    :param pool: MPI Pool used to parallelize compression
    :type pool: MPIPool

    :param chunk_size: number of resources to bundle into each archive
    :type chunk_size: int.

    :param kwargs: the keyword arguments are passed to the list method
    :type kwargs: dict.
    """

    #Retrieve resource chunks
    resource_chunks = self.list(chunk_size=chunk_size, **kwargs)

    #Get archive names and tar writing mode
    if isinstance(name, str):

        #Fix: a ".gz" extension means the archive must be gzip-compressed ("w:gz");
        #the original condition was inverted and wrote uncompressed tars into .gz files,
        #which _unpack (hardcoded "r:gz") could not read back
        if name.split(".")[-1] == "gz":
            mode = "w:gz"
        else:
            mode = "w"

        archive_names = list()
        for n, chunk in enumerate(resource_chunks):
            #Build archive name: insert the 1-based chunk number before ".tar"
            archive_name = name.replace(".tar", "{0}.tar".format(n + 1))
            archive_names.append(os.path.join(self.environment.storage, archive_name))

    elif isinstance(name, list):
        mode = "w:gz"
        archive_names = [os.path.join(self.environment.storage, n) for n in name]

    else:
        raise TypeError("name should be a string or list!")

    #Safety assert
    assert len(archive_names) == len(resource_chunks), "You should provide an archive file name for each resouce chunk!"

    #Call the _archive method to make the compression
    if pool is None:
        for n, chunk in enumerate(resource_chunks):
            self.__class__._archive(archive_names[n], chunk, mode)
    else:
        #One MPI task per chunk: each rank compresses exactly its own chunk
        assert len(resource_chunks) == pool.size + 1, "There should be one MPI task (you have {0}) for each chunk (you have {1})!".format(pool.size + 1, len(resource_chunks))
        self.__class__._archive(archive_names[pool.rank], resource_chunks[pool.rank], mode)
#################
####Unpack#######
#################
@staticmethod
def _unpack(name, path):
    #Extract a gzipped tar archive into the given path
    print("[+] Unpacking {0} into {1}...".format(name, path))
    archive = tarfile.open(name, "r:gz")
    try:
        archive.extractall(path=path)
    finally:
        archive.close()
def unpack(self, where, which=None, pool=None):

    """
    Unpacks the compressed simulation batch products into the storage directory: the resources of each model must be contained in a file called <cosmo_id>.tar.gz

    :param where: path of the compressed resources
    :type where: str.

    :param which: extremes of the model numbers to unpack (if None all models are unpacked)
    :type which: tuple.

    :param pool: MPI Pool used to parallelize de-compression
    :type pool: MPIPool
    """

    #Get the models to unpack
    if which is None:
        models = self.available
    else:
        #Fix: list.__getslice__ was removed in Python 3; use a plain slice object instead
        models = self.available[slice(*which)]

    #Unpack each model (spread computations over MPIPool if provided)
    if pool is not None:
        assert len(models) == pool.size + 1, "The number of MPI processes must be equal to the number of models to unpack!"

    if pool is None:
        for model in models:
            archive_path = os.path.join(where, "{0}.tar.gz".format(model.cosmo_id))
            self._unpack(archive_path, self.environment.storage)
    else:
        #Each MPI rank unpacks exactly one model
        archive_path = os.path.join(where, "{0}.tar.gz".format(models[pool.rank].cosmo_id))
        self._unpack(archive_path, self.environment.storage)
##############################################################################################################################################
####################
####Duplicate#######
####################
def copyTree(self, path, syshandler=configuration.syshandler):

    """
    Copies the current batch directory tree into a separate path

    :param path: path into which to copy the current batch directory tree
    :type path: str.

    :param syshandler: system handler (can be a remote)
    :type syshandler: SystemHandler

    :returns: handle on the copied batch
    :rtype: SimulationBatch
    """

    #Instantiate new SimulationBatch object (home and storage will be the same)
    environment = EnvironmentSettings(home=path, storage=path)
    #Carry over the naming/indexing conventions of the source environment
    for key in ["cosmo_id_digits", "name2attr", "json_tree_file"]:
        setattr(environment, key, getattr(self.environment, key))
    batchCopy = SimulationBatch(environment, syshandler)

    #Walk down the directory tree and create the copied directories on the go (only if non existent already)

    #Model
    for model in self.available:

        modelCopy = batchCopy.getModel(model.cosmo_id)
        if modelCopy is None:
            modelCopy = batchCopy.newModel(model.cosmology, model.parameters)

        #Collection
        for coll in model.collections:

            collCopy = modelCopy.getCollection(box_size=coll.box_size, nside=coll.nside)
            if collCopy is None:
                collCopy = modelCopy.newCollection(box_size=coll.box_size, nside=coll.nside)

            #Maps
            for map_set in coll.mapsets:
                map_setCopy = collCopy.getMapSet(map_set.name)
                if map_setCopy is None:
                    map_setCopy = collCopy.newMapSet(map_set.settings)

            #Catalogs
            for catalog in coll.catalogs:
                catalogCopy = collCopy.getCatalog(catalog.name)
                if catalogCopy is None:
                    catalogCopy = collCopy.newCatalog(catalog.settings)

            #Realizations
            for r in coll.realizations:

                rCopy = collCopy.getRealization(r.ic_index)
                if rCopy is None:
                    rCopy = collCopy.newRealization(seed=r.seed)

                #Planes
                for plane_set in r.planesets:
                    plane_setCopy = rCopy.getPlaneSet(plane_set.name)
                    if plane_setCopy is None:
                        plane_setCopy = rCopy.newPlaneSet(plane_set.settings)

    #Return handle on the copied directory tree
    return batchCopy
##############################################################################################################################################
def newModel(self, cosmology, parameters):

    """
    Create a new simulation model, given a set of cosmological parameters

    :param cosmology: cosmological model to simulate
    :type cosmology: LensToolsCosmology

    :param parameters: cosmological parameters to keep track of
    :type parameters: list.

    :rtype: SimulationModel
    """

    #cosmology needs to be of type LensToolsCosmology
    assert isinstance(cosmology, LensToolsCosmology)

    newModel = SimulationModel(cosmology=cosmology, environment=self.environment, parameters=parameters, syshandler=self.syshandler)

    #Create the model home and storage directories, unless they already exist
    for d in [newModel.home_subdir, newModel.storage_subdir]:
        if not self.syshandler.exists(d):

            self.syshandler.mkdir(d)
            print("[+] {0} created on {1}".format(d, self.syshandler.name))

            #Update dictionary if the batch is indicized
            if self.syshandler.exists(self.infofile):
                self.info[newModel.cosmo_id] = dict()

        else:
            print("[-] Model {0} already exists!".format(newModel.cosmo_id))

    #Return to user
    return newModel
###########################################################################################################################################
def getModel(self, cosmo_id):

    """
    Instantiate a SimulationModel object corresponding to the cosmo_id provided

    :param cosmo_id: cosmo_id of the model
    :type cosmo_id: str.

    :rtype: SimulationModel
    """

    #The model must be present both in home and in storage
    home_path = os.path.join(self.environment.home, cosmo_id)
    storage_path = os.path.join(self.environment.storage, cosmo_id)
    if not (self.syshandler.exists(home_path) and self.syshandler.exists(storage_path)):
        return None

    #Parse the cosmological model from the directory name
    cosmo_parsed = string2cosmo(cosmo_id, self.environment.name2attr)
    if cosmo_parsed is None:
        return None

    #Return the SimulationModel instance
    cosmology, parameters = cosmo_parsed
    return SimulationModel(cosmology=cosmology, environment=self.environment, parameters=parameters, syshandler=self.syshandler)
###########################################################################################################################################
##########################################Job submission scripts###########################################################################
###########################################################################################################################################
def writeCAMBSubmission(self, realization_list, job_settings, job_handler, config_file="camb.param", chunks=1, **kwargs):

    """
    Writes CAMB submission script

    :param realization_list: list of ics to generate in the form "cosmo_id|geometry_id"
    :type realization_list: list. of str.

    :param chunks: number of independent jobs in which to split the submission (one script per job will be written)
    :type chunks: int.

    :param job_settings: settings for the job (resources, etc...)
    :type job_settings: JobSettings

    :param job_handler: handler of the cluster specific features (job scheduler, architecture, etc...)
    :type job_handler: JobHandler

    :param config_file: name of the CAMB configuration file
    :type config_file: str.

    :param kwargs: you can set one_script=True to include all the executables sequentially in a single script
    :type kwargs: dict.
    """

    #Type safety check
    assert isinstance(job_settings, JobSettings)
    assert isinstance(job_handler, JobHandler)
    assert len(realization_list) % chunks == 0, "Perfect load balancing enforced, each job will process the same number of realizations!"

    #Check if we need to collapse everyting in one script
    if "one_script" in kwargs.keys():
        one_script = kwargs["one_script"]
    else:
        one_script = False

    #It's better to run CAMB from the directory where the executable resides
    job_handler.cluster_specs.execution_preamble = "cd {0}".format(os.path.dirname(job_settings.path_to_executable))

    #This limit is enforced
    assert job_settings.cores_per_simulation * chunks == len(realization_list), "cores_per_simulation x chunks should be equal to the total number of models!"
    job_settings.num_cores = job_settings.cores_per_simulation

    #Create the dedicated Job and Logs directories if not existent already
    for d in [os.path.join(self.environment.home, "Jobs"), os.path.join(self.environment.home, "Logs")]:
        if not (self.syshandler.exists(d)):
            self.syshandler.mkdir(d)
            print("[+] {0} created on {1}".format(d, self.syshandler.name))

    #Split realizations between independent jobs
    realizations_per_chunk = len(realization_list) // chunks

    for c in range(chunks):

        #Arguments for the executable: one CAMB parameter file per realization in this chunk
        exec_args = list()

        for realization in realization_list[realizations_per_chunk * c:realizations_per_chunk * (c + 1)]:

            #Separate the cosmo_id,geometry_id,realization number
            cosmo_id, geometry_id = realization.split("|")

            #Get the corresponding SimulationXXX instances
            model = self.getModel(cosmo_id)

            #geometry_id is "<nside>b<box_size>"
            nside, box_size = geometry_id.split("b")
            nside = int(nside)
            box_size = float(box_size) * model.Mpc_over_h
            collection = model.getCollection(box_size=box_size, nside=nside)

            parameter_file = os.path.join(collection.home_subdir, config_file)
            if not (self.syshandler.exists(parameter_file)):
                raise IOError("CAMB parameter file at {0} does not exist yet!".format(parameter_file))

            exec_args.append(parameter_file)

        executable = job_settings.path_to_executable + " " + " ".join(exec_args)

        #Write the script
        script_filename = os.path.join(self.environment.home, "Jobs", job_settings.job_script_file)

        #One script per chunk: suffix the file name with the 1-based chunk number
        if not one_script:
            script_filename_split = script_filename.split(".")
            script_filename_split[-2] += "{0}".format(c + 1)
            script_filename = ".".join(script_filename_split)

        #Override settings to make stdout and stderr go in the right places
        #NOTE(review): this join is re-applied on every chunk iteration; it is idempotent only because
        #os.path.join discards the left components once the right operand is already absolute -- verify on non-POSIX handlers
        job_settings.redirect_stdout = os.path.join(self.environment.home, "Logs", job_settings.redirect_stdout)
        job_settings.redirect_stderr = os.path.join(self.environment.home, "Logs", job_settings.redirect_stderr)

        #In one_script mode the preamble is written only once (c==0); otherwise every script gets its own
        if (not one_script) or (not c):

            #Inform user where logs will be directed
            print("[+] Stdout will be directed to {0}".format(job_settings.redirect_stdout))
            print("[+] Stderr will be directed to {0}".format(job_settings.redirect_stderr))

            with self.syshandler.open(script_filename, "w") as scriptfile:
                scriptfile.write(job_handler.writePreamble(job_settings))

        #The execution line is appended for every chunk
        with self.syshandler.open(script_filename, "a") as scriptfile:
            scriptfile.write(job_handler.writeExecution([executable], [job_settings.num_cores], job_settings))

        #Log to user and return
        if (not one_script) or (not c):
            print("[+] {0} written on {1}".format(script_filename, self.syshandler.name))
############################################################################################################################################
def writeNGenICSubmission(self, realization_list, job_settings, job_handler, config_file="ngenic.param", chunks=1, **kwargs):

    """
    Writes NGenIC submission script

    :param realization_list: list of ics to generate in the form "cosmo_id|geometry_id|icN"
    :type realization_list: list. of str.

    :param chunks: number of independent jobs in which to split the submission (one script per job will be written)
    :type chunks: int.

    :param job_settings: settings for the job (resources, etc...)
    :type job_settings: JobSettings

    :param job_handler: handler of the cluster specific features (job scheduler, architecture, etc...)
    :type job_handler: JobHandler

    :param config_file: name of the NGenIC config file
    :type config_file: str.

    :param kwargs: you can set one_script=True to include all the executables sequentially in a single script
    :type kwargs: dict.
    """

    #Type safety check
    assert isinstance(job_settings, JobSettings)
    assert isinstance(job_handler, JobHandler)
    assert len(realization_list) % chunks == 0, "Perfect load balancing enforced, each job will process the same number of realizations!"

    #Check if we need to collapse everyting in one script
    if "one_script" in kwargs.keys():
        one_script = kwargs["one_script"]
    else:
        one_script = False

    #Create the dedicated Job and Logs directories if not existent already
    for d in [os.path.join(self.environment.home, "Jobs"), os.path.join(self.environment.home, "Logs")]:
        if not (self.syshandler.exists(d)):
            self.syshandler.mkdir(d)
            print("[+] {0} created on {1}".format(d, self.syshandler.name))

    #Split realizations between independent jobs
    realizations_per_chunk = len(realization_list) // chunks

    for c in range(chunks):

        #Arguments for executable: one NGenIC parameter file per realization in this chunk
        exec_args = list()

        for realization in realization_list[realizations_per_chunk * c:realizations_per_chunk * (c + 1)]:

            #Separate the cosmo_id,geometry_id,realization number
            cosmo_id, geometry_id, ic_number = realization.split("|")

            #Get the corresponding SimulationXXX instances
            model = self.getModel(cosmo_id)

            #geometry_id is "<nside>b<box_size>", ic_number is "ic<N>"
            nside, box_size = geometry_id.split("b")
            nside = int(nside)
            box_size = float(box_size) * model.Mpc_over_h
            collection = model.getCollection(box_size=box_size, nside=nside)
            r = collection.getRealization(int(ic_number.strip("ic")))

            parameter_file = os.path.join(r.home_subdir, config_file)
            if not (self.syshandler.exists(parameter_file)):
                raise IOError("NGenIC parameter file at {0} does not exist yet!".format(parameter_file))

            exec_args.append(parameter_file)

        #Executable
        executables = [job_settings.path_to_executable + " " + " ".join(exec_args)]

        #Write the script
        script_filename = os.path.join(self.environment.home, "Jobs", job_settings.job_script_file)

        #One script per chunk: suffix the file name with the 1-based chunk number
        if not one_script:
            script_filename_split = script_filename.split(".")
            script_filename_split[-2] += "{0}".format(c + 1)
            script_filename = ".".join(script_filename_split)

        #Override settings to make stdout and stderr go in the right places
        #NOTE(review): re-applied each iteration; idempotent only while the joined path stays absolute -- verify on non-POSIX handlers
        job_settings.redirect_stdout = os.path.join(self.environment.home, "Logs", job_settings.redirect_stdout)
        job_settings.redirect_stderr = os.path.join(self.environment.home, "Logs", job_settings.redirect_stderr)

        #Override settings
        job_settings.num_cores = job_settings.cores_per_simulation

        #In one_script mode the preamble is written only once (c==0); otherwise every script gets its own
        if (not one_script) or (not c):

            #Inform user where logs will be directed
            print("[+] Stdout will be directed to {0}".format(job_settings.redirect_stdout))
            print("[+] Stderr will be directed to {0}".format(job_settings.redirect_stderr))

            with self.syshandler.open(script_filename, "w") as scriptfile:
                scriptfile.write(job_handler.writePreamble(job_settings))

        #The execution line is appended for every chunk
        with self.syshandler.open(script_filename, "a") as scriptfile:
            scriptfile.write(job_handler.writeExecution(executables, [job_settings.num_cores] * len(executables), job_settings))

        #Log to user and return
        if (not one_script) or (not c):
            print("[+] {0} written on {1}".format(script_filename, self.syshandler.name))