-
Notifications
You must be signed in to change notification settings - Fork 218
/
sim.py
1557 lines (1270 loc) · 71.3 KB
/
sim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
'''
Defines the Sim class, Covasim's core class.
'''
#%% Imports
import numpy as np
import pandas as pd
import sciris as sc
from . import utils as cvu
from . import misc as cvm
from . import base as cvb
from . import defaults as cvd
from . import parameters as cvpar
from . import population as cvpop
from . import plotting as cvplt
from . import interventions as cvi
from . import immunity as cvimm
from . import analysis as cva
from .settings import options as cvo
# Almost everything in this file is contained in the Sim class
__all__ = ['Sim', 'diff_sims', 'demo', 'AlreadyRunError']
class Sim(cvb.BaseSim):
'''
The Sim class handles the running of the simulation: the creation of the
population and the dynamics of the epidemic. This class handles the mechanics
of the actual simulation, while BaseSim takes care of housekeeping (saving,
loading, exporting, etc.). Please see the BaseSim class for additional methods.
Args:
pars (dict): parameters to modify from their default values
datafile (str/df): filename of (Excel, CSV) data file to load, or a pandas dataframe of the data
datacols (list): list of column names of the data to load
label (str): the name of the simulation (useful to distinguish in batch runs)
simfile (str): the filename for this simulation, if it's saved
popfile (str): if supplied, load the population from this file
people (varies): if supplied, use these pre-generated people (as a dict, SynthPop, or People object) instead of loading or generating new ones
version (str): if supplied, use default parameters from this version of Covasim instead of the latest
kwargs (dict): additional parameters; passed to ``cv.make_pars()``
**Examples**::
sim = cv.Sim()
sim = cv.Sim(pop_size=10e3, datafile='my_data.xlsx', label='Sim with data')
'''
def __init__(self, pars=None, datafile=None, label=None, simfile=None,
             popfile=None, people=None, version=None, **kwargs):
    '''
    Create the Sim object. This only sets attributes and assembles the
    parameters; the heavy lifting (creating the people, results, etc.)
    happens later, in ``initialize()``. See the class docstring for
    argument descriptions.
    '''

    # Set attributes
    self.label         = label    # The label/name of the simulation
    self.created       = None     # The datetime the sim was created
    self.simfile       = simfile  # The filename of the sim
    self.datafile      = datafile # The name of the data file
    self.popfile       = popfile  # The population file
    self.data          = None     # The actual data
    self.popdict       = people   # The population dictionary
    self.people        = None     # Initialize these here so methods that check their length can see they're empty
    self.t             = None     # The current time in the simulation (during execution); outside of sim.step(), its value corresponds to next timestep to be computed
    self.results       = {}       # For storing results
    self.summary       = None     # For storing a summary of the results
    self.initialized   = False    # Whether or not initialization is complete
    self.complete      = False    # Whether a simulation has completed running
    self.results_ready = False    # Whether or not results are ready
    self._default_ver  = version  # Default version of parameters used
    self._legacy_trans = None     # Whether to use the legacy transmission calculation method (slower; for reproducing earlier results)
    self._orig_pars    = None     # Store original parameters to optionally restore at the end of the simulation

    # Make default parameters (using values from parameters.py)
    default_pars = cvpar.make_pars(version=version) # Start with default pars
    super().__init__(default_pars) # Initialize and set the parameters as attributes

    # Now update everything -- NB, the order matters: metadata, then user
    # parameter overrides, then the data (load_data() reads self['start_day'])
    self.set_metadata(simfile) # Set the simulation date and filename
    self.update_pars(pars, **kwargs) # Update the parameters, if provided
    self.load_data(datafile) # Load the data, if provided

    return
def load_data(self, datafile=None, verbose=None, **kwargs):
    ''' Load the data to calibrate against, if provided '''
    verbose = self['verbose'] if verbose is None else verbose
    self.datafile = datafile # Remember where the data came from
    if datafile is not None:
        # Parse the supplied file/dataframe into sim.data, aligned to the sim's start day
        self.data = cvm.load_data(datafile=datafile, verbose=verbose, start_day=self['start_day'], **kwargs)
    return
def initialize(self, reset=False, init_infections=True, **kwargs):
    '''
    Perform all initializations on the sim.
    This includes validating the parameters, setting the random number seed,
    creating the results structure, initializing the people, validating the
    layer parameters (which requires the people), and initializing the interventions.
    Note: to create a population to save for later use, use ``init_infections=False``.
    This will then create a fresh People object which other sims can finish
    initializing.

    Args:
        reset (bool): whether or not to reset people even if they already exist
        init_infections (bool): whether to initialize infections (default true so sim is ready, but can't reuse people then)
        kwargs (dict): passed to init_people

    Returns:
        This sim object (to allow chaining)
    '''
    # NB. the call order below is deliberate: variants must exist before the
    # results structure is created, and people must exist before the layer
    # parameters can be validated
    self.t = 0 # The current time index
    self.validate_pars() # Ensure parameters have valid values
    self.set_seed() # Reset the random seed before the population is created
    self.init_variants() # Initialize the variants
    self.init_immunity() # Initialize information about immunity (if use_waning=True)
    self.init_results() # After initializing the variant, create the results structure
    self.init_people(reset=reset, init_infections=init_infections, **kwargs) # Create all the people (the heaviest step)
    self.init_interventions() # Initialize the interventions...
    self.init_analyzers() # ...and the analyzers...
    self.validate_layer_pars() # Once the population is initialized, validate the layer parameters again
    self.set_seed() # Reset the random seed again so the random number stream is consistent
    self.initialized = True
    self.complete = False
    self.results_ready = False
    return self
def layer_keys(self):
    '''
    Attempt to retrieve the current layer keys, in the following order: from
    the people object (for an initialized sim), from the popdict (for one in
    the process of being initialized), from the beta_layer parameter (for an
    uninitialized sim), or by assuming a default (if none of the above are
    available).

    Returns:
        list: the layer keys, or an empty list if they cannot be determined
    '''
    try:
        keys = list(self['beta_layer'].keys()) # Get keys from beta_layer since the "most required" layer parameter
    except Exception: # pragma: no cover # Narrowed from a bare except so KeyboardInterrupt/SystemExit aren't swallowed
        keys = []
    return keys
def reset_layer_pars(self, layer_keys=None, force=False):
    '''
    Reset the parameters to match the population.

    Args:
        layer_keys (list): override the default layer keys (use stored keys by default)
        force (bool): reset the parameters even if they already exist
    '''
    if layer_keys is None:
        # No explicit keys supplied: infer them from the people (preferred)
        # or, failing that, from the population dictionary
        have_people = self.people is not None
        if have_people:
            layer_keys = self.people.contacts.keys()
        elif self.popdict is not None:
            layer_keys = self.popdict['layer_keys']
    cvpar.reset_layer_pars(self.pars, layer_keys=layer_keys, force=force)
    return
def validate_layer_pars(self):
    '''
    Handle layer parameters, since they need to be validated after the population
    creation, rather than before.

    Raises:
        sc.KeyNotFoundError: if any per-layer parameter's keys disagree with the
            layer keys, or if the population's contact layers disagree with them
    '''

    # First, try to figure out what the layer keys should be and perform basic type checking
    layer_keys = self.layer_keys()
    layer_pars = cvpar.layer_pars # The names of the parameters that are specified by layer
    for lp in layer_pars:
        val = self[lp]
        if sc.isnumber(val): # It's a scalar instead of a dict, assume it's all contacts
            self[lp] = {k:val for k in layer_keys} # Broadcast the scalar to every layer

    # Handle key mismatches
    for lp in layer_pars:
        lp_keys = set(self.pars[lp].keys())
        if not lp_keys == set(layer_keys):
            errormsg = 'At least one layer parameter is inconsistent with the layer keys; all parameters must have the same keys:'
            errormsg += f'\nsim.layer_keys() = {layer_keys}'
            for lp2 in layer_pars: # Fail on first error, but re-loop to list all of them
                errormsg += f'\n{lp2} = ' + ', '.join(self.pars[lp2].keys())
            raise sc.KeyNotFoundError(errormsg)

    # Handle mismatches with the population
    if self.people is not None:
        pop_keys = set(self.people.contacts.keys())
        if pop_keys != set(layer_keys): # pragma: no cover
            if not len(pop_keys): # Population has no layers at all, e.g. cv.People() was called directly
                errormsg = f'Your population does not have any layer keys, but your simulation does {layer_keys}. If you called cv.People() directly, you probably need cv.make_people() instead.'
                raise sc.KeyNotFoundError(errormsg)
            else:
                errormsg = f'Please update your parameter keys {layer_keys} to match population keys {pop_keys}. You may find sim.reset_layer_pars() helpful.'
                raise sc.KeyNotFoundError(errormsg)

    return
def validate_pars(self, validate_layers=True):
    '''
    Some parameters can take multiple types; this makes them consistent.

    Normalizes population sizes, integer parameters, start/end days, the
    population type, interventions/analyzers/variants (to lists), the variant
    map, legacy-transmission versioning, and the verbosity setting.

    Args:
        validate_layers (bool): whether to validate layer parameters as well via validate_layer_pars() -- usually yes, except during initialization

    Raises:
        ValueError: for non-integer sizes, non-positive day ranges, unknown
            population types, or invalid verbosity settings
    '''

    # Handle population size: scaled_pop = pop_size * pop_scale, so given
    # scaled_pop, recompute whichever of the other two was left at its default
    pop_size   = self.pars.get('pop_size')
    scaled_pop = self.pars.get('scaled_pop')
    pop_scale  = self.pars.get('pop_scale')
    if scaled_pop is not None: # If scaled_pop is supplied, try to use it
        if pop_scale in [None, 1.0]: # Normal case, recalculate population scale
            self['pop_scale'] = scaled_pop/pop_size
        else: # Special case, recalculate number of agents
            self['pop_size'] = int(scaled_pop/pop_scale)

    # Handle types -- these must be integers for indexing/counting to work
    for key in ['pop_size', 'pop_infected']:
        try:
            self[key] = int(self[key])
        except Exception as E:
            errormsg = f'Could not convert {key}={self[key]} of {type(self[key])} to integer'
            raise ValueError(errormsg) from E

    # Handle start day
    start_day = self['start_day'] # Shorten
    if start_day in [None, 0]: # Use default start day
        start_day = '2020-03-01'
    self['start_day'] = sc.date(start_day)

    # Handle end day and n_days -- end_day takes precedence if both are supplied
    end_day = self['end_day']
    n_days = self['n_days']
    if end_day:
        self['end_day'] = sc.date(end_day)
        n_days = sc.daydiff(self['start_day'], self['end_day'])
        if n_days <= 0:
            errormsg = f"Number of days must be >0, but you supplied start={str(self['start_day'])} and end={str(self['end_day'])}, which gives n_days={n_days}"
            raise ValueError(errormsg)
        else:
            self['n_days'] = int(n_days)
    else:
        if n_days:
            self['n_days'] = int(n_days)
            self['end_day'] = self.date(n_days) # Convert from the number of days to the end day
        else:
            errormsg = f'You must supply one of n_days and end_day, not "{n_days}" and "{end_day}"'
            raise ValueError(errormsg)

    # Handle population data
    popdata_choices = ['random', 'hybrid', 'clustered', 'synthpops']
    choice = self['pop_type']
    if choice and choice not in popdata_choices: # pragma: no cover
        choicestr = ', '.join(popdata_choices)
        errormsg = f'Population type "{choice}" not available; choices are: {choicestr}'
        raise ValueError(errormsg)

    # Handle interventions, analyzers, and variants
    for key in ['interventions', 'analyzers', 'variants']: # Ensure all of them are lists
        self[key] = sc.dcp(sc.tolist(self[key], keepnone=False)) # All of these have initialize functions that run into issues if they're reused
    for i,interv in enumerate(self['interventions']):
        if isinstance(interv, dict): # It's a dictionary representation of an intervention
            self['interventions'][i] = cvi.InterventionDict(**interv)
    self['variant_map'] = {int(k):v for k,v in self['variant_map'].items()} # Ensure keys are ints, not strings of ints if loaded from JSON

    # Optionally handle layer parameters
    if validate_layers:
        self.validate_layer_pars()

    # Handle versioning -- only computed once, on the first call
    if self._legacy_trans is None:
        default_ver = self._default_ver if self._default_ver else self.version
        self._legacy_trans = sc.compareversions(default_ver, '<3.1.1') # Handle regression

    # Handle verbose -- 'brief' is an alias for -1 (one-line output)
    if self['verbose'] == 'brief':
        self['verbose'] = -1
    if not sc.isnumber(self['verbose']): # pragma: no cover
        errormsg = f'Verbose argument should be either "brief", -1, or a float, not {type(self["verbose"])} "{self["verbose"]}"'
        raise ValueError(errormsg)

    return
def init_results(self):
    '''
    Create the main results structure.
    We differentiate between flows, stocks, and cumulative results
    The prefix "new" is used for flow variables, i.e. counting new events (infections/deaths/recoveries) on each timestep
    The prefix "n" is used for stock variables, i.e. counting the total number in any given state (sus/inf/rec/etc) on any particular timestep
    The prefix "cum" is used for cumulative variables, i.e. counting the total number that have ever been in a given state at some point in the sim
    Note that, by definition, n_dead is the same as cum_deaths and n_recovered is the same as cum_recoveries, so we only define the cumulative versions
    '''

    def init_res(*args, **kwargs):
        ''' Initialize a single result object, sized to the sim's number of timesteps '''
        output = cvb.Result(*args, **kwargs, npts=self.npts)
        return output

    dcols = cvd.get_default_colors() # Get default colors

    # Flows and cumulative flows
    for key,label in cvd.result_flows.items():
        self.results[f'cum_{key}'] = init_res(f'Cumulative {label}', color=dcols[key]) # Cumulative variables -- e.g. "Cumulative infections"

    for key,label in cvd.result_flows.items(): # Repeat to keep all the cumulative keys together
        self.results[f'new_{key}'] = init_res(f'Number of new {label}', color=dcols[key]) # Flow variables -- e.g. "Number of new infections"

    # Stock variables
    for key,label in cvd.result_stocks.items():
        self.results[f'n_{key}'] = init_res(label, color=dcols[key])

    # Other variables -- scale=True results are multiplied by rescale_vec; scale=False are raw fractions/rates
    self.results['n_imports']           = init_res('Number of imported infections', scale=True)
    self.results['n_alive']             = init_res('Number alive', scale=True)
    self.results['n_naive']             = init_res('Number never infected', scale=True)
    self.results['n_preinfectious']     = init_res('Number preinfectious', scale=True, color=dcols.exposed)
    self.results['n_removed']           = init_res('Number removed', scale=True, color=dcols.recovered)
    self.results['prevalence']          = init_res('Prevalence', scale=False)
    self.results['incidence']           = init_res('Incidence', scale=False)
    self.results['r_eff']               = init_res('Effective reproduction number', scale=False)
    self.results['doubling_time']       = init_res('Doubling time', scale=False)
    self.results['test_yield']          = init_res('Testing yield', scale=False)
    self.results['rel_test_yield']      = init_res('Relative testing yield', scale=False)
    self.results['frac_vaccinated']     = init_res('Proportion vaccinated', scale=False)
    self.results['pop_nabs']            = init_res('Population nab levels', scale=False, color=dcols.pop_nabs)
    self.results['pop_protection']      = init_res('Population immunity protection', scale=False, color=dcols.pop_protection)
    self.results['pop_symp_protection'] = init_res('Population symptomatic protection', scale=False, color=dcols.pop_symp_protection)

    # Handle variants -- these results are 2D (n_variants x npts), stored under results['variant']
    nv = self['n_variants']
    self.results['variant'] = {}
    self.results['variant']['prevalence_by_variant'] = init_res('Prevalence by variant', scale=False, n_variants=nv)
    self.results['variant']['incidence_by_variant']  = init_res('Incidence by variant', scale=False, n_variants=nv)
    for key,label in cvd.result_flows_by_variant.items():
        self.results['variant'][f'cum_{key}'] = init_res(f'Cumulative {label}', color=dcols[key], n_variants=nv) # Cumulative variables -- e.g. "Cumulative infections"
    for key,label in cvd.result_flows_by_variant.items():
        self.results['variant'][f'new_{key}'] = init_res(f'Number of new {label}', color=dcols[key], n_variants=nv) # Flow variables -- e.g. "Number of new infections"
    for key,label in cvd.result_stocks_by_variant.items():
        self.results['variant'][f'n_{key}'] = init_res(label, color=dcols[key], n_variants=nv)

    # Populate the rest of the results
    if self['rescale']: # Dynamic rescaling: start at 1 and let rescale() grow the vector over time
        scale = 1
    else: # Static scaling: apply the full population scale from the start
        scale = self['pop_scale']
    self.rescale_vec   = scale*np.ones(self.npts) # Not included in the results, but used to scale them
    self.results['date'] = self.datevec
    self.results['t']    = self.tvec
    self.results_ready   = False

    return
def load_population(self, popfile=None, init_people=True, **kwargs):
    '''
    Load the population dictionary from file -- typically done automatically
    as part of ``sim.initialize()``.
    Supports loading either saved population dictionaries (popdicts, file ending
    .pop by convention), or ready-to-go People objects (file ending .ppl by
    convention). Either object can also be supplied directly. Once a population
    file is loaded, it is removed from the Sim object.

    Args:
        popfile (str or obj): if a string, name of the file; otherwise, the popdict or People object to load
        init_people (bool): whether to immediately convert the loaded popdict into an initialized People object
        kwargs (dict): passed to ``sim.init_people()``
    '''
    # Set the file path if none is provided
    if popfile is None and self.popfile is not None:
        popfile = self.popfile

    # Load the population into the popdict
    self.popdict = cvm.load(popfile)
    if self['verbose']:
        print(f'Loading population from {popfile}')

    if init_people:
        self.init_people(**kwargs)

    return
def init_people(self, popdict=None, init_infections=False, reset=False, verbose=None, **kwargs):
    '''
    Create the people.
    Use ``init_infections=False`` for creating a fresh People object for use
    in future simulations

    Args:
        popdict (any): pre-generated people of various formats
        init_infections (bool): whether to initialize infections (default false when called directly)
        reset (bool): whether to regenerate the people even if they already exist
        verbose (int): detail to print
        kwargs (dict): passed to cv.make_people()

    Returns:
        This sim object (to allow chaining)
    '''

    # Handle inputs
    if verbose is None:
        verbose = self['verbose']
    if popdict is not None:
        self.popdict = popdict
    if verbose > 0:
        resetstr= ''
        if self.people:
            resetstr = ' (resetting people)' if reset else ' (warning: not resetting sim.people)'
        print(f'Initializing sim{resetstr} with {self["pop_size"]:0n} people for {self["n_days"]} days')
    if self.popfile and self.popdict is None: # A population file was supplied but nothing is loaded yet: load it into the popdict (make_people() then uses it)
        self.load_population(init_people=False)

    # Actually make the people
    self.people = cvpop.make_people(self, reset=reset, verbose=verbose, **kwargs)
    self.people.initialize(sim_pars=self.pars) # Fully initialize the people
    self.reset_layer_pars(force=False) # Ensure that layer keys match the loaded population
    if init_infections:
        self.init_infections(verbose=verbose)

    return self
def init_interventions(self):
    '''
    Initialize and validate the interventions.

    Initializes every Intervention object, then checks scheduling: a contact
    tracing intervention without any testing intervention, or scheduled before
    all testing interventions, triggers a warning (traced contacts depend on
    test results, so the ordering introduces a one-day delay).
    '''

    # Initialization
    if self._orig_pars and 'interventions' in self._orig_pars:
        self['interventions'] = self._orig_pars.pop('interventions') # Restore
    for i,intervention in enumerate(self['interventions']):
        if isinstance(intervention, cvi.Intervention):
            intervention.initialize(self)

    # Validation -- nan means "no such intervention found"
    trace_ind = np.nan # Index of the earliest tracing intervention
    test_ind = np.nan # Index of the latest testing intervention (comment fixed: this tracks testing, not tracing)
    for i,intervention in enumerate(self['interventions']):
        if isinstance(intervention, (cvi.contact_tracing)):
            trace_ind = np.fmin(trace_ind, i) # Find the earliest-scheduled tracing intervention (fmin ignores the initial nan)
        elif isinstance(intervention, (cvi.test_num, cvi.test_prob)):
            test_ind = np.fmax(test_ind, i) # Find the latest-scheduled testing intervention (fmax ignores the initial nan)
    if not np.isnan(trace_ind): # pragma: no cover
        warnmsg = ''
        if np.isnan(test_ind):
            warnmsg = 'Note: you have defined a contact tracing intervention but no testing intervention was found. Unless this is intentional, please define at least one testing intervention.'
        elif trace_ind < test_ind:
            warnmsg = f'Note: contact tracing (index {trace_ind:.0f}) is scheduled before testing ({test_ind:.0f}); this creates a 1-day delay. Unless this is intentional, please reorder the interventions.' # Fixed typo: "interentions"
        if warnmsg:
            cvm.warn(warnmsg)

    return
def finalize_interventions(self):
    ''' Call finalize() on each Intervention object after the run completes '''
    for obj in self['interventions']:
        if isinstance(obj, cvi.Intervention):
            obj.finalize(self)
    return
def init_analyzers(self):
    ''' Initialize the analyzers '''
    # Restore the original (pre-run) analyzers if they were stashed
    if self._orig_pars and 'analyzers' in self._orig_pars:
        self['analyzers'] = self._orig_pars.pop('analyzers')
    for obj in self['analyzers']:
        if isinstance(obj, cva.Analyzer):
            obj.initialize(self)
    return
def finalize_analyzers(self):
    ''' Call finalize() on each Analyzer object after the run completes '''
    for obj in self['analyzers']:
        if isinstance(obj, cva.Analyzer):
            obj.finalize(self)
def init_variants(self):
    '''
    Initialize the variants.

    Initializes each variant object, then sets ``n_variants`` from the length
    of ``variant_pars`` after checking it agrees with ``variant_map``.

    Raises:
        TypeError: if an entry in sim['variants'] is not a cv.variant object
        ValueError: if variant_pars and variant_map have different lengths
    '''
    if self._orig_pars and 'variants' in self._orig_pars:
        self['variants'] = self._orig_pars.pop('variants') # Restore

    for i,variant in enumerate(self['variants']):
        if isinstance(variant, cvimm.variant):
            if not variant.initialized:
                variant.initialize(self)
        else: # pragma: no cover
            errormsg = f'Variant {i} ({variant}) is not a cv.variant object; please create using cv.variant()'
            raise TypeError(errormsg)

    len_pars = len(self['variant_pars'])
    len_map = len(self['variant_map'])
    if len_pars != len_map: # Was an assert, which is silently stripped under "python -O"; raise explicitly instead
        errormsg = f"variant_pars and variant_map must be the same length, but they're not: {len_pars} ≠ {len_map}"
        raise ValueError(errormsg)
    self['n_variants'] = len_pars # Each variant has an entry in variant_pars

    return
def init_immunity(self, create=False):
    ''' Initialize immunity matrices and precompute nab waning for each variant '''
    if not self['use_waning']:
        return # Immunity matrices are only needed when waning is enabled
    cvimm.init_immunity(self, create=create)
    return
def init_infections(self, force=False, verbose=None):
    '''
    Initialize prior immunity and seed infections.

    Args:
        force (bool): initialize prior infections even if already initialized
        verbose (int): detail to print (uses self['verbose'] if None)
    '''
    if verbose is None:
        verbose = self['verbose']

    # If anyone is non-naive, don't re-initialize
    if self.people.count_not('naive') == 0 or force: # Everyone is naive

        # Handle anyone who isn't susceptible -- mark a random fraction as non-naive
        if self['frac_susceptible'] < 1:
            inds = cvu.choose(self['pop_size'], np.round((1-self['frac_susceptible'])*self['pop_size']))
            self.people.make_nonnaive(inds=inds)

        # Create the seed infections
        if self['pop_infected']:
            inds = cvu.choose(self['pop_size'], self['pop_infected'])
            self.people.infect(inds=inds, layer='seed_infection') # Not counted by results since flows are re-initialized during the step

    elif verbose:
        print(f'People already initialized with {self.people.count_not("naive")} people non-naive and {self.people.count("exposed")} exposed; not reinitializing')

    return
def rescale(self):
    '''
    Dynamically rescale the population -- used during step().

    When dynamic rescaling is enabled ('rescale'=True), once the proportion of
    non-naive agents exceeds 'rescale_threshold', a fraction of them are made
    naive again and the per-timestep scale factor (rescale_vec) is increased to
    compensate, capped so that the total scale never exceeds 'pop_scale'.
    '''
    if self['rescale']:
        pop_scale = self['pop_scale']
        current_scale = self.rescale_vec[self.t]
        if current_scale < pop_scale: # We have room to rescale
            not_naive_inds = self.people.false('naive') # Find everyone not naive
            n_not_naive = len(not_naive_inds) # Number of people who are not naive
            n_people = self['pop_size'] # Number of people overall
            current_ratio = n_not_naive/n_people # Current proportion not naive
            threshold = self['rescale_threshold'] # Threshold to trigger rescaling
            if current_ratio > threshold: # Check if we've reached point when we want to rescale
                max_ratio = pop_scale/current_scale # We don't want to exceed the total population size
                proposed_ratio = max(current_ratio/threshold, self['rescale_factor']) # The proposed ratio to rescale: the rescale factor, unless we've exceeded it
                scaling_ratio = min(proposed_ratio, max_ratio) # We don't want to scale by more than the maximum ratio
                self.rescale_vec[self.t:] *= scaling_ratio # Update the rescaling factor from here on
                n = int(round(n_not_naive*(1.0-1.0/scaling_ratio))) # For example, rescaling by 2 gives n = 0.5*not_naive_inds
                choices = cvu.choose(max_n=n_not_naive, n=n) # Choose who to make naive again
                new_naive_inds = not_naive_inds[choices] # Convert these back into indices for people
                self.people.make_naive(new_naive_inds) # Make people naive again
    return
def step(self):
    '''
    Step the simulation forward in time. Usually, the user would use sim.run()
    rather than calling sim.step() directly.

    One step comprises: rescaling, pre-update of people's states, importations,
    variant introductions, interventions, viral load computation, transmission
    per variant and contact layer, results bookkeeping, immunity updates, and
    analyzers. Increments self.t and sets self.complete on the final step.
    '''

    # Set the time and if we have reached the end of the simulation, then do nothing
    if self.complete:
        raise AlreadyRunError('Simulation already complete (call sim.initialize() to re-run)')
    t = self.t

    # If it's the first timestep, infect people
    if t == 0:
        self.init_infections(verbose=False)

    # Perform initial operations
    self.rescale() # Check if we need to rescale
    people = self.people # Shorten this for later use
    people.update_states_pre(t=t) # Update the state of everyone and count the flows
    contacts = people.update_contacts() # Compute new contacts
    hosp_max = people.count('severe') > self['n_beds_hosp'] if self['n_beds_hosp'] is not None else False # Check for acute bed constraint
    icu_max = people.count('critical') > self['n_beds_icu'] if self['n_beds_icu'] is not None else False # Check for ICU bed constraint

    # Randomly infect some people (imported infections)
    if self['n_imports']:
        n_imports = cvu.poisson(self['n_imports']/self.rescale_vec[self.t]) # Imported cases; divided by the current scale so the scaled number of imports stays constant
        if n_imports>0:
            importation_inds = cvu.choose(max_n=self['pop_size'], n=n_imports)
            people.infect(inds=importation_inds, hosp_max=hosp_max, icu_max=icu_max, layer='importation')
            self.results['n_imports'][t] += n_imports

    # Add variants
    for variant in self['variants']:
        if isinstance(variant, cvimm.variant):
            variant.apply(self)

    # Apply interventions
    for i,intervention in enumerate(self['interventions']):
        intervention(self) # If it's a function, call it directly

    people.update_states_post() # Check for state changes after interventions

    # Compute viral loads
    frac_time = cvd.default_float(self['viral_dist']['frac_time'])
    load_ratio = cvd.default_float(self['viral_dist']['load_ratio'])
    high_cap = cvd.default_float(self['viral_dist']['high_cap'])
    date_inf = people.date_infectious
    date_rec = people.date_recovered
    date_dead = people.date_dead
    viral_load = cvu.compute_viral_load(t, date_inf, date_rec, date_dead, frac_time, load_ratio, high_cap)

    # Shorten useful parameters
    nv = self['n_variants'] # Shorten number of variants
    sus = people.susceptible
    symp = people.symptomatic
    diag = people.diagnosed # NOTE(review): 'diag' appears unused in the code below -- confirm against compute_trans_sus callers
    quar = people.quarantined
    iso = people.isolated
    prel_trans = people.rel_trans
    prel_sus = people.rel_sus

    # Iterate through n_variants to calculate infections
    for variant in range(nv):

        # Deal with variant parameters
        asymp_factor = self['asymp_factor']
        variant_label = self.pars['variant_map'][variant]
        beta = cvd.default_float(self['beta'] * self['rel_beta'] * self['variant_pars'][variant_label]['rel_beta'])
        inf_variant = people.infectious * (people.infectious_variant == variant)
        if ~inf_variant.any(): # No one infectious with this variant: skip it (relies on numpy bool, where ~False is True)
            continue
        for lkey, layer in contacts.items():
            p1 = layer['p1']
            p2 = layer['p2']
            betas = layer['beta']

            # Compute relative transmission and susceptibility
            sus_imm = people.sus_imm[variant,:]
            iso_factor = cvd.default_float(self['iso_factor'][lkey])
            quar_factor = cvd.default_float(self['quar_factor'][lkey])
            beta_layer = cvd.default_float(self['beta_layer'][lkey])
            rel_trans, rel_sus = cvu.compute_trans_sus(prel_trans, prel_sus, inf_variant, sus, beta_layer, viral_load, symp, iso, quar, asymp_factor, iso_factor, quar_factor, sus_imm)

            # Calculate actual transmission
            pairs = [[p1,p2]] if not self._legacy_trans else [[p1,p2], [p2,p1]] # Support slower legacy method of calculation, but by default skip this loop
            for p1,p2 in pairs:
                source_inds, target_inds = cvu.compute_infections(beta, p1, p2, betas, rel_trans, rel_sus, legacy=self._legacy_trans) # Calculate transmission!
                people.infect(inds=target_inds, hosp_max=hosp_max, icu_max=icu_max, source=source_inds, layer=lkey, variant=variant) # Actually infect people

    # Update counts for this time step: stocks
    for key in cvd.result_stocks.keys():
        self.results[f'n_{key}'][t] = people.count(key)
    for key in cvd.result_stocks_by_variant.keys():
        for variant in range(nv):
            self.results['variant'][f'n_{key}'][variant, t] = people.count_by_variant(key, variant)

    # Update counts for this time step: flows
    for key,count in people.flows.items():
        self.results[key][t] += count
    for key,count in people.flows_variant.items():
        for variant in range(nv):
            self.results['variant'][key][variant][t] += count[variant]

    # Update nab and immunity for this time step
    if self['use_waning']:
        has_nabs = cvu.true(people.peak_nab)
        if len(has_nabs):
            cvimm.update_nab(people, inds=has_nabs)
        inds_alive = cvu.false(people.dead)
        self.results['pop_nabs'][t] = np.sum(people.nab[inds_alive[cvu.true(people.nab[inds_alive])]])/len(inds_alive) # Mean nab over the living, counting only those with nonzero nab in the numerator
        self.results['pop_protection'][t] = np.nanmean(people.sus_imm)
        self.results['pop_symp_protection'][t] = np.nanmean(people.symp_imm)

    # Apply analyzers -- same syntax as interventions
    for i,analyzer in enumerate(self['analyzers']):
        analyzer(self)

    # Tidy up
    self.t += 1
    if self.t == self.npts:
        self.complete = True

    return
def run(self, do_plot=False, until=None, restore_pars=True, reset_seed=True, verbose=None):
    '''
    Run the simulation from the current timestep until completion (or ``until``).

    Args:
        do_plot      (bool):    whether to plot (not referenced in this method body; presumably retained for backwards compatibility -- TODO confirm against callers)
        until        (int/str): day or date to run until
        restore_pars (bool):    whether to make a copy of the parameters before the run and restore it after, so runs are repeatable
        reset_seed   (bool):    whether to reset the random number stream immediately before run
        verbose      (float):   level of detail to print, e.g. -1 = one-line output, 0 = no output, 0.1 = print every 10th day, 1 = print every day

    Returns:
        A pointer to the sim object (with results modified in-place), or None if
        the run was stopped early by the time limit or stopping function.
    '''

    # Initialization steps -- start the timer, initialize the sim and the seed, and check that the sim hasn't been run
    T = sc.timer()

    if not self.initialized:
        self.initialize()
        self._orig_pars = sc.dcp(self.pars) # Create a copy of the parameters, to restore after the run, in case they are dynamically modified

    if verbose is None:
        verbose = self['verbose']

    if reset_seed:
        # Reset the RNG. If the simulation is newly created, then the RNG will be reset by sim.initialize() so the use case
        # for resetting the seed here is if the simulation has been partially run, and changing the seed is required
        self.set_seed()

    # Check for AlreadyRun errors -- note the cascade: each later check overwrites
    # errormsg, so only the LAST failing condition's message is raised
    errormsg = None
    until = self.npts if until is None else self.day(until) # Convert a day/date into a timestep index
    if until > self.npts:
        errormsg = f'Requested to run until t={until} but the simulation end is t={self.npts}'
    if self.t >= until: # NB. At the start, self.t is None so this check must occur after initialization
        errormsg = f'Simulation is currently at t={self.t}, requested to run until t={until} which has already been reached'
    if self.complete:
        errormsg = 'Simulation is already complete (call sim.initialize() to re-run)'
    if self.people.t not in [self.t, self.t-1]: # Depending on how the sim stopped, either of these states are possible
        errormsg = f'The simulation has been run independently from the people (t={self.t}, people.t={self.people.t}): if this is intentional, manually set sim.people.t = sim.t. Remember to save the people object before running the sim.'
    if errormsg:
        raise AlreadyRunError(errormsg)

    # Main simulation loop
    while self.t < until:

        # Check if we were asked to stop -- either by the wall-clock time limit or by a user-supplied stopping function
        elapsed = T.toc(output=True)
        if self['timelimit'] and elapsed > self['timelimit']:
            sc.printv(f"Time limit ({self['timelimit']} s) exceeded; call sim.finalize() to compute results if desired", 1, verbose)
            return # Early exit: results are NOT finalized
        elif self['stopping_func'] and self['stopping_func'](self):
            sc.printv("Stopping function terminated the simulation; call sim.finalize() to compute results if desired", 1, verbose)
            return # Early exit: results are NOT finalized

        # Print progress -- heading for verbose>=2, a throttled progress bar otherwise
        if verbose:
            simlabel = f'"{self.label}": ' if self.label else ''
            string = f' Running {simlabel}{self.datevec[self.t]} ({self.t:2.0f}/{self.pars["n_days"]}) ({elapsed:0.2f} s) '
            if verbose >= 2:
                sc.heading(string)
            elif verbose>0:
                if not (self.t % int(1.0/verbose)): # e.g. verbose=0.1 prints every 10th day
                    sc.progressbar(self.t+1, self.npts, label=string, length=20, newline=True)

        # Do the heavy lifting -- actually run the model!
        self.step()

    # If simulation reached the end, finalize the results
    if self.complete:
        self.finalize(verbose=verbose, restore_pars=restore_pars)
        sc.printv(f'Run finished after {elapsed:0.2f} s.\n', 1, verbose) # NB: elapsed is from the last loop iteration; the loop always runs at least once here since t < until was verified above

    return self
def finalize(self, verbose=None, restore_pars=True):
    '''
    Compute final results: rescale dynamic results, compute cumulative flows,
    finalize interventions/analyzers, and run the derived-result calculations.

    Args:
        verbose      (float): verbosity level; truthy values print a summary at the end
        restore_pars (bool):  whether to restore the parameters saved before the run (except analyzers/interventions)

    Raises:
        AlreadyRunError: if the simulation has already been finalized
    '''
    if self.results_ready:
        # Because the results are rescaled in-place, finalizing the sim cannot be run more than once or
        # otherwise the scale factor will be applied multiple times
        raise AlreadyRunError('Simulation has already been finalized')

    # Scale the results -- in-place multiplication by the per-timestep rescale vector
    for reskey in self.result_keys():
        if self.results[reskey].scale: # Scale the result dynamically
            self.results[reskey].values *= self.rescale_vec
    for reskey in self.result_keys('variant'):
        if self.results['variant'][reskey].scale: # Scale the result dynamically; einsum multiplies each variant row by the rescale vector
            self.results['variant'][reskey].values = np.einsum('ij,j->ij', self.results['variant'][reskey].values, self.rescale_vec)

    # Calculate cumulative results from the (now-rescaled) flow results
    for key in cvd.result_flows.keys():
        self.results[f'cum_{key}'][:] = np.cumsum(self.results[f'new_{key}'][:], axis=0)
    for key in cvd.result_flows_by_variant.keys():
        for variant in range(self['n_variants']):
            self.results['variant'][f'cum_{key}'][variant, :] = np.cumsum(self.results['variant'][f'new_{key}'][variant, :], axis=0)
    for res in [self.results['cum_infections'], self.results['variant']['cum_infections_by_variant']]: # Include initially infected people
        res.values += self['pop_infected']*self.rescale_vec[0] # Seed infections aren't flows, so add them explicitly, scaled by the initial rescale factor

    # Finalize interventions and analyzers
    self.finalize_interventions()
    self.finalize_analyzers()

    # Final settings
    self.results_ready = True # Set this first so self.summary() knows to print the results
    self.t -= 1 # During the run, this keeps track of the next step; restore this to be the final day of the sim

    # Perform calculations on results
    self.compute_results(verbose=verbose) # Calculate the rest of the results
    self.results = sc.objdict(self.results) # Convert results to an objdict to allow e.g. sim.results.diagnoses

    if restore_pars and self._orig_pars:
        preserved = ['analyzers', 'interventions']
        orig_pars_keys = list(self._orig_pars.keys()) # Get a list of keys so we can iterate over them while popping
        for key in orig_pars_keys:
            if key not in preserved:
                self.pars[key] = self._orig_pars.pop(key) # Restore everything except for the analyzers and interventions

    # Optionally print summary output
    if verbose: # Verbose is any non-zero value
        if verbose>0: # Verbose is any positive number
            self.summarize() # Print medium-length summary of the sim
        else:
            self.brief() # Print brief summary of the sim

    return
def compute_results(self, verbose=None):
    '''
    Perform the final derived-result calculations, in order.

    Args:
        verbose (float): accepted for interface compatibility (not used here)
    '''
    # Order matters: states feed into yield/doubling/r_eff, and the summary comes last
    calculations = (
        self.compute_states,
        self.compute_yield,
        self.compute_doubling,
        self.compute_r_eff,
        self.compute_summary,
    )
    for calculate in calculations:
        calculate()
    return
def compute_states(self):
    '''
    Derive aggregate epidemic states from the raw result arrays.

    Fills in n_alive, n_naive, n_susceptible, n_preinfectious, n_removed,
    prevalence, incidence, frac_vaccinated, and the per-variant incidence and
    prevalence. All quantities are in terms of people (not agents), so that
    dynamic rescaling is handled correctly.
    '''
    r = self.results
    count_recov = 1-self['use_waning'] # With waning enabled, recovered people can be reinfected, so don't count them as removed

    # Head counts -- n_alive must be computed first since the rates below divide by it
    r['n_alive'][:] = self.scaled_pop_size - r['cum_deaths'][:] # Number of people still alive
    alive = r['n_alive'][:] # Read the stored values back so subsequent calculations use exactly what was stored
    r['n_naive'][:] = self.scaled_pop_size - r['cum_deaths'][:] - r['n_recovered'][:] - r['n_exposed'][:] # People who have never been infected
    r['n_susceptible'][:] = alive - r['n_exposed'][:] - count_recov*r['cum_recoveries'][:] # Susceptible people, not agents
    r['n_preinfectious'][:] = r['n_exposed'][:] - r['n_infectious'][:] # Exposed but not yet infectious
    r['n_removed'][:] = count_recov*r['cum_recoveries'][:] + r['cum_deaths'][:] # Removed: recovered (unless waning) plus dead

    # Rates
    r['prevalence'][:] = r['n_exposed'][:]/alive # Currently infected among the living
    r['incidence'][:] = r['new_infections'][:]/r['n_susceptible'][:] # New infections per susceptible person
    r['frac_vaccinated'][:] = r['n_vaccinated'][:]/alive # Fraction of the living who are vaccinated
    r['variant']['incidence_by_variant'][:] = np.einsum('ji,i->ji', r['variant']['new_infections_by_variant'][:], 1/r['n_susceptible'][:]) # Per-variant incidence
    r['variant']['prevalence_by_variant'][:] = np.einsum('ji,i->ji', r['variant']['new_infections_by_variant'][:], 1/alive) # NOTE(review): numerator is NEW infections by variant (matching the incidence numerator), preserved as-is
    return
def compute_yield(self):
    '''
    Compute test yield -- the number of positive tests divided by the total
    number of tests (test positivity rate). Relative yield compares the yield
    to what it would be from testing a person chosen at random from the
    population (i.e., relative to prevalence among the testable).
    '''
    res = self.results

    # Absolute yield -- only defined on days when any tests were performed (avoids divide-by-zero)
    test_inds = res['new_tests'][:]
    test_inds = cvu.true(test_inds) # Indices of days with a non-zero number of tests
    res['test_yield'][test_inds] = res['new_diagnoses'][test_inds]/res['new_tests'][test_inds]

    # Relative yield -- only defined on days when anyone is infectious (avoids divide-by-zero)
    inf_inds = cvu.true(res['n_infectious'][:])
    undiagnosed = res['n_alive'][inf_inds] - res['cum_diagnoses'][inf_inds] # Alive + undiagnosed people might test
    expected_yield = res['n_infectious'][inf_inds]/undiagnosed # Infectious people will test positive
    res['rel_test_yield'][inf_inds] = res['test_yield'][inf_inds]/expected_yield
    return
def compute_doubling(self, window=3, max_doubling_time=30):
    '''
    Calculate doubling time using an exponential approximation: compare
    cumulative infections at time t with those at time t-window. For example,
    100 cumulative infections on day 12 and 200 on day 19 gives a doubling
    time of 7 days. Entries where infections are zero or not growing are NaN.

    Args:
        window (float): the size of the window used (larger values are more accurate but less precise)
        max_doubling_time (float): doubling time could be infinite, so this places a bound on it

    Returns:
        doubling_time (array): the doubling time results array
    '''
    cum_inf = self.results['cum_infections'].values
    later   = cum_inf[window:]
    earlier = cum_inf[:-window]
    growing = (earlier > 0) & (later > earlier) # Doubling time is only defined where infections exist and are increasing
    ratio = later[growing] / earlier[growing]
    dt = window * np.log(2) / np.log(ratio) # Exponential-growth doubling time
    result = self.results['doubling_time']
    result[:] = np.nan # Undefined by default
    result[window:][growing] = np.minimum(dt, max_doubling_time) # Cap at the maximum allowed value
    return result.values
def compute_r_eff(self, method='daily', smoothing=2, window=7):
'''
Effective reproduction number based on number of people each person infected.
Args:
method (str): 'daily' uses daily infections, 'infectious' counts from the date infectious, 'outcome' counts from the date recovered/dead
smoothing (int): the number of steps to smooth over for the 'daily' method
window (int): the size of the window used for 'infectious' and 'outcome' calculations (larger values are more accurate but less precise)
Returns:
r_eff (array): the r_eff results array
'''
# Initialize arrays to hold sources and targets infected each day
sources = np.zeros(self.npts)
targets = np.zeros(self.npts)
window = int(window)
# Default method -- calculate the daily infections
if method == 'daily':
# Find the dates that everyone became infectious and recovered, and hence calculate infectious duration
recov_inds = self.people.defined('date_recovered')
dead_inds = self.people.defined('date_dead')
date_recov = self.people.date_recovered[recov_inds]
date_dead = self.people.date_dead[dead_inds]
date_outcome = np.concatenate((date_recov, date_dead))
inds = np.concatenate((recov_inds, dead_inds))
date_inf = self.people.date_infectious[inds]
if len(date_outcome):
mean_inf = date_outcome.mean() - date_inf.mean()
else:
warnmsg ='There were no infections during the simulation'
cvm.warn(warnmsg)
mean_inf = 0 # Doesn't matter since r_eff is 0
# Calculate R_eff as the mean infectious duration times the number of new infectious divided by the number of infectious people on a given day
new_infections = self.results['new_infections'].values - self.results['n_imports'].values
n_infectious = self.results['n_infectious'].values
raw_values = mean_inf*np.divide(new_infections, n_infectious, out=np.zeros(self.npts), where=n_infectious>0)
# Handle smoothing, including with too-short arrays
len_raw = len(raw_values) # Calculate the number of raw values
dur_pars = self['dur'][0] if isinstance(self['dur'], list) else self['dur'] # Note: does not take variants into account
if len_raw >= 3: # Can't smooth arrays shorter than this since the default smoothing kernel has length 3
initial_period = dur_pars['exp2inf']['par1'] + dur_pars['asym2rec']['par1'] # Approximate the duration of the seed infections for averaging
initial_period = int(min(len_raw, initial_period)) # Ensure we don't have too many points
for ind in range(initial_period): # Loop over each of the initial inds
raw_values[ind] = raw_values[ind:initial_period].mean() # Replace these values with their average
values = sc.smooth(raw_values, smoothing)
values[:smoothing] = raw_values[:smoothing] # To avoid numerical effects, replace the beginning and end with the original
values[-smoothing:] = raw_values[-smoothing:]
else:
values = raw_values
# Alternate (traditional) method -- count from the date of infection or outcome
elif method in ['infectious', 'outcome']:
# Store a mapping from each source to their date
source_dates = {}
for t in self.tvec:
# Sources are easy -- count up the arrays for all the people who became infections on that day
if method == 'infectious':
inds = cvu.true(t == self.people.date_infectious) # Find people who became infectious on this timestep
elif method == 'outcome':
recov_inds = cvu.true(t == self.people.date_recovered) # Find people who recovered on this timestep
dead_inds = cvu.true(t == self.people.date_dead) # Find people who died on this timestep
inds = np.concatenate((recov_inds, dead_inds))
sources[t] = len(inds)
# Create the mapping from sources to dates
for ind in inds:
source_dates[ind] = t
# Targets are hard -- loop over the transmission tree
for transdict in self.people.infection_log:
source = transdict['source']
if source is not None and source in source_dates: # Skip seed infections and people with e.g. recovery after the end of the sim
source_date = source_dates[source]
targets[source_date] += 1
# for ind in inds:
# targets[t] += len(self.people.transtree.targets[ind])
# Populate the array -- to avoid divide-by-zero, skip indices that are 0
r_eff = np.divide(targets, sources, out=np.full(self.npts, np.nan), where=sources > 0)
# Use stored weights calculate the moving average over the window of timesteps, n
num = np.nancumsum(r_eff * sources)
num[window:] = num[window:] - num[:-window]
den = np.cumsum(sources)
den[window:] = den[window:] - den[:-window]
values = np.divide(num, den, out=np.full(self.npts, np.nan), where=den > 0)
# Method not recognized
else: # pragma: no cover
errormsg = f'Method must be "daily", "infectious", or "outcome", not "{method}"'
raise ValueError(errormsg)
# Set the values and return
self.results['r_eff'].values[:] = values