/
relative_setup.py
1855 lines (1587 loc) · 109 KB
/
relative_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import absolute_import
from perses.dispersed import feptasks
from perses.utils.openeye import *
from perses.utils.data import load_smi
from perses.annihilation.relative import HybridTopologyFactory
from perses.annihilation.lambda_protocol import RelativeAlchemicalState, LambdaProtocol
from perses.rjmc.topology_proposal import TopologyProposal, TwoMoleculeSetProposalEngine, SmallMoleculeSetProposalEngine
from perses.rjmc.geometry import FFAllAngleGeometryEngine
from openmmtools.states import ThermodynamicState, CompoundThermodynamicState, SamplerState
import pymbar
import simtk.openmm as openmm
import simtk.openmm.app as app
import simtk.unit as unit
import numpy as np
from openmoltools import forcefield_generators
import copy
import mdtraj as md
from io import StringIO
from openmmtools.constants import kB
import logging
import os
import dask.distributed as distributed
import parmed as pm
from collections import namedtuple
from typing import List, Tuple, Union, NamedTuple
from collections import namedtuple
import random
from scipy.special import logsumexp
logging.basicConfig(level = logging.NOTSET)
_logger = logging.getLogger("relative_setup")
_logger.setLevel(logging.INFO)
# define NamedTuples from feptasks
# EquilibriumResult = namedtuple('EquilibriumResult', ['sampler_state', 'reduced_potentials', 'files', 'timers', 'nonalchemical_perturbations'])
EquilibriumFEPTask = namedtuple('EquilibriumInput', ['sampler_state', 'inputs', 'outputs'])
NonequilibriumFEPTask = namedtuple('NonequilibriumFEPTask', ['particle', 'inputs'])
class RelativeFEPSetup(object):
"""
This class is a helper class for relative FEP calculations. It generates the input objects that are necessary
legs of a relative FEP calculation. For each leg, that is a TopologyProposal, old_positions, and new_positions.
Importantly, it ensures that the atom maps in the solvent and complex phases match correctly.
"""
def __init__(self, ligand_input, old_ligand_index, new_ligand_index, forcefield_files, phases,
protein_pdb_filename=None,receptor_mol2_filename=None, pressure=1.0 * unit.atmosphere,
temperature=300.0 * unit.kelvin, solvent_padding=9.0 * unit.angstroms, atom_map=None,
hmass=4*unit.amus, neglect_angles=False, map_strength='default', anneal_14s = False,
small_molecule_forcefield='gaff-2.11', small_molecule_parameters_cache=None):
"""
Initialize a NonequilibriumFEPSetup object
Parameters
----------
ligand_input : str
the name of the ligand file (any openeye supported format)
this can either be an .sdf or list of .sdf files, or a list of SMILES strings
forcefield_files : list of str
The list of ffxml files that contain the forcefields that will be used (excepting for small molecules)
phases : list of str
The phases to simulate
protein_pdb_filename : str, default None
Protein pdb filename. If none, receptor_mol2_filename must be provided
receptor_mol2_filename : str, default None
Receptor mol2 filename. If none, protein_pdb_filename must be provided
pressure : Quantity, units of pressure
Pressure to use in the barostat
temperature : Quantity, units of temperature
Temperature to use for the Langevin integrator
solvent_padding : Quantity, units of length
The amount of padding to use when adding solvent
neglect_angles : bool
Whether to neglect certain angle terms for the purpose of minimizing work variance in the RJMC protocol.
anneal_14s : bool, default False
Whether to anneal 1,4 interactions over the protocol;
if True, then geometry_engine takes the argument use_14_nonbondeds = False;
if False, then geometry_engine takes the argument use_14_nonbondeds = True;
small_molecule_forcefield : str, optional, default='gaff-2.11'
Small molecule force field name.
Anything supported by SystemGenerator is supported, but we recommend one of
['gaff-1.81', 'gaff-2.11', 'smirnoff99Frosst-1.1.0', 'openff-1.0.0']
small_molecule_parameters_cache : str, optional, default=None
If specified, this filename will be used for a small molecule parameter cache by the SystemGenerator.
"""
self._pressure = pressure
self._temperature = temperature
self._barostat_period = 50
self._pme_tol = 1e-04
self._padding = solvent_padding
self._hmass = hmass
_logger.info(f"\t\t\t_hmass: {hmass}.\n")
self._proposal_phase = None
self._map_strength = map_strength
self._anneal_14s = anneal_14s
beta = 1.0 / (kB * temperature)
mol_list = []
#all legs need ligands so do this first
self._ligand_input = ligand_input
self._old_ligand_index = old_ligand_index
self._new_ligand_index = new_ligand_index
_logger.info(f"Handling files for ligands and indices...")
if type(self._ligand_input) is not list: # the ligand has been provided as a single file
if self._ligand_input[-3:] == 'smi': #
_logger.info(f"Detected .smi format. Proceeding...")
self._ligand_smiles_old = load_smi(self._ligand_input,self._old_ligand_index)
self._ligand_smiles_new = load_smi(self._ligand_input,self._new_ligand_index)
_logger.info(f"\told smiles: {self._ligand_smiles_old}")
_logger.info(f"\tnew smiles: {self._ligand_smiles_new}")
all_old_mol = createSystemFromSMILES(self._ligand_smiles_old, title='MOL') # should be stereospecific
self._ligand_oemol_old, self._ligand_system_old, self._ligand_positions_old, self._ligand_topology_old = all_old_mol
self._ligand_oemol_old = generate_unique_atom_names(self._ligand_oemol_old)
all_new_mol = createSystemFromSMILES(self._ligand_smiles_new, title='NEW')
self._ligand_oemol_new, self._ligand_system_new, self._ligand_positions_new, self._ligand_topology_new = all_new_mol
self._ligand_oemol_new = generate_unique_atom_names(self._ligand_oemol_new)
_logger.info(f"\tsuccessfully created old and new systems from smiles")
mol_list.append(self._ligand_oemol_old)
mol_list.append(self._ligand_oemol_new)
# forcefield_generators needs to be able to distinguish between the two ligands
# while topology_proposal needs them to have the same residue name
self._ligand_oemol_old.SetTitle("MOL")
self._ligand_oemol_new.SetTitle("MOL")
_logger.info(f"\tsetting both molecule oemol titles to 'MOL'.")
self._ligand_topology_old = forcefield_generators.generateTopologyFromOEMol(self._ligand_oemol_old)
self._ligand_topology_new = forcefield_generators.generateTopologyFromOEMol(self._ligand_oemol_new)
_logger.info(f"\tsuccessfully generated topologies for both oemols.")
elif self._ligand_input[-3:] == 'sdf': #
_logger.info(f"Detected .sdf format. Proceeding...") #TODO: write checkpoints for sdf format
self._ligand_oemol_old = createOEMolFromSDF(self._ligand_input, index=self._old_ligand_index)
self._ligand_oemol_new = createOEMolFromSDF(self._ligand_input, index=self._new_ligand_index)
self._ligand_oemol_old = generate_unique_atom_names(self._ligand_oemol_old)
self._ligand_oemol_new = generate_unique_atom_names(self._ligand_oemol_new)
mol_list.append(self._ligand_oemol_old)
mol_list.append(self._ligand_oemol_new)
self._ligand_positions_old = extractPositionsFromOEMol(self._ligand_oemol_old)
_logger.info(f"\tsuccessfully extracted positions from OEMOL.")
self._ligand_oemol_old.SetTitle("MOL")
self._ligand_oemol_new.SetTitle("MOL")
_logger.info(f"\tsetting both molecule oemol titles to 'MOL'.")
self._ligand_smiles_old = oechem.OECreateSmiString(self._ligand_oemol_old,
oechem.OESMILESFlag_DEFAULT | oechem.OESMILESFlag_Hydrogens)
self._ligand_smiles_new = oechem.OECreateSmiString(self._ligand_oemol_new,
oechem.OESMILESFlag_DEFAULT | oechem.OESMILESFlag_Hydrogens)
_logger.info(f"\tsuccessfully created SMILES for both ligand OEMOLs.")
# replace this with function that will generate the system etc. so that vacuum can be performed
self._ligand_topology_old = forcefield_generators.generateTopologyFromOEMol(self._ligand_oemol_old)
self._ligand_topology_new = forcefield_generators.generateTopologyFromOEMol(self._ligand_oemol_new)
_logger.info(f"\tsuccessfully generated topologies for both OEMOLs.")
else:
print(f'RelativeFEPSetup can only handle .smi or .sdf files currently')
else: # the ligand has been provided as a list of .sdf files
_logger.info(f"Detected list...perhaps this is of sdf format. Proceeding (but without checkpoints...this may be buggy).") #TODO: write checkpoints and debug for list
old_ligand = pm.load_file('%s.parm7' % self._ligand_input[0], '%s.rst7' % self._ligand_input[0])
self._ligand_topology_old = old_ligand.topology
self._ligand_positions_old = old_ligand.positions
self._ligand_oemol_old = createOEMolFromSDF('%s.mol2' % self._ligand_input[0])
self._ligand_oemol_old = generate_unique_atom_names(self._ligand_oemol_old)
self._ligand_smiles_old = oechem.OECreateSmiString(self._ligand_oemol_old,
oechem.OESMILESFlag_DEFAULT | oechem.OESMILESFlag_Hydrogens)
new_ligand = pm.load_file('%s.parm7' % self._ligand_input[1], '%s.rst7' % self._ligand_input[1])
self._ligand_topology_new = new_ligand.topology
self._ligand_positions_new = new_ligand.positions
self._ligand_oemol_new = createOEMolFromSDF('%s.mol2' % self._ligand_input[1])
self._ligand_oemol_new = generate_unique_atom_names(self._ligand_oemol_new)
self._ligand_smiles_new = oechem.OECreateSmiString(self._ligand_oemol_new,
oechem.OESMILESFlag_DEFAULT | oechem.OESMILESFlag_Hydrogens)
mol_list.append(self._ligand_oemol_old)
mol_list.append(self._ligand_oemol_new)
self._ligand_md_topology_old = md.Topology.from_openmm(self._ligand_topology_old)
self._ligand_md_topology_new = md.Topology.from_openmm(self._ligand_topology_new)
_logger.info(f"Created mdtraj topologies for both ligands.")
if 'complex' in phases or 'solvent' in phases:
self._nonbonded_method = app.PME
_logger.info(f"Detected complex or solvent phases: setting PME nonbonded method.")
else:
self._nonbonded_method = app.NoCutoff
_logger.info(f"Detected vacuum phase: setting noCutoff nonbonded method.")
# Select barostat
from simtk.openmm.app import NoCutoff, CutoffNonPeriodic
NONPERIODIC_NONBONDED_METHODS = [NoCutoff, CutoffNonPeriodic]
if pressure is not None:
if self._nonbonded_method not in NONPERIODIC_NONBONDED_METHODS:
barostat = openmm.MonteCarloBarostat(self._pressure, self._temperature, self._barostat_period)
_logger.info(f"set MonteCarloBarostat because pressure was specified as {pressure} atmospheres")
else:
barostat = None
_logger.info(f"omitted MonteCarloBarostat because pressure was specified but system was not periodic")
else:
barostat = None
_logger.info(f"omitted MonteCarloBarostat because pressure was not specified")
# Create openforcefield Molecule objects for old and new molecules
from openforcefield.topology import Molecule
molecules = [ Molecule.from_openeye(oemol) for oemol in [self._ligand_oemol_old, self._ligand_oemol_new] ]
# Create SystemGenerator
from openmmforcefields.generators import SystemGenerator
forcefield_kwargs = {'removeCMMotion': False, 'ewaldErrorTolerance': self._pme_tol, 'nonbondedMethod': self._nonbonded_method,'constraints' : app.HBonds, 'hydrogenMass' : self._hmass}
self._system_generator = SystemGenerator(forcefields=forcefield_files, barostat=barostat, forcefield_kwargs=forcefield_kwargs,
small_molecule_forcefield=small_molecule_forcefield, molecules=molecules, cache=small_molecule_parameters_cache)
_logger.info("successfully created SystemGenerator to create ligand systems")
_logger.info(f"executing SmallMoleculeSetProposalEngine...")
self._proposal_engine = SmallMoleculeSetProposalEngine([self._ligand_smiles_old, self._ligand_smiles_new], self._system_generator,map_strength=self._map_strength, residue_name='MOL')
_logger.info(f"instantiating FFAllAngleGeometryEngine...")
# NOTE: we are conducting the geometry proposal without any neglected angles
self._geometry_engine = FFAllAngleGeometryEngine(metadata=None, use_sterics=False, n_bond_divisions=100, n_angle_divisions=180, n_torsion_divisions=360, verbose=True, storage=None, bond_softening_constant=1.0, angle_softening_constant=1.0, neglect_angles = neglect_angles, use_14_nonbondeds = (not self._anneal_14s))
# if we are running multiple phases, we only want to generate one topology proposal, and use the same one for the other legs
# this is tracked using _proposal_phase
if 'complex' in phases:
_logger.info('Generating the topology proposal from the complex leg')
self._nonbonded_method = app.PME
_logger.info(f"setting up complex phase...")
self._setup_complex_phase(protein_pdb_filename,receptor_mol2_filename,mol_list)
self._complex_topology_old_solvated, self._complex_positions_old_solvated, self._complex_system_old_solvated = self._solvate_system(
self._complex_topology_old, self._complex_positions_old)
_logger.info(f"successfully generated complex topology, positions, system")
self._complex_md_topology_old_solvated = md.Topology.from_openmm(self._complex_topology_old_solvated)
_logger.info(f"creating TopologyProposal...")
self._complex_topology_proposal = self._proposal_engine.propose(self._complex_system_old_solvated,
self._complex_topology_old_solvated, current_mol=self._ligand_oemol_old,proposed_mol=self._ligand_oemol_new)
self.non_offset_new_to_old_atom_map = self._proposal_engine.non_offset_new_to_old_atom_map
self._proposal_phase = 'complex'
_logger.info(f"conducting geometry proposal...")
self._complex_positions_new_solvated, self._complex_logp_proposal = self._geometry_engine.propose(self._complex_topology_proposal,
self._complex_positions_old_solvated,
beta)
self._complex_logp_reverse = self._geometry_engine.logp_reverse(self._complex_topology_proposal, self._complex_positions_new_solvated, self._complex_positions_old_solvated, beta)
if not self._complex_topology_proposal.unique_new_atoms:
assert self._geometry_engine.forward_final_context_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.forward_final_context_reduced_potential})"
assert self._geometry_engine.forward_atoms_with_positions_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's forward atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.forward_atoms_with_positions_reduced_potential})"
self._complex_added_valence_energy = 0.0
else:
self._complex_added_valence_energy = self._geometry_engine.forward_final_context_reduced_potential - self._geometry_engine.forward_atoms_with_positions_reduced_potential
if not self._complex_topology_proposal.unique_old_atoms:
assert self._geometry_engine.reverse_final_context_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.reverse_final_context_reduced_potential})"
assert self._geometry_engine.reverse_atoms_with_positions_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.reverse_atoms_with_positions_reduced_potential})"
self._complex_subtracted_valence_energy = 0.0
else:
self._complex_subtracted_valence_energy = self._geometry_engine.reverse_final_context_reduced_potential - self._geometry_engine.reverse_atoms_with_positions_reduced_potential
self._complex_forward_neglected_angles = self._geometry_engine.forward_neglected_angle_terms
self._complex_reverse_neglected_angles = self._geometry_engine.reverse_neglected_angle_terms
self._complex_geometry_engine = copy.deepcopy(self._geometry_engine)
if 'solvent' in phases:
_logger.info(f"Detected solvent...")
if self._proposal_phase is None:
_logger.info(f"no complex detected in phases...generating unique topology/geometry proposals...")
self._nonbonded_method = app.PME
_logger.info(f"solvating ligand...")
self._ligand_topology_old_solvated, self._ligand_positions_old_solvated, self._ligand_system_old_solvated = self._solvate_system(
self._ligand_topology_old, self._ligand_positions_old)
self._ligand_md_topology_old_solvated = md.Topology.from_openmm(self._ligand_topology_old_solvated)
_logger.info(f"creating TopologyProposal")
self._solvent_topology_proposal = self._proposal_engine.propose(self._ligand_system_old_solvated,
self._ligand_topology_old_solvated,current_mol=self._ligand_oemol_old,proposed_mol=self._ligand_oemol_new)
self.non_offset_new_to_old_atom_map = self._proposal_engine.non_offset_new_to_old_atom_map
self._proposal_phase = 'solvent'
else:
_logger.info('Using the topology proposal from the complex leg')
self._solvent_topology_proposal, self._ligand_positions_old_solvated = self._generate_solvent_topologies(
self._complex_topology_proposal, self._complex_positions_old_solvated)
_logger.info(f"conducting geometry proposal...")
self._ligand_positions_new_solvated, self._ligand_logp_proposal_solvated = self._geometry_engine.propose(self._solvent_topology_proposal,
self._ligand_positions_old_solvated, beta)
self._ligand_logp_reverse_solvated = self._geometry_engine.logp_reverse(self._solvent_topology_proposal, self._ligand_positions_new_solvated, self._ligand_positions_old_solvated, beta)
if not self._solvent_topology_proposal.unique_new_atoms:
assert self._geometry_engine.forward_final_context_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.forward_final_context_reduced_potential})"
assert self._geometry_engine.forward_atoms_with_positions_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's forward atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.forward_atoms_with_positions_reduced_potential})"
self._solvated_added_valence_energy = 0.0
else:
self._solvated_added_valence_energy = self._geometry_engine.forward_final_context_reduced_potential - self._geometry_engine.forward_atoms_with_positions_reduced_potential
if not self._solvent_topology_proposal.unique_old_atoms:
assert self._geometry_engine.reverse_final_context_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.reverse_final_context_reduced_potential})"
assert self._geometry_engine.reverse_atoms_with_positions_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.reverse_atoms_with_positions_reduced_potential})"
self._solvated_subtracted_valence_energy = 0.0
else:
self._solvated_subtracted_valence_energy = self._geometry_engine.reverse_final_context_reduced_potential - self._geometry_engine.reverse_atoms_with_positions_reduced_potential
self._solvated_forward_neglected_angles = self._geometry_engine.forward_neglected_angle_terms
self._solvated_reverse_neglected_angles = self._geometry_engine.reverse_neglected_angle_terms
self._solvent_geometry_engine = copy.deepcopy(self._geometry_engine)
if 'vacuum' in phases:
_logger.info(f"Detected solvent...")
# need to change nonbonded cutoff and remove barostat for vacuum leg
_logger.info(f"assgning noCutoff to nonbonded_method")
self._nonbonded_method = app.NoCutoff
self._system_generator.barostat = None
_logger.info(f'Removing barostat for vacuum phase')
self._system_generator.forcefield_kwargs['nonbondedMethod'] = self._nonbonded_method
_logger.info(f'Setting nonbondedMethod to NoCutoff for vacuum phase')
_logger.info(f"calling SystemGenerator to create ligand systems.")
if self._proposal_phase is None:
_logger.info('No complex or solvent leg, so performing topology proposal for vacuum leg')
self._vacuum_topology_old, self._vacuum_positions_old, self._vacuum_system_old = self._solvate_system(self._ligand_topology_old,
self._ligand_positions_old,vacuum=True)
self._vacuum_topology_proposal = self._proposal_engine.propose(self._vacuum_system_old,
self._vacuum_topology_old,current_mol=self._ligand_oemol_old,proposed_mol=self._ligand_oemol_new)
self.non_offset_new_to_old_atom_map = self._proposal_engine.non_offset_new_to_old_atom_map
self._proposal_phase = 'vacuum'
elif self._proposal_phase == 'complex':
_logger.info('Using the topology proposal from the complex leg')
self._vacuum_topology_proposal, self._vacuum_positions_old = self._generate_vacuum_topologies(
self._complex_topology_proposal, self._complex_positions_old_solvated)
elif self._proposal_phase == 'solvent':
_logger.info('Using the topology proposal from the solvent leg')
self._vacuum_topology_proposal, self._vacuum_positions_old = self._generate_vacuum_topologies(
self._solvent_topology_proposal, self._ligand_positions_old_solvated)
_logger.info(f"conducting geometry proposal...")
self._vacuum_positions_new, self._vacuum_logp_proposal = self._geometry_engine.propose(self._vacuum_topology_proposal,
self._vacuum_positions_old,
beta)
self._vacuum_logp_reverse = self._geometry_engine.logp_reverse(self._vacuum_topology_proposal, self._vacuum_positions_new, self._vacuum_positions_old, beta)
if not self._vacuum_topology_proposal.unique_new_atoms:
assert self._geometry_engine.forward_final_context_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.forward_final_context_reduced_potential})"
assert self._geometry_engine.forward_atoms_with_positions_reduced_potential == None, f"There are no unique new atoms but the geometry_engine's forward atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.forward_atoms_with_positions_reduced_potential})"
self._vacuum_added_valence_energy = 0.0
else:
self._vacuum_added_valence_energy = self._geometry_engine.forward_final_context_reduced_potential - self._geometry_engine.forward_atoms_with_positions_reduced_potential
if not self._vacuum_topology_proposal.unique_old_atoms:
assert self._geometry_engine.reverse_final_context_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's final context reduced potential is not None (i.e. {self._geometry_engine.reverse_final_context_reduced_potential})"
assert self._geometry_engine.reverse_atoms_with_positions_reduced_potential == None, f"There are no unique old atoms but the geometry_engine's atoms-with-positions-reduced-potential in not None (i.e. { self._geometry_engine.reverse_atoms_with_positions_reduced_potential})"
self._vacuum_subtracted_valence_energy = 0.0
else:
self._vacuum_subtracted_valence_energy = self._geometry_engine.reverse_final_context_reduced_potential - self._geometry_engine.reverse_atoms_with_positions_reduced_potential
self._vacuum_forward_neglected_angles = self._geometry_engine.forward_neglected_angle_terms
self._vacuum_reverse_neglected_angles = self._geometry_engine.reverse_neglected_angle_terms
self._vacuum_geometry_engine = copy.deepcopy(self._geometry_engine)
def _setup_complex_phase(self,protein_pdb_filename,receptor_mol2_filename,mol_list):
"""
Runs setup on the protein/receptor file for relative simulations
Parameters
----------
protein_pdb_filename : str, default None
Protein pdb filename. If none, receptor_mol2_filename must be provided
receptor_mol2_filename : str, default None
Receptor mol2 filename. If none, protein_pdb_filename must be provided
"""
if protein_pdb_filename:
self._protein_pdb_filename = protein_pdb_filename
protein_pdbfile = open(self._protein_pdb_filename, 'r')
pdb_file = app.PDBFile(protein_pdbfile)
protein_pdbfile.close()
self._receptor_positions_old = pdb_file.positions
self._receptor_topology_old = pdb_file.topology
self._receptor_md_topology_old = md.Topology.from_openmm(self._receptor_topology_old)
elif receptor_mol2_filename:
self._receptor_mol2_filename = receptor_mol2_filename
self._receptor_mol = createOEMolFromSDF(self._receptor_mol2_filename)
mol_list.append(self._receptor_mol)
self._receptor_positions_old = extractPositionsFromOEMol(self._receptor_mol)
self._receptor_topology_old = forcefield_generators.generateTopologyFromOEMol(self._receptor_mol)
self._receptor_md_topology_old = md.Topology.from_openmm(self._receptor_topology_old)
else:
raise ValueError("You need to provide either a protein pdb or a receptor mol2 to run a complex simulation.")
self._complex_md_topology_old = self._receptor_md_topology_old.join(self._ligand_md_topology_old)
self._complex_topology_old = self._complex_md_topology_old.to_openmm()
n_atoms_complex_old = self._complex_topology_old.getNumAtoms()
n_atoms_protein_old = self._receptor_topology_old.getNumAtoms()
self._complex_positions_old = unit.Quantity(np.zeros([n_atoms_complex_old, 3]), unit=unit.nanometers)
self._complex_positions_old[:n_atoms_protein_old, :] = self._receptor_positions_old
self._complex_positions_old[n_atoms_protein_old:, :] = self._ligand_positions_old
def _generate_solvent_topologies(self, topology_proposal, old_positions):
"""
This method generates ligand-only topologies and positions from a TopologyProposal containing a solvated complex.
The output of this method is then used when building the solvent-phase simulation with the same atom map.
Parameters
----------
old_positions : array
Positions of the fully solvated protein ligand syste
Returns
-------
ligand_topology_proposal : perses.rjmc.topology_proposal.TopologyProposal
Topology proposal object of the ligand without complex
old_solvated_positions : array
positions of the system without complex
"""
old_complex = md.Topology.from_openmm(topology_proposal.old_topology)
new_complex = md.Topology.from_openmm(topology_proposal.new_topology)
atom_map = topology_proposal.old_to_new_atom_map
old_mol_start_index, old_mol_len = self._proposal_engine._find_mol_start_index(old_complex.to_openmm())
new_mol_start_index, new_mol_len = self._proposal_engine._find_mol_start_index(new_complex.to_openmm())
old_pos = unit.Quantity(np.zeros([len(old_positions), 3]), unit=unit.nanometers)
old_pos[:, :] = old_positions
old_ligand_positions = old_pos[old_mol_start_index:(old_mol_start_index + old_mol_len), :]
# subset the topologies:
old_ligand_topology = old_complex.subset(old_complex.select("resname == 'MOL' "))
new_ligand_topology = new_complex.subset(new_complex.select("resname == 'MOL' "))
# solvate the old ligand topology:
old_solvated_topology, old_solvated_positions, old_solvated_system = self._solvate_system(
old_ligand_topology.to_openmm(), old_ligand_positions)
old_solvated_md_topology = md.Topology.from_openmm(old_solvated_topology)
# now remove the old ligand, leaving only the solvent
solvent_only_topology = old_solvated_md_topology.subset(old_solvated_md_topology.select("not resname MOL"))
# append the solvent to the new ligand-only topology:
new_solvated_ligand_md_topology = new_ligand_topology.join(solvent_only_topology)
nsl, b = new_solvated_ligand_md_topology.to_dataframe()
# dirty hack because new_solvated_ligand_md_topology.to_openmm() was throwing bond topology error
new_solvated_ligand_md_topology = md.Topology.from_dataframe(nsl, b)
new_solvated_ligand_omm_topology = new_solvated_ligand_md_topology.to_openmm()
new_solvated_ligand_omm_topology.setPeriodicBoxVectors(old_solvated_topology.getPeriodicBoxVectors())
# create the new ligand system:
new_solvated_system = self._system_generator.create_system(new_solvated_ligand_omm_topology)
new_to_old_atom_map = {atom_map[x] - new_mol_start_index: x - old_mol_start_index for x in
old_complex.select("resname == 'MOL' ") if x in atom_map.keys()}
# adjust the atom map to account for the presence of solvent degrees of freedom:
# By design, all atoms after the ligands are water, and should be mapped.
n_water_atoms = solvent_only_topology.to_openmm().getNumAtoms()
for i in range(n_water_atoms):
new_to_old_atom_map[new_mol_len + i] = old_mol_len + i
# make a TopologyProposal
ligand_topology_proposal = TopologyProposal(new_topology=new_solvated_ligand_omm_topology,
new_system=new_solvated_system,
old_topology=old_solvated_topology, old_system=old_solvated_system,
new_to_old_atom_map=new_to_old_atom_map, old_chemical_state_key='A',
new_chemical_state_key='B')
return ligand_topology_proposal, old_solvated_positions
def _generate_vacuum_topologies(self, topology_proposal, old_positions):
"""
This method generates ligand-only topologies and positions from a TopologyProposal containing a solvated complex.
The output of this method is then used when building the solvent-phase simulation with the same atom map.
Parameters
----------
old_positions : array
Positions of the fully solvated protein ligand syste
Returns
-------
ligand_topology_proposal : perses.rjmc.topology_proposal.TopologyProposal
Topology proposal object of the ligand without complex
old_solvated_positions : array
positions of the system without complex
"""
old_complex = md.Topology.from_openmm(topology_proposal.old_topology)
new_complex = md.Topology.from_openmm(topology_proposal.new_topology)
atom_map = topology_proposal.old_to_new_atom_map
old_mol_start_index, old_mol_len = self._proposal_engine._find_mol_start_index(old_complex.to_openmm())
new_mol_start_index, new_mol_len = self._proposal_engine._find_mol_start_index(new_complex.to_openmm())
old_pos = unit.Quantity(np.zeros([len(old_positions), 3]), unit=unit.nanometers)
old_pos[:, :] = old_positions
old_ligand_positions = old_pos[old_mol_start_index:(old_mol_start_index + old_mol_len), :]
# subset the topologies:
old_ligand_topology = old_complex.subset(old_complex.select("resname == 'MOL' "))
new_ligand_topology = new_complex.subset(new_complex.select("resname == 'MOL' "))
# convert to openmm topology object
old_ligand_topology = old_ligand_topology.to_openmm()
new_ligand_topology = new_ligand_topology.to_openmm()
# create the new ligand system:
old_ligand_system = self._system_generator.create_system(old_ligand_topology)
new_ligand_system = self._system_generator.create_system(new_ligand_topology)
new_to_old_atom_map = {atom_map[x] - new_mol_start_index: x - old_mol_start_index for x in
old_complex.select("resname == 'MOL' ") if x in atom_map.keys()}
# make a TopologyProposal
ligand_topology_proposal = TopologyProposal(new_topology=new_ligand_topology,
new_system=new_ligand_system,
old_topology=old_ligand_topology, old_system=old_ligand_system,
new_to_old_atom_map=new_to_old_atom_map, old_chemical_state_key='A',
new_chemical_state_key='B')
return ligand_topology_proposal, old_ligand_positions
def _solvate_system(self, topology, positions, model='tip3p',vacuum=False):
"""
Generate a solvated topology, positions, and system for a given input topology and positions.
For generating the system, the forcefield files provided in the constructor will be used.
Parameters
----------
topology : app.Topology
Topology of the system to solvate
positions : [n, 3] ndarray of Quantity nm
the positions of the unsolvated system
forcefield : SystemGenerator.forcefield
forcefield file of solvent to add
model : str, default 'tip3p'
solvent model to use for solvation
Returns
-------
solvated_topology : app.Topology
Topology of the system with added waters
solvated_positions : [n + 3(n_waters), 3] ndarray of Quantity nm
Solvated positions
solvated_system : openmm.System
The parameterized system, containing a barostat if one was specified.
"""
# DEBUG: Write PDB file being fed into Modeller to check why MOL isn't being matched
from simtk.openmm.app import PDBFile
import os
pdb_filename = os.path.join(os.environ['PWD'], 'modeller.pdb')
with open(pdb_filename, 'w') as outfile:
PDBFile.writeFile(topology, positions, outfile)
modeller = app.Modeller(topology, positions)
# retaining protein protonation from input files
#hs = [atom for atom in modeller.topology.atoms() if atom.element.symbol in ['H'] and atom.residue.name not in ['MOL','OLD','NEW']]
#modeller.delete(hs)
#modeller.addHydrogens(forcefield=self._system_generator.forcefield)
if not vacuum:
_logger.info(f"\tpreparing to add solvent")
modeller.addSolvent(self._system_generator.forcefield, model=model, padding=self._padding, ionicStrength=0.15*unit.molar)
else:
_logger.info(f"\tSkipping solvation of vacuum perturbation")
solvated_topology = modeller.getTopology()
solvated_positions = modeller.getPositions()
# canonicalize the solvated positions: turn tuples into np.array
solvated_positions = unit.quantity.Quantity(value = np.array([list(atom_pos) for atom_pos in solvated_positions.value_in_unit_system(unit.md_unit_system)]), unit = unit.nanometers)
_logger.info(f"\tparameterizing...")
solvated_system = self._system_generator.create_system(solvated_topology)
_logger.info(f"\tSystem parameterized")
return solvated_topology, solvated_positions, solvated_system
@property
def complex_topology_proposal(self):
return self._complex_topology_proposal
@property
def complex_old_positions(self):
return self._complex_positions_old_solvated
@property
def complex_new_positions(self):
return self._complex_positions_new_solvated
@property
def solvent_topology_proposal(self):
return self._solvent_topology_proposal
@property
def solvent_old_positions(self):
return self._ligand_positions_old_solvated
@property
def solvent_new_positions(self):
return self._ligand_positions_new_solvated
@property
def vacuum_topology_proposal(self):
return self._vacuum_topology_proposal
@property
def vacuum_old_positions(self):
return self._vacuum_positions_old
@property
def vacuum_new_positions(self):
return self._vacuum_positions_new
class DaskClient(object):
"""
This class manages the dask scheduler.
Parameters
----------
LSF: bool, default False
whether we are using the LSF dask Client
num_processes: int, default 2
number of processes to run. If not LSF, this argument does nothing
adapt: bool, default False
whether to use an adaptive scheduler. If not LSF, this argument does nothing
"""
def __init__(self):
_logger.info(f"Initializing DaskClient")
def activate_client(self,
LSF = True,
num_processes = 2,
adapt = False):
if LSF:
from dask_jobqueue import LSFCluster
cluster = LSFCluster()
self._adapt = adapt
self.num_processes = num_processes
if self._adapt:
_logger.debug(f"adapting cluster from 1 to {self.num_processes} processes")
cluster.adapt(minimum = 2, maximum = self.num_processes, interval = "1s")
else:
_logger.debug(f"scaling cluster to {self.num_processes} processes")
cluster.scale(self.num_processes)
_logger.debug(f"scheduling cluster with client")
self.client = distributed.Client(cluster)
else:
self.client = None
self._adapt = False
self.num_processes = 0
def deactivate_client(self):
"""
NonequilibriumSwitchingFEP is not pickleable with the self.client or self.cluster activated.
This must be called before pickling
"""
if self.client is not None:
self.client.close()
self.client = None
def scatter(self, df):
"""
wrapper to scatter the local data df
"""
if self.client is None:
#don't actually scatter
return df
else:
return self.client.scatter(df)
def deploy(self, func, arguments):
"""
wrapper to map a function and its arguments to the client for scheduling
Arguments
---------
func : function to map
arguments: tuple of the arguments that the function will take
argument : tuple of argument lists
Returns
---------
futures
"""
if self.client is None:
if len(arguments) == 1:
futures = [func(plug) for plug in arguments[0]]
else:
futures = [func(*plug) for plug in zip(*arguments)]
else:
futures = self.client.map(func, *arguments)
return futures
def gather_results(self, futures):
"""
wrapper to gather a function given its arguments
Arguments
---------
futures : future pointers
Returns
---------
results
"""
if self.client is None:
return futures
else:
results = self.client.gather(futures)
return results
def wait(self, futures):
"""
wrapper to wait until futures are complete.
"""
if self.client is None:
pass
else:
distributed.wait(futures)
class NonequilibriumSwitchingFEP(DaskClient):
"""
This class manages Nonequilibrium switching based relative free energy calculations, carried out on a distributed computing framework.
"""
def __init__(self,
hybrid_factory,
protocol = 'default',
n_equilibrium_steps_per_iteration = 100,
temperature=300.0 * unit.kelvin,
trajectory_directory=None,
trajectory_prefix=None,
atom_selection="not water",
eq_splitting_string="V R O R V",
neq_splitting_string = "V R O R V",
measure_shadow_work=False,
timestep=1.0*unit.femtoseconds,
ncmc_save_interval = None,
write_ncmc_configuration = False,
relative_transform = True):
"""
Create an instance of the NonequilibriumSwitchingFEP driver class.
NOTE : defining self.client and self.cluster renders this class non-pickleable; call self.deactivate_client() to close the cluster/client
objects to render this pickleable.
Parameters
----------
hybrid_factory : perses.annihilation.relative.HybridTopologyFactory
hybrid topology factory
protocol : dict of str: str, default protocol as defined by top of file
How each force's scaling parameter relates to the main lambda that is switched by the integrator from 0 to 1
In this case, the trailblaze threshold must be set explicitly
n_equilibrium_steps_per_iteration : int, default 100
Number of steps to run in an iteration of equilibration to generate an iid sample
temperature : float unit.Quantity
Temperature at which to perform the simulation, default 300K
trajectory_directory : str, default None
Where to write out trajectories resulting from the calculation. If none, no writing is done.
trajectory_prefix : str, default None
What prefix to use for this calculation's trajectory files. If none, no writing is done.
atom_selection : str, default not water
MDTraj selection syntax for which atomic coordinates to save in the trajectories. Default strips
all water.
eq_splitting_string : str, default 'V R O R V'
The integrator splitting to use for equilibrium simulation
neq_splitting_string : str, default 'V R O R V'
The integrator splitting to use for nonequilibrium switching simulation
ncmc_save_interval : int, default None
interval with which to write ncmc trajectory. If None, trajectory will not be saved.
We will assert that the n_lambdas % ncmc_save_interval = 0; otherwise, the protocol will not be complete
write_ncmc_configuration : bool, default False
whether to write ncmc annealing perturbations; if True, will write every ncmc_save_interval iterations
relative_transform : bool, default True
whether a relative or absolute alchemical transformation will be conducted. This extends the utility of the NonequilibriumSwitchingFEP to absolute transforms.
"""
#Specific to LSF clusters
# NOTE: assume that the
_logger.debug(f"instantiating NonequilibriumSwitchingFEP...")
# construct the hybrid topology factory object
_logger.info(f"writing HybridTopologyFactory")
self._factory = hybrid_factory
topology_proposal = self._factory._topology_proposal
# use default functions if none specified
self._protocol = protocol
self._write_ncmc_configuration = write_ncmc_configuration
# setup splitting string:
self._eq_splitting_string = eq_splitting_string
self._neq_splitting_string = neq_splitting_string
self._measure_shadow_work = measure_shadow_work
# set up some class attributes
self._hybrid_system = self._factory.hybrid_system
self._initial_hybrid_positions = self._factory.hybrid_positions
self._n_equil_steps = n_equilibrium_steps_per_iteration
self._trajectory_prefix = trajectory_prefix
self._trajectory_directory = trajectory_directory
self._atom_selection = atom_selection
self._current_iteration = 0
self._endpoint_growth_thermostates = dict()
self._timestep = timestep
_logger.info(f"instantiating trajectory filenames")
if self._trajectory_directory and self._trajectory_prefix:
self._write_traj = True
self._trajectory_filename = {lambda_state: os.path.join(os.getcwd(), self._trajectory_directory, f"{trajectory_prefix}.eq.lambda_{lambda_state}.h5") for lambda_state in [0,1]}
_logger.debug(f"eq_traj_filenames: {self._trajectory_filename}")
self._neq_traj_filename = {direct: os.path.join(os.getcwd(), self._trajectory_directory, f"{trajectory_prefix}.neq.lambda_{direct}") for direct in ['forward', 'reverse']}
_logger.debug(f"neq_traj_filenames: {self._neq_traj_filename}")
else:
self._write_traj = False
self._trajectory_filename = {0: None, 1: None}
self._neq_traj_filename = {'forward': None, 'reverse': None}
# instantiating equilibrium file/rp collection dicts
self._eq_dict = {0: [], 1: [], '0_decorrelated': None, '1_decorrelated': None, '0_reduced_potentials': [], '1_reduced_potentials': []}
self._eq_files_dict = {0: [], 1: []}
self._eq_timers = {0: [], 1: []}
# instantiating neq_switching collection dicts:
self._nonalchemical_reduced_potentials = {'from_0': [], 'from_1': [], 'to_0': [], 'to_1': []}
self._added_valence_reduced_potentials = {'from_0': [], 'from_1': [], 'to_0': [], 'to_1': []}
self._alchemical_reduced_potentials = {'from_0': [], 'from_1': [], 'to_0': [], 'to_1': []}
self._alchemical_reduced_potential_differences = {'forward': [], 'reverse': []}
self._nonalchemical_reduced_potential_differences = {0: [], 1: []} # W_f is work from 0(1)_alch to 0(1)_nonalch
self._EXP = {'forward': [], 'reverse': [], 0: [], 1: []}
self._BAR = []
#instantiate nonequilibrium work dicts: the keys indicate from which equilibrium thermodynamic state the neq_switching is conducted FROM (as opposed to TO)
self._nonequilibrium_cumulative_work = {'forward': None, 'reverse': None}
self._nonequilibrium_shadow_work = copy.deepcopy(self._nonequilibrium_cumulative_work)
self._nonequilibrium_timers = copy.deepcopy(self._nonequilibrium_cumulative_work)
self.online_timer = []
self._failures = copy.deepcopy(self._nonequilibrium_cumulative_work)
self.survival = copy.deepcopy(self._nonequilibrium_cumulative_work)
self.dg_EXP = copy.deepcopy(self._nonequilibrium_cumulative_work)
self.dg_BAR = None
# create the thermodynamic state
self.relative_transform = relative_transform
if self.relative_transform:
_logger.info(f"Instantiating thermodynamic states 0 and 1.")
lambda_zero_alchemical_state = RelativeAlchemicalState.from_system(self._hybrid_system)
lambda_one_alchemical_state = copy.deepcopy(lambda_zero_alchemical_state)
lambda_zero_alchemical_state.set_alchemical_parameters(0.0)
lambda_one_alchemical_state.set_alchemical_parameters(1.0)
# ensure their states are set appropriately
self._hybrid_alchemical_states = {0: lambda_zero_alchemical_state, 1: lambda_one_alchemical_state}
# create the base thermodynamic state with the hybrid system
self._thermodynamic_state = ThermodynamicState(self._hybrid_system, temperature=temperature)
# Now create the compound states with different alchemical states
self._hybrid_thermodynamic_states = {0: CompoundThermodynamicState(self._thermodynamic_state,
composable_states=[
self._hybrid_alchemical_states[0]]),
1: CompoundThermodynamicState(copy.deepcopy(self._thermodynamic_state),
composable_states=[
self._hybrid_alchemical_states[1]])}
self._temperature = temperature
# set the SamplerState for the lambda 0 and 1 equilibrium simulations
_logger.info(f"Instantiating SamplerStates")
self._lambda_one_sampler_state = SamplerState(self._initial_hybrid_positions,
box_vectors=self._hybrid_system.getDefaultPeriodicBoxVectors())
self._lambda_zero_sampler_state = copy.deepcopy(self._lambda_one_sampler_state)
self._sampler_states = {0: copy.deepcopy(self._lambda_zero_sampler_state), 1: copy.deepcopy(self._lambda_one_sampler_state)}
# initialize by minimizing
_logger.info(f"Instantiating equilibrium results by minimization")
[feptasks.minimize(self._hybrid_thermodynamic_states[key], self._sampler_states[key]) for key in [0,1]]
self._minimized_sampler_states = {i: copy.deepcopy(self._sampler_states[i]) for i in [0,1]}
# subset the topology appropriately:
if atom_selection is not None:
atom_selection_indices = self._factory.hybrid_topology.select(atom_selection)
self._atom_selection_indices = atom_selection_indices
else:
self._atom_selection_indices = None
# create an empty dict of starting and ending sampler_states
self.start_sampler_states = {_direction: [] for _direction in ['forward', 'reverse']}
self.end_sampler_states = {_direction: [] for _direction in ['forward', 'reverse']}
#create observable list
self.iterations = {_direction: [0] for _direction in ['forward', 'reverse']}
self.observable = {_direction: [1.0] for _direction in ['forward', 'reverse']}
self.allowable_resampling_methods = {'multinomial': NonequilibriumSwitchingFEP.multinomial_resample}
self.allowable_observables = {'ESS': NonequilibriumSwitchingFEP.ESS, 'CESS': NonequilibriumSwitchingFEP.CESS}
_logger.info(f"constructed")
def compute_nonalchemical_perturbations(self, start, end, sampler_state, geometry_engine):
"""
In order to correct for the artifacts associated with CustomNonbondedForces, we need to compute the potential difference between the alchemical endstates and the
valence energy-corrected nonalchemical endstates.
Parameters
----------
start : int (0 or 1)
the starting lambda value which the equilibrium sampler state corresponds to
end : int (0 or 1)
the alternate lambda value which the equilibrium sampler state will be annealed to
sampler_state : openmmtools.states.SamplerState
the equilibrium sampler state of the current lambda (start)
geometry_engine : perses.rjmc.geometry.FFAllAngleGeometryEngine
geometry engine used to create and compute the RJMCMC; we use this to compute the importance weight from the old/new system to the hybrid system (neglecting added valence terms)
"""
if not hasattr(NonequilibriumSwitchingFEP, 'geometry_engine'):
self.geometry_engine = geometry_engine
# Create thermodynamic states for the nonalchemical endpoints
topology_proposal = self._factory.topology_proposal
self._nonalchemical_thermodynamic_states = {
0: ThermodynamicState(topology_proposal.old_system, temperature=self._temperature),
1: ThermodynamicState(topology_proposal.new_system, temperature=self._temperature)}
#instantiate growth system thermodynamic and samplerstates
assert not self.geometry_engine.use_sterics, f"The geometry engine has use_sterics...this is not currently supported."
if self.geometry_engine.forward_final_growth_system: # if it exists, we have to create a thermodynamic state and set alchemical parameters
self._endpoint_growth_thermostates[1] = ThermodynamicState(self.geometry_engine.forward_final_growth_system, temperature=temperature) #sampler state is compatible with new system
else:
self._endpoint_growth_thermostates[1] = None
if self.geometry_engine.reverse_final_growth_system: # if it exists, we have to create a thermodynamic state and set alchemical parameters
self._endpoint_growth_thermostates[0] = ThermodynamicState(self.geometry_engine.reverse_final_growth_system, temperature=temperature) #sampler state is compatible with old system
else:
self._endpoint_growth_thermostates[0] = None
#define the nonalchemical_perturbation_args for endstate perturbations before running nonequilibrium switching
_lambda, _lambda_rev = start, end
nonalchemical_perturbation_args = {'hybrid_thermodynamic_states': [self._hybrid_thermodynamic_states[_lambda], self._hybrid_thermodynamic_states[_lambda_rev]],
'_endpoint_growth_thermostates': [self._endpoint_growth_thermostates[_lambda_rev], self._endpoint_growth_thermostates[_lambda]],
'factory': self._factory,
'nonalchemical_thermostates': [self._nonalchemical_thermodynamic_states[_lambda], self._nonalchemical_thermodynamic_states[_lambda_rev]],
'lambdas': [_lambda, _lambda_rev]}
_logger.debug(f"\tnonalchemical_perturbation_args for lambda_start = {_lambda}, lambda_end = {_lambda_rev}: {nonalchemical_perturbation_args}")
#then we will conduct a perturbation on the given sampler state with the appropriate arguments
valence_energy, nonalchemical_reduced_potential, hybrid_reduced_potential = feptasks.compute_nonalchemical_perturbation(nonalchemical_perturbation_args['hybrid_thermodynamic_states'][0],
nonalchemical_perturbation_args['_endpoint_growth_thermostates'][0],
sampler_state,
nonalchemical_perturbation_args['factory'],
nonalchemical_perturbation_args['nonalchemical_thermostates'][0],
nonalchemical_perturbation_args['lambdas'][0])
alt_valence_energy, alt_nonalchemical_reduced_potential, alt_hybrid_reduced_potential = feptasks.compute_nonalchemical_perturbation(nonalchemical_perturbation_args['hybrid_thermodynamic_states'][1],
nonalchemical_perturbation_args['_endpoint_growth_thermostates'][1],
sampler_state,
nonalchemical_perturbation_args['factory'],
nonalchemical_perturbation_args['nonalchemical_thermostates'][1],
nonalchemical_perturbation_args['lambdas'][1])
nonalch_perturbations = {'valence_energies': (valence_energy, alt_valence_energy),
'nonalchemical_reduced_potentials': (nonalchemical_reduced_potential, alt_nonalchemical_reduced_potential),
'hybrid_reduced_potentials': (hybrid_reduced_potential, alt_hybrid_reduced_potential)}
#now to write the results to the logger attributes
nonalchemical_reduced_potentials = nonalchemical_perturbation_dict['nonalchemical_reduced_potentials']
self._nonalchemical_reduced_potentials[f"from_{_lambda}"].append(nonalchemical_reduced_potentials[0])
self._nonalchemical_reduced_potentials[f"to_{_lambda_rev}"].append(nonalchemical_reduced_potentials[1])
valence_energies = nonalchemical_perturbation_dict['valence_energies']
self._added_valence_reduced_potentials[f"from_{_lambda}"].append(valence_energies[0])
self._added_valence_reduced_potentials[f"to_{_lambda_rev}"].append(valence_energies[1])
hybrid_reduced_potentials = nonalchemical_perturbation_dict['hybrid_reduced_potentials']
self._alchemical_reduced_potentials[f"from_{_lambda}"].append(hybrid_reduced_potentials[0])
self._alchemical_reduced_potentials[f"to_{_lambda_rev}"].append(hybrid_reduced_potentials[1])
def instantiate_particles(self,
n_lambdas = None,
n_particles = 5,
direction = None,
ncmc_save_interval = None,