-
Notifications
You must be signed in to change notification settings - Fork 36
/
_detect.py
388 lines (347 loc) · 13.7 KB
/
_detect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# cython: language_level=3
# cython: linetrace=True
"""Detect molecules.
There are two types of input files that could be imported by ReacNetGen,
the first part of necessary is the trajectory from reactive MD, the second
part can be the bond information normally given by simulation using ReaxFF.
In fact, atomic coordinates can be converted to the bond information with
the Open Babel software. As a results, ReacNetGen can both processes ReaxFF
trajectories, AIMD trajectories, and other kinds of reactive trajectories.
With the bond information, molecules can be detected from atoms by Depth-first
search at every timestep. By using this way, all molecules in a given
trajectory can be acquired. Molecules consisting of same atoms and bonds can
be considered as the same molecule.
Reference:
[1] O’Boyle, N. M.; Banck, M.; James, C. A.; Morley, C.; Vandermeersch, T.;
Hutchison, G. Open Babel: An open chemical toolbox. J. Cheminf. 2011, 3(1),
33-47.
[2] Tarjan, R. Depth-first search and linear graph algorithms. SIAM J. Comput.
1972, 1 (2), 146-160.
"""
import tempfile
import fileinput
import operator
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from enum import Enum, auto
import numpy as np
try:
from openbabel import openbabel
except ImportError: # pragma: no cover
import openbabel
from scipy.spatial import cKDTree
from ase import Atom, Atoms
from .dps import dps
from .utils import WriteBuffer, listtobytes, run_mp, SharedRNGData
class _Detect(SharedRNGData, metaclass=ABCMeta):
"""Detect molecules.
Parameters
----------
rng: reacnetgenerator.ReacNetGenerator
The ReacNetGenerator class.
"""
subclasses = {}
def __init__(self, rng):
SharedRNGData.__init__(self, rng, ['inputfilename', 'atomname', 'stepinterval', 'nproc', 'pbc',
'cell'],
['N', 'atomtype', 'step', 'timestep', 'temp1it', 'moleculetempfilename'])
@classmethod
def gettype(cls, rng):
"""Get the class for the input file type.
Now ReacNetGen support the following files:
- lammpsbondfile: LAMMPS bond files, see http://lammps.sandia.gov/doc/fix_reax_bonds.html
- lammpsdumpfile: LAMMPS dump files, see https://lammps.sandia.gov/doc/dump.html
- xyz: xyz files
Parameters
----------
rng: reacnetgenerator.ReacNetGenerator
The ReacNetGenerator class.
Returns
-------
cls.subclasses[rng.inputfiletype](rng): class
The _Detect class for specific file type.
"""
if rng.inputfiletype not in cls.subclasses:
raise ValueError(
f"Unsupported input file type {rng.inputfiletype}")
return cls.subclasses[rng.inputfiletype](rng)
@classmethod
def register_subclass(cls, message_type):
"""Register the file type, used as a decorator.
Parameters
----------
message_type: str
The type name to register, such as `xyz`.
Returns
-------
decorator: function
The decorator that used for a subclass.
Examples
--------
@_Detect.register_subclass("lammpsbondfile")
class _DetectLAMMPSbond(_Detect):
"""
def decorator(subclass):
cls.subclasses[message_type] = subclass
return subclass
return decorator
def detect(self):
"""Detect molecules."""
self._readinputfile()
self.returnkeys()
def _readinputfile(self):
"""Read the input file."""
d = defaultdict(list)
timestep = {}
with fileinput.input(files=self.inputfilename) as f:
_steplinenum = self._readNfunc(f)
with fileinput.input(files=self.inputfilename) as f:
results = run_mp(self.nproc, func=self._readstepfunc, l=f, nlines=_steplinenum, return_num=True, interval=self.stepinterval,
desc="Read bond information and Detect molecules", unit="timestep")
for molecules, (step, thetimestep) in results:
for molecule in molecules:
d[molecule].append(step)
timestep[step] = thetimestep
self.temp1it = len(d)
values_c = list(run_mp(self.nproc, func=self._compressvalue, l=d.values(
), unordered=False, desc="Save molecules", unit="molecule", total=self.temp1it))
self._writemoleculetempfile((d.keys(), values_c))
self.timestep = timestep
self.step = len(timestep)
def _compressvalue(self, x):
return listtobytes(np.array(x))
@abstractmethod
def _readNfunc(self, f):
pass
@abstractmethod
def _readstepfunc(self, item):
pass
def _connectmolecule(self, bond, level):
return list([b''.join((listtobytes(mol),
listtobytes(bondlist))) for mol, bondlist in zip(*dps(bond, level))])
def _writemoleculetempfile(self, d):
with WriteBuffer(tempfile.NamedTemporaryFile('wb', delete=False)) as f:
self.moleculetempfilename = f.name
for mol in zip(*d):
f.extend(mol)
@_Detect.register_subclass("bond")
@_Detect.register_subclass("lammpsbondfile")
class _DetectLAMMPSbond(_Detect):
def _readNfunc(self, f):
iscompleted = False
for index, line in enumerate(f):
if line[0] == '#':
if line.startswith("# Number of particles"):
if iscompleted:
steplinenum = index-stepaindex
break
else:
iscompleted = True
stepaindex = index
N = [int(s) for s in line.split() if s.isdigit()][0]
atomtype = np.zeros(N, dtype=int)
else:
s = line.split()
atomtype[int(s[0])-1] = int(s[1])-1
else:
steplinenum = index + 1
self.N = N
self.atomtype = atomtype
return steplinenum
def _readstepfunc(self, item):
step, lines = item
bond = [None]*self.N
level = [None]*self.N
for line in lines:
if line:
if line[0] == "#":
if line.startswith("# Timestep"):
timestep = int(line.split()[-1])
else:
s = line.split()
s0 = int(s[0])-1
s2 = int(s[2])
bond[s0] = map(self._get_idx, s[3:3+s2])
level[s0] = map(self._get_bo, s[4+s2:4+2*s2])
molecules = self._connectmolecule(bond, level)
return molecules, (step, timestep)
@staticmethod
def _get_idx(x):
return int(x) - 1
@staticmethod
def _get_bo(x):
return max(1, round(float(x)))
class _DetectCrd(_Detect):
def _getbondfromcrd(self, step_atoms, cell):
atomnumber = len(step_atoms)
if self.pbc:
# Apply period boundry conditions
step_atoms.set_pbc(True)
step_atoms.set_cell(cell)
# add ghost atoms
repeated_atoms = step_atoms.repeat(2)[atomnumber:]
tree = cKDTree(step_atoms.get_positions())
d = tree.query(repeated_atoms.get_positions(), k=1)[0]
nearest = d < 5
ghost_atoms = repeated_atoms[nearest]
realnumber = np.where(nearest)[0] % atomnumber
step_atoms += ghost_atoms
# Use openbabel to connect atoms
mol = openbabel.OBMol()
mol.BeginModify()
for idx, (num, position) in enumerate(zip(step_atoms.get_atomic_numbers(), step_atoms.positions)):
a = mol.NewAtom(idx)
a.SetAtomicNum(int(num))
a.SetVector(*position)
mol.ConnectTheDots()
mol.PerceiveBondOrders()
mol.EndModify()
bond = [[] for i in range(atomnumber)]
bondlevel = [[] for i in range(atomnumber)]
for b in openbabel.OBMolBondIter(mol):
s1 = b.GetBeginAtom().GetId()
s2 = b.GetEndAtom().GetId()
if s1 >= atomnumber and s2 >= atomnumber:
# duplicated
continue
elif s1 >= atomnumber:
s1 = realnumber[s1-atomnumber]
elif s2 >= atomnumber:
s2 = realnumber[s2-atomnumber]
level = b.GetBondOrder()
if level == 5:
# aromatic, 5 in openbabel but 12 in rdkit
level = 12
bond[s1].append(s2)
bond[s2].append(s1)
bondlevel[s1].append(level)
bondlevel[s2].append(level)
return bond, bondlevel
@_Detect.register_subclass("dump")
@_Detect.register_subclass("lammpsdumpfile")
class _DetectLAMMPSdump(_DetectCrd):
class LineType(Enum):
"""Line type in the LAMMPS dump files."""
TIMESTEP = auto()
ATOMS = auto()
NUMBER = auto()
BOX = auto()
OTHER = auto()
@classmethod
def linecontent(cls, line):
"""Return line content."""
if line.startswith("ITEM: TIMESTEP"):
return cls.TIMESTEP
if line.startswith("ITEM: ATOMS"):
return cls.ATOMS
if line.startswith("ITEM: NUMBER OF ATOMS"):
return cls.NUMBER
if line.startswith("ITEM: BOX"):
return cls.BOX
return cls.OTHER
def _readNfunc(self, f):
iscompleted = False
for index, line in enumerate(f):
if line.startswith("ITEM:"):
linecontent = self.LineType.linecontent(line)
if linecontent == self.LineType.ATOMS:
keys = line.split()
self.id_idx = keys.index('id') - 2
self.tidx = keys.index('type') - 2
self.xidx = keys.index('x') - 2
self.yidx = keys.index('y') - 2
self.zidx = keys.index('z') - 2
else:
if linecontent == self.LineType.NUMBER:
if iscompleted:
steplinenum = index-stepaindex
break
else:
iscompleted = True
stepaindex = index
N = int(line.split()[0])
atomtype = np.zeros(N, dtype=int)
elif linecontent == self.LineType.ATOMS:
s = line.split()
atomtype[int(s[0])-1] = int(s[1])-1
else:
steplinenum = index + 1
self.N = N
self.atomtype = atomtype
return steplinenum
def _readstepfunc(self, item):
step, lines = item
step_atoms = []
ss = []
for line in lines:
if line:
if line.startswith("ITEM:"):
linecontent = self.LineType.linecontent(line)
else:
if linecontent == self.LineType.ATOMS:
s = line.split()
step_atoms.append(
(int(s[self.id_idx]),
Atom(
self.atomname[int(s[self.tidx]) - 1],
(float(s[self.xidx]), float(s[self.yidx]), float(s[self.zidx])))))
elif linecontent == self.LineType.TIMESTEP:
timestep = step, int(line.split()[0])
elif linecontent == self.LineType.BOX:
s = line.split()
ss.append(list(map(float, s)))
ss = np.array(ss)
if ss.shape[1]>2:
xy = ss[0][2]
xz = ss[1][2]
yz = ss[2][2]
else:
xy, xz, yz = 0., 0., 0.
xlo = ss[0][0] - min(0., xy, xz, xy+xz)
xhi = ss[0][1] - max(0., xy, xz, xy+xz)
ylo = ss[1][0] - min(0., yz)
yhi = ss[1][1] - max(0., yz)
zlo = ss[2][0]
zhi = ss[2][1]
boxsize = np.array([[xhi-xlo, 0., 0.],
[xy, yhi-ylo, 0.],
[xz, yz, zhi-zlo]])
_, step_atoms = zip(*sorted(step_atoms, key=operator.itemgetter(0)))
step_atoms = Atoms(step_atoms)
bond, level = self._getbondfromcrd(step_atoms, boxsize)
molecules = self._connectmolecule(bond, level)
return molecules, timestep
@_Detect.register_subclass("xyz")
class _Detectxyz(_DetectCrd):
"""xyz file. Two frames are connected. Cell information must be inputed by user."""
def _readNfunc(self, f):
atomname_dict = dict(zip(self.atomname.tolist(), range(self.atomname.size)))
for index, line in enumerate(f):
s = line.split()
if index == 0:
N = int(line.strip())
atomtype = np.zeros(N, dtype=int)
elif index > N+1:
break
elif index > 1:
atomtype[index-2] = atomname_dict[s[0]]
self.N = N
self.atomtype = atomtype
steplinenum = N + 2
return steplinenum
def _readstepfunc(self, item):
step, lines = item
timestep = step, step
step_atoms = []
boxsize = self.cell
if self.pbc and boxsize is None:
raise RuntimeError("No cell information is given.")
for index, line in enumerate(lines):
s = line.split()
if index > 1:
step_atoms.append((index-1, Atom(s[0] ,tuple((float(x) for x in s[1:4])))))
_, step_atoms = zip(*sorted(step_atoms, key=operator.itemgetter(0)))
step_atoms = Atoms(step_atoms)
bond, level = self._getbondfromcrd(step_atoms, boxsize)
molecules = self._connectmolecule(bond, level)
return molecules, timestep