Source: https://github.com/BlackPianoCat/EasyOpenMM/tree/main

# Imports

In [53]:
import copy
import numpy as np
import time
from utils import *
import openmm as mm
import openmm.unit as u
from tqdm import tqdm
from sys import stdout
from mdtraj.reporters import HDF5Reporter
from openmm.app import PDBFile, PDBxFile, ForceField, Simulation, PDBReporter, PDBxReporter, DCDReporter, StateDataReporter, CharmmPsfFile,  DCDFile
import random
import pyvista as pv
import mdtraj as md

# Initial structure generation functions

In [54]:
def line(n):
    points = []
    for i in range(n):
        points.append([i, 0, 0])
    return np.array(points)

def random_walk(n):
    points=[[0,0,0] for _ in range(n) ]
    for i in range(1,n):
        val_x = random.randint(-1,1)
        val_y = random.randint(-1,1)
        val_z = random.randint(-1,1)
        base = points[i-1]
        points[i] = [base[0]+val_x,base[1]+val_y,base[2]+val_z]
    return points

def random_walk2(n):
    steps_x = np.random.normal(loc=0, scale=1, size=n)
    steps_y = np.random.normal(loc=0, scale=1, size=n)
    steps_z = np.random.normal(loc=0, scale=1, size=n)
    positions_x = np.cumsum(steps_x)
    positions_y = np.cumsum(steps_y)
    positions_z = np.cumsum(steps_z)
    return np.column_stack((positions_x,positions_y,positions_z))


def hilbert_curve2d(n):
    points=[[0,0,0]] # starting point

    instructions_dict = {"A": ["D","A","A","B"],
                         "B": ["C","B","B","A"],
                         "C": ["B","C","C","D"],
                         "D": ["A","D","D","C"]} # production rules from Wikipedia
    
    n_iter = np.ceil(n/4) #number of blocks needed to connect given number of points (n)
    stack = ["A"]  # list of blocks
    
    # defining blocks
    while len(stack) < n_iter:
        instructions_list=[]
        while len(stack) > 0:
            instructions_list = np.concatenate((instructions_list,np.array(instructions_dict[stack.pop()])))
        stack = instructions_list[::-1].tolist()

    i = 1
    while i <= n_iter:
        current_stage = stack.pop() #get current block (stage)
        base = points[-1] #get the base point, we will start drawing from start's coordinates

        if current_stage == 'A': #draw A
            points.append([base[0], base[1]+1,base[2]])
            points.append([base[0]+1, base[1]+1 ,base[2]])
            points.append([base[0]+1, base[1],base[2]])
            base = points[-1]
            next_stage = stack[-1] if len(stack) >0 else None
            if next_stage  == "A" or next_stage == "D": # prepare base for next stage
                points.append([base[0]+1, base[1],base[2]])
            else:
                points.append([base[0], base[1]-1,base[2]])

        if current_stage == 'B':
            points.append([base[0]-1, base[1],base[2]])
            points.append([base[0]-1, base[1]-1,base[2]])
            points.append([base[0], base[1]-1,base[2]])
            base = points[-1]
            next_stage = stack[-1] if len(stack) >0 else None
            if next_stage  == "B" or next_stage == "C":
                points.append([base[0], base[1]-1,base[2]])
            else:
                points.append([base[0]+1, base[1],base[2]])

        if current_stage == 'C':
            points.append([base[0], base[1]-1,base[2]])
            points.append([base[0]-1, base[1]-1,base[2]])
            points.append([base[0]-1, base[1],base[2]])
            base = points[-1]
            next_stage = stack[-1] if len(stack) >0 else None
            if next_stage  == "C" or next_stage == "B":
                points.append([base[0]-1, base[1],base[2]])
            else:
                points.append([base[0], base[1]+1,base[2]])

        if current_stage == 'D':
            points.append([base[0]+1, base[1],base[2]])
            points.append([base[0]+1, base[1]+1,base[2]])
            points.append([base[0], base[1]+1,base[2]])
            base = points[-1]
            next_stage = stack[-1] if len(stack) >0 else None
            if next_stage  == "D" or next_stage == "A":
                points.append([base[0], base[1]+1,base[2]])
            else:
                points.append([base[0]-1, base[1],base[2]])
        
        i +=1
        
    while len(points) > n:
        points.pop() # delete redundant points
    return points

def helisa(n, radius = 1, step_z = 0.1, n_rotations = 5):
    steps_z = np.repeat(step_z,n)
    angles = np.linspace(0,n_rotations* 2*np.pi, n, endpoint= False)
    x = radius * np.cos(angles)
    y = radius * np.sin(angles)
    z = np.cumsum(steps_z)

    return np.column_stack((x,y,z))

def sphere(n, radius = 1):
    angles_xz = np.random.uniform(0,np.pi, n)
    angles_xy = np.random.uniform(0,2*np.pi, n)
    x = radius * np.cos(angles_xy) * np.cos(angles_xz)
    y = radius * np.cos(angles_xy) * np.sin(angles_xz)
    z = radius * np.sin(angles_xy)

    return np.column_stack((x,y,z))

def cube(n, edge_len = 1):
    x = np.random.uniform(0,edge_len,n)
    y = np.random.uniform(0,edge_len,n)
    z = np.random.uniform(0,edge_len,n)

    return np.column_stack((x,y,z))

def cube_outer(n, edge_len = 1):
    walls = np.random.randint(1,7,n)
    points = []
    for wall in walls:
        x = 0 if wall == 1 else edge_len if wall == 3 else np.random.uniform(0,edge_len)
        y = 0 if wall == 2 else edge_len if wall == 4 else np.random.uniform(0,edge_len)
        z = 0 if wall == 5 else edge_len if wall == 6 else np.random.uniform(0,edge_len)
        points.append([x,y,z])
    return points

# Simulation

In [55]:
# 0. Generate some initial structure
N_beads=100
points = helisa(N_beads)
write_mmcif(points,'init_struct.cif')
generate_psf(N_beads,'LE_init_struct.psf')

# 1. Define System
pdb = PDBxFile('init_struct.cif')
forcefield = ForceField('forcefields/classic_sm_ff.xml')
system = forcefield.createSystem(pdb.topology, nonbondedCutoff=1*u.nanometer)
integrator = mm.LangevinIntegrator(310, 0.05, 100 * mm.unit.femtosecond)

# 2. Define the forcefield
# 2.1. Harmonic bond borce between succesive beads
bond_force = mm.HarmonicBondForce()
system.addForce(bond_force)
for i in range(system.getNumParticles() - 1):
    bond_force.addBond(i, i + 1, 0.1, 300000.0)
bond_force.addBond(10, 70, 0.001, 0.001) # connect bead 10 with bead 70

#2.2. Harmonic angle force between successive beads so as to make chromatin rigid
angle_force = mm.HarmonicAngleForce()
system.addForce(angle_force)
for i in range(system.getNumParticles() - 2):
    angle_force.addAngle(i, i + 1, i + 2, np.pi, 0.001)
    
# 3. Minimize energy
simulation = Simulation(pdb.topology, system, integrator)
simulation.reporters.append(StateDataReporter(stdout, 10, step=True, totalEnergy=True, potentialEnergy=True, temperature=True))
simulation.reporters.append(DCDReporter('stochastic_LE.dcd', 10))
simulation.context.setPositions(pdb.positions)
simulation.minimizeEnergy(tolerance=0.001)
state = simulation.context.getState(getPositions=True)
PDBxFile.writeFile(pdb.topology, state.getPositions(), open('minimized.cif', 'w')) # save minimized file

# 4. Run md simulation
simulation.context.setVelocitiesToTemperature(310, 0)
simulation.step(10000)
state = simulation.context.getState(getPositions=True)
PDBxFile.writeFile(pdb.topology, state.getPositions(), open('after_sim.cif', 'w')) # save minimized file
PDBFile.writeFile(pdb.topology, state.getPositions(), open('after_sim.pdb', 'w')) # save minimized file
df = DCDFile(open("after_sim.dcd", "wb"),pdb.topology, dt = 100 * mm.unit.femtosecond)
df.writeModel(state.getPositions(), pdb.topology.getUnitCellDimensions(), pdb.topology.getPeriodicBoxVectors())

#"Step","Potential Energy (kJ/mole)","Total Energy (kJ/mole)","Temperature (K)"
10,135.75782775878906,402.47991298884153,216.02217732340142
20,82.21783447265625,397.5208275169134,255.36857592897698
30,88.51806640625,409.31106531992555,259.81501320246747
40,113.13584899902344,423.259280025959,251.17357174100985
50,123.11409759521484,430.3524429574609,248.8368980211969
60,111.12345123291016,440.91904070973396,267.10634497698106
70,102.48455047607422,426.51841120421886,262.439835308826
80,107.70790100097656,441.53025322780013,270.3676799206901
90,141.01275634765625,465.2581296004355,262.6111424432289
100,107.29249572753906,461.26957818865776,286.6912952104511
110,96.69505310058594,457.645763322711,292.3393398839027
120,143.6744384765625,472.3052584901452,266.1629808933052
130,143.51968383789062,477.33309634029865,270.360439505142
140,175.40606689453125,493.2406103219837,257.4189161747427
150,151.5321044921875,482.02448754012585,267.67068843675594
160,176.40756225585938,508.9860779941082,2

# Visualization

In [56]:
traj = md.load_dcd("after_sim.dcd", top="after_sim.pdb")

positions = traj.xyz

mesh = pv.PolyData(positions[0])

# Create PyVista plotter
plotter = pv.Plotter(notebook=True)

# Add mesh to the plotter
plotter.add_mesh(mesh, color="blue", point_size=5)

# Create lines between consecutive points
lines = pv.lines_from_points(positions[0])

# Add lines to the plotter
plotter.add_mesh(lines, color="red", line_width=2)

# Show the plotter using Trame's notebook backend
plotter.show(jupyter_backend='trame')

Widget(value='<iframe src="http://localhost:61174/index.html?ui=P_0x7f9f93c6f310_17&reconnect=auto" class="pyv…

### Testing initial structure generation function

In [57]:
positions = cube_outer(1000)

mesh = pv.PolyData(positions)

# Create PyVista plotter
plotter = pv.Plotter(notebook=True)

# Add mesh to the plotter
plotter.add_mesh(mesh, color="blue", point_size=5)

# Create lines between consecutive points
lines = pv.lines_from_points(positions)

# Add lines to the plotter
plotter.add_mesh(lines, color="red", line_width=2)

# Show the plotter using Trame's notebook backend
plotter.show(jupyter_backend='trame')

Widget(value='<iframe src="http://localhost:61174/index.html?ui=P_0x7f9fc08e9520_18&reconnect=auto" class="pyv…

# Testing multiple initial structures

In [58]:
initial_structure_gen_list = [line,random_walk,random_walk2,hilbert_curve2d, helisa, sphere, cube, cube_outer]

In [59]:
import os
def make_directory_if_not_exists(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)

for f in initial_structure_gen_list:
    make_directory_if_not_exists(f'initial_structures_tests/{f.__name__}')

In [60]:
def run_simulation_on_initial_structure(initail_structure_gen_fun, N_beads):    
    points = initail_structure_gen_fun(N_beads)
    write_mmcif(points,f'initial_structures_tests/{initail_structure_gen_fun.__name__}/init_struct.cif')
    generate_psf(N_beads,f'initial_structures_tests/{initail_structure_gen_fun.__name__}/LE_init_struct.psf')

    # 1. Define System
    pdb = PDBxFile(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/init_struct.cif')
    forcefield = ForceField('forcefields/classic_sm_ff.xml')
    system = forcefield.createSystem(pdb.topology, nonbondedCutoff=1*u.nanometer)
    integrator = mm.LangevinIntegrator(310, 0.05, 100 * mm.unit.femtosecond)

    # 2. Define the forcefield
    # 2.1. Harmonic bond borce between succesive beads
    bond_force = mm.HarmonicBondForce()
    system.addForce(bond_force)
    for i in range(system.getNumParticles() - 1):
        bond_force.addBond(i, i + 1, 0.1, 300000.0)
    bond_force.addBond(10, 70, 0.0001, 100000) # connect bead 10 with bead 70

    #2.2. Harmonic angle force between successive beads so as to make chromatin rigid
    angle_force = mm.HarmonicAngleForce()
    system.addForce(angle_force)
    for i in range(system.getNumParticles() - 2):
        angle_force.addAngle(i, i + 1, i + 2, np.pi, 10)

    # 3. Minimize energy
    simulation = Simulation(pdb.topology, system, integrator)
    simulation.reporters.append(StateDataReporter(stdout, 10, step=True, totalEnergy=True, potentialEnergy=True, temperature=True))
    simulation.reporters.append(DCDReporter(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/stochastic_LE.dcd', 10))
    simulation.context.setPositions(pdb.positions)
    simulation.minimizeEnergy(tolerance=0.001)
    state = simulation.context.getState(getPositions=True)
    PDBxFile.writeFile(pdb.topology, state.getPositions(), open(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/minimized.cif', 'w')) # save minimized file

    # 4. Run md simulation
    simulation.context.setVelocitiesToTemperature(310, 0)
    simulation.step(10000)
    state = simulation.context.getState(getPositions=True)
    PDBxFile.writeFile(pdb.topology, state.getPositions(), open(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/after_sim.cif', 'w')) # save minimized file
    PDBFile.writeFile(pdb.topology, state.getPositions(), open(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/after_sim.pdb', 'w')) # save minimized file
    df = DCDFile(open(f'initial_structures_tests/{initail_structure_gen_fun.__name__}/after_sim.dcd', "wb"),pdb.topology, dt = 100 * mm.unit.femtosecond)
    df.writeModel(state.getPositions(), pdb.topology.getUnitCellDimensions(), pdb.topology.getPeriodicBoxVectors())

Task exception was never retrieved
future: <Task finished name='Task-11714' coro=<WslinkHandler.sendWrappedMessage() done, defined at /Users/krzysztof/opt/anaconda3/lib/python3.9/site-packages/wslink/protocol.py:385> exception=ConnectionResetError('Cannot write to closing transport')>
Traceback (most recent call last):
  File "/Users/krzysztof/opt/anaconda3/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/Users/krzysztof/opt/anaconda3/lib/python3.9/site-packages/wslink/protocol.py", line 417, in sendWrappedMessage
    await ws.send_bytes(chunk)
  File "/Users/krzysztof/opt/anaconda3/lib/python3.9/site-packages/aiohttp/web_ws.py", line 342, in send_bytes
    await self._writer.send(data, binary=True, compress=compress)
  File "/Users/krzysztof/opt/anaconda3/lib/python3.9/site-packages/aiohttp/http_websocket.py", line 727, in send
    await self._send_frame(message, WSMsgType.BINARY, compress)
  File "/Users/krzysztof/opt/anaconda3/lib/python3.9/

In [61]:
for f in initial_structure_gen_list:
    run_simulation_on_initial_structure(f,100)

#"Step","Potential Energy (kJ/mole)","Total Energy (kJ/mole)","Temperature (K)"
10,4513.15234375,6744.096734672785,1806.8749889942317
20,2731.2158203125,6391.250520285219,2964.316450530749
30,2002.56884765625,6103.024397600442,3321.0198366316795
40,1840.8203125,5865.564446166158,3259.7000363664984
50,2060.51171875,5535.30962485075,2814.290420634818
60,1697.599853515625,5243.069471001625,2871.5284890739013
70,1971.8028564453125,5002.436936169863,2454.555542365094
80,1798.4190673828125,4728.022599726915,2372.7294018620496
90,1501.64404296875,4477.130322933197,2409.89052046225
100,1525.2974853515625,4218.70876532793,2181.433789504383
110,1660.22509765625,3991.875907957554,1888.4386134156114
120,1512.74365234375,3749.8636274933815,1811.8766863594005
130,1377.0362548828125,3491.457082927227,1712.500825187156
140,1505.095458984375,3349.592999704182,1493.8859467534526
150,1385.3045654296875,3192.8539160192013,1463.961058931656
160,1328.4788818359375,3058.687860876322,1401.3219436044287
170,13

In [63]:
plotter = pv.Plotter(shape=(len(initial_structure_gen_list), 2), notebook=True, window_size=(1000,2000))
for i,f in enumerate(initial_structure_gen_list):
    traj = md.load_dcd(f'initial_structures_tests/{f.__name__}/after_sim.dcd', top=f'initial_structures_tests/{f.__name__}/after_sim.pdb')
    positions = traj.xyz
    mesh = pv.PolyData(positions[0])
    plotter.subplot(i,1)
    plotter.add_text(f"{f.__name__}, after simulation", font_size=10) 
    plotter.add_mesh(mesh, color="blue", point_size=5)
    lines = pv.lines_from_points(positions[0])
    plotter.add_mesh(lines, color="red", line_width=2)

    traj = md.load(f'initial_structures_tests/{f.__name__}/init_struct.cif')
    positions = traj.xyz
    mesh = pv.PolyData(positions[0])
    plotter.subplot(i,0)
    plotter.add_text(f"{f.__name__}, initial structure", font_size=10) 
    plotter.add_mesh(mesh, color="blue", point_size=5)
    lines = pv.lines_from_points(positions[0])
    plotter.add_mesh(lines, color="red", line_width=2)
plotter.show(jupyter_backend='trame')

Widget(value='<iframe src="http://localhost:61174/index.html?ui=P_0x7f9fc0928700_20&reconnect=auto" class="pyv…