-
Notifications
You must be signed in to change notification settings - Fork 3
/
mixture_system.py
180 lines (145 loc) · 7.81 KB
/
mixture_system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import tempfile
import numpy as np
import os
import mdtraj as md
from simtk.openmm import app
import simtk.openmm as mm
from simtk import unit as u
from .target import Target
from .simulation_parameters import *
from .cirpy import resolve
import utils
from protein_system import System
import gaff2xml
import itertools
from pymbar import timeseries as ts
import pandas as pd
N_STEPS_MIXTURES = 500000 # 0.5 ns (at a time)
N_EQUIL_STEPS_MIXTURES = 5000000 # 5ns
OUTPUT_FREQUENCY_MIXTURES = 10000 # 10ps
OUTPUT_DATA_FREQUENCY_MIXTURES = 250 # 0.25ps
STD_ERROR_TOLERANCE = 0.0002 # g/mL
TIMESTEP = 1.0 * u.femtoseconds
class MixtureSystem(System):
def __init__(self, cas_strings, n_monomers, temperature, pressure=PRESSURE, output_frequency=OUTPUT_FREQUENCY_MIXTURES, output_data_frequency=OUTPUT_DATA_FREQUENCY_MIXTURES, n_steps=N_STEPS_MIXTURES, equil_output_frequency=OUTPUT_FREQUENCY_MIXTURES, stderr_tolerance = STD_ERROR_TOLERANCE, **kwargs):
super(MixtureSystem, self).__init__(temperature=temperature, pressure=pressure, output_frequency=output_frequency, n_steps=n_steps, equil_output_frequency=equil_output_frequency, **kwargs)
self._main_dir = os.getcwd()
self.cas_strings = cas_strings
self.n_monomers = n_monomers
identifier = list(itertools.chain(cas_strings, [str(n) for n in n_monomers], [str(temperature).split(' ')[0]]))
self._target_name = '_'.join(identifier)
self.output_data_frequency = output_data_frequency
self.stderr_tolerance = stderr_tolerance
self.ran_equilibrate = False
def build(self):
utils.make_path('monomers/')
utils.make_path('boxes/')
utils.make_path('ffxml/')
self.monomer_pdb_filenames = ["monomers/"+string+".pdb" for string in self.cas_strings]
self.box_pdb_filename = "boxes/" + self.identifier + ".pdb"
self.ffxml_filename = "ffxml/" + '_'.join(self.cas_strings) + ".xml"
utils.make_path(self.box_pdb_filename)
rungaff = False
if not os.path.exists(self.ffxml_filename):
rungaff = True
if not os.path.exists(self.box_pdb_filename):
for filename in self.monomer_pdb_filenames:
if not os.path.exists(filename):
rungaff = True
if rungaff:
self.smiles_strings = []
for mlc in self.cas_strings:
self.smiles_strings.append(resolve(mlc, 'smiles'))
oemlcs = []
with gaff2xml.utils.enter_temp_directory(): # Avoid dumping 50 antechamber files in local directory.
for smiles_string in self.smiles_strings:
m = gaff2xml.openeye.smiles_to_oemol(smiles_string)
m = gaff2xml.openeye.get_charges(m, strictStereo=False, keep_confs=1)
oemlcs.append(m)
ligand_trajectories, ffxml = gaff2xml.openeye.oemols_to_ffxml(oemlcs)
if not os.path.exists(self.ffxml_filename):
outfile = open(self.ffxml_filename, 'w')
outfile.write(ffxml.read())
outfile.close()
ffxml.seek(0)
for k, ligand_traj in enumerate(ligand_trajectories):
pdb_filename = self.monomer_pdb_filenames[k]
if not os.path.exists(pdb_filename):
ligand_traj.save(pdb_filename)
self.ffxml = app.ForceField(self.ffxml_filename)
if not os.path.exists(self.box_pdb_filename):
self.packed_trj = gaff2xml.packmol.pack_box(self.monomer_pdb_filenames, self.n_monomers)
self.packed_trj.save(self.box_pdb_filename)
else:
self.packed_trj = md.load(self.box_pdb_filename)
def equilibrate(self):
self.ran_equilibrate = True
utils.make_path('equil/')
self.equil_dcd_filename = "equil/"+self.identifier +"_equil.dcd"
self.equil_pdb_filename = "equil/"+self.identifier +"_equil.pdb"
utils.make_path(self.equil_pdb_filename)
if os.path.exists(self.equil_pdb_filename):
return
positions = self.packed_trj.openmm_positions(0)
topology = self.packed_trj.top.to_openmm()
topology.setUnitCellDimensions(mm.Vec3(*self.packed_trj.unitcell_lengths[0]) * u.nanometer)
ff = self.ffxml
system = ff.createSystem(topology, nonbondedMethod=app.PME, nonbondedCutoff=self.cutoff, constraints=app.HBonds)
integrator = mm.LangevinIntegrator(self.temperature, self.equil_friction, self.equil_timestep)
system.addForce(mm.MonteCarloBarostat(self.pressure, self.temperature, self.barostat_frequency))
simulation = app.Simulation(topology, system, integrator)
simulation.context.setPositions(positions)
print('Minimizing.')
simulation.minimizeEnergy()
simulation.context.setVelocitiesToTemperature(self.temperature)
print('Equilibrating.')
simulation.reporters.append(app.DCDReporter(self.equil_dcd_filename, self.equil_output_frequency))
simulation.step(self.n_equil_steps)
# Re-write a better PDB with correct box sizes.
traj = md.load(self.equil_dcd_filename, top=self.box_pdb_filename)[-1]
traj.save(self.equil_pdb_filename)
def production(self):
utils.make_path('production/')
self.production_dcd_filename = "production/"+self.identifier +"_production.dcd"
self.production_pdb_filename = "production/"+self.identifier +"_production.pdb"
self.production_data_filename = "production/"+self.identifier +"_production.csv"
utils.make_path(self.production_dcd_filename)
if os.path.exists(self.production_pdb_filename):
return
if self.ran_equilibrate:
pdb = app.PDBFile(self.equil_pdb_filename)
topology = pdb.topology
positions = pdb.positions
else:
positions = self.packed_trj.openmm_positions(0)
topology = self.packed_trj.top.to_openmm()
topology.setUnitCellDimensions(mm.Vec3(*self.packed_trj.unitcell_lengths[0]) * u.nanometer)
ff = self.ffxml
system = ff.createSystem(topology, nonbondedMethod=app.PME, nonbondedCutoff=self.cutoff, constraints=app.HBonds)
integrator = mm.LangevinIntegrator(self.temperature, self.friction, self.timestep)
system.addForce(mm.MonteCarloBarostat(self.pressure, self.temperature, self.barostat_frequency))
simulation = app.Simulation(topology, system, integrator)
simulation.context.setPositions(positions)
if not self.ran_equilibrate:
print('Minimizing.')
simulation.minimizeEnergy()
simulation.context.setVelocitiesToTemperature(self.temperature)
print('Production.')
simulation.reporters.append(app.DCDReporter(self.production_dcd_filename, self.output_frequency))
simulation.reporters.append(app.StateDataReporter(self.production_data_filename, self.output_data_frequency, step=True, potentialEnergy=True, temperature=True, density=True))
converged = False
while not converged:
simulation.step(self.n_steps)
d = pd.read_csv(self.production_data_filename, names=["step", "U", "Temperature", "Density"], skiprows=1)
density_ts = np.array(d.Density)
[t0, g, Neff] = ts.detectEquilibration(density_ts, nskip=1000)
density_ts = density_ts[t0:]
density_mean_stderr = density_ts.std() / np.sqrt(Neff)
if density_mean_stderr < self.stderr_tolerance:
converged = True
del(simulation)
if self.ran_equilibrate:
traj = md.load(self.production_dcd_filename, top=self.equil_pdb_filename)[-1]
else:
traj = md.load(self.production_dcd_filename, top=self.box_pdb_filename)[-1]
traj.save(self.production_pdb_filename)