In [17]:
## packages import from run

from functools import partial, update_wrapper
from threading import Event, Lock
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from random import shuffle, sample
from pathlib import Path
from typing import Dict, Any, Optional
from dataclasses import asdict
import hashlib
import logging
import argparse
import shutil
import json
import sys

import ase
from ase.db import connect
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from colmena.models import Result
from colmena.queue import ColmenaQueues
from colmena.queue.redis import RedisQueues
from colmena.thinker import BaseThinker, event_responder, result_processor, ResourceCounter, task_submitter
import proxystore as ps
import numpy as np
import torch
from proxystore.store import register_store
from proxystore.store.file import FileStore
from proxystore.store.globus import GlobusStore, GlobusEndpoints
from proxystore.store.redis import RedisStore
from proxystore.store.utils import get_key

from fff.learning.gc.ase import SchnetCalculator
from fff.learning.gc.functions import GCSchNetForcefield
from fff.learning.gc.models import SchNet, load_pretrained_model
from fff.learning.util.messages import TorchMessage
from fff.sampling.md import MolecularDynamics
from fff.simulation import run_calculator
from fff.simulation.utils import read_from_string, write_to_string
from _pytest.fixtures import fixture
from ase.build import molecule
from ase.calculators.lj import LennardJones
from ase.calculators.singlepoint import SinglePointCalculator
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution

from config import wsl as make_config

In [2]:
## packages for monitor cpu and gpu
import psutil
import pynvml
import time
import os
from memory_profiler import profile

In [34]:
## search space

@dataclass
class Trajectory:
    """Tracks the state of searching along individual trajectories

    We mark the starting point, the last point produced from sampling,
    and the last point we produced that has been validated
    """
    id: int  # ID number of the
    starting: ase.Atoms  # Starting point of the trajectory
    current_timestep = 0  # How many timesteps have been used so far
    last_validated: ase.Atoms = None  # Last validated point on the trajectory
    current: ase.Atoms = None  # Last point produced along the trajectory
    last_run_length: int = 0  # How long between current and last_validated
    name: str = None  # Name of the trajectory

    def __post_init__(self):
        self.last_validated = self.current = self.starting

    def update_current_structure(self, strc: ase.Atoms, run_length: int):
        """Update the structure that has yet to be updated

        Args:
            strc: Structure produced by sampling
            run_length: How many timesteps were performed in sampling run
        """
        self.current = strc.copy()
        self.last_run_length = run_length

    def set_validation(self, success: bool):
        """Set whether the trajectory was successfully validated

        Args:
            success: Whether the validation was successful
        """
        if success:
            self.last_validated = self.current  # Move the last validated forward
            self.current_timestep += self.last_run_length

with connect('/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/tests/files/test.db') as db:
    search_space = [Trajectory(i, x.toatoms(), name=x.get('filename', f'traj-{i}')) for i, x in enumerate(db.select(''))]
    for i, x in enumerate(db.select('')):
        print(x.get('filename', f'traj-{i}'))
        print(x.toatoms)
        print(search_space[i].starting)
            
    print(len(search_space))

traj-0
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c417df10>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-1
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c4168c10>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-2
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c4168910>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-3
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c4168c10>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-4
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c4168910>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-5
<bound method AtomsRow.toatoms of <ase.db.row.AtomsRow object at 0x7f87c4168c10>>
Atoms(symbols='OH2', pbc=False, calculator=SinglePointCalculator(...))
traj-6
<bound method AtomsRow.toatoms of

In [None]:
## test finetune-surrogate model train and infernece


In [22]:
## test sampling task
def test_md(atoms):
    calc = LennardJones()
    MaxwellBoltzmannDistribution(atoms, temperature_K=60)
    md = MolecularDynamics()
    atoms, traj = md.run_sampling(atoms, 1000, calc, timestep=1, log_interval=100)
    assert len(traj) == 9

    # Make sure it has both the energy and the forces
    assert isinstance(traj[0].calc, SinglePointCalculator)  # SPC is used to store results
    assert traj[0].get_forces().shape == (3, 3)
    assert traj[0].get_total_energy()
    assert traj[0].get_total_energy() == traj[-1].get_total_energy()
    
    return atoms, traj

In [35]:
# a,t = test_md(molecule('H2O'))
# print(a)
# print(molecule('H2O'))
# print(t)

simulation_pool = []

for a in search_space:
    a,t = test_md(a.starting)
    simulation_pool.append((a,t))
    print(a,t)

Atoms(symbols='OH2', pbc=False, momenta=..., calculator=LennardJones(...)) [Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...)), Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...))]
Atoms(symbols='OH2', pbc=False, momenta=..., calculator=LennardJones(...)) [Atoms(symbols='OH2', pbc=False, momenta=..., calculator=SinglePointCalculator(...))

In [6]:
## test simulation task
calc = dict(calc='psi4', method='pbe0-d3', basis='aug-cc-pvdz', num_threads=64)
temp_path = "./temp"
## print complete whole path
path = os.path.abspath(temp_path)
print(path)

def _wrap(func, **kwargs):
    out = partial(func, **kwargs)
    update_wrapper(out, func)
    return out

# @fixture()
def atoms():
    return molecule('H2O')
atom = atoms()

# @fixture()
def cluster():
    xyz = """30

O       7.581982610000000     -0.663324770000000      5.483883860000000
H       8.362350460000000     -0.079370470000000      5.498567580000000
H       7.846055030000000     -1.464757200000000      5.041030880000000
O       9.456702229999999      1.642301080000000      8.570644379999999
H      10.114471399999999      1.655581000000000      9.261547090000001
H       9.181962009999999      2.562770840000000      8.428308489999999
O       9.664885520000000      1.027763610000000      5.758778100000000
H       9.485557560000000      1.914335850000000      5.411871910000000
H       9.760457990000001      1.144007330000000      6.710969450000000
O       6.000383380000000      4.009448050000000      7.349214080000000
H       5.983903880000000      4.025474550000000      6.383275510000000
H       5.536083220000000      3.203337670000000      7.608772750000000
O       4.833731170000000      1.482195020000000      7.883007530000000
H       5.628127100000000      0.955450120000000      8.084721569999999
H       4.134047510000000      1.149705890000000      8.438218120000000
O       7.110025880000000      0.051394890000000      8.205573080000001
H       7.262372970000000     -0.325556960000000      7.328944680000000
H       7.906465050000000      0.552607360000000      8.419908520000000
O       6.173881530000000      3.688445090000000      4.528872010000000
H       5.701079370000000      4.022632120000000      3.771772860000000
H       5.903837200000000      2.759263990000000      4.641063690000000
O       5.429551600000000      1.145089270000000      5.097751140000000
H       6.135000710000000      0.486118580000000      5.133624550000000
H       5.085167410000000      1.211727260000000      5.997292520000000
O       8.597597120000000      4.222480770000000      8.031750680000000
H       7.641802790000000      4.166800020000000      7.848542690000000
H       8.760176660000001      5.097825050000000      8.372748370000000
O       8.954336169999999      3.647526740000000      5.177083970000000
H       8.927373890000000      3.954237700000000      6.090421680000000
H       8.043264389999999      3.698852060000000      4.860042570000000
"""
    return read_from_string(xyz, 'xyz')
xyz = cluster()
xyz = write_to_string(xyz, 'xyz')
print(xyz)
xyz = write_to_string(atom, 'xyz')
print(xyz)
my_run_simulation = _wrap(run_calculator, calc=calc, temp_path=path)


/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
30

O       7.581982610000000     -0.663324770000000      5.483883860000000
H       8.362350460000000     -0.079370470000000      5.498567580000000
H       7.846055030000000     -1.464757200000000      5.041030880000000
O       9.456702229999999      1.642301080000000      8.570644379999999
H      10.114471399999999      1.655581000000000      9.261547090000001
H       9.181962009999999      2.562770840000000      8.428308489999999
O       9.664885520000000      1.027763610000000      5.758778100000000
H       9.485557560000000      1.914335850000000      5.411871910000000
H       9.760457990000001      1.144007330000000      6.710969450000000
O       6.000383380000000      4.009448050000000      7.349214080000000
H       5.983903880000000      4.025474550000000      6.383275510000000
H       5.536083220000000      3.203337670000000      7.608772750000000
O       4.833731170000000      1.482195020000000 

In [41]:
## get atoms


# xyz = cluster()
# xyz = write_to_string(xyz, 'xyz')
# print(xyz)
xyz = write_to_string(atom, 'xyz')
print(xyz)

# %%memit
# for a in search_space:
#     xyz = write_to_string(a.starting, 'xyz')
#     res = my_run_simulation(xyz)

xyz = write_to_string(search_space[-1].starting, 'xyz')
print(xyz)
res = my_run_simulation(xyz)

3

O       0.000000000000000      0.000000000000000      0.119262000000000
H       0.000000000000000      0.763239000000000     -0.477047000000000
H       0.000000000000000     -0.763239000000000     -0.477047000000000

3

O      -3.515593541595766      1.353030720062226     14.708864644398979
H      16.671446270485440    124.162943844894613   -122.027107241268851
H      -8.491094992594517   -156.395148494692307   -138.094404296338126

  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.


ValueError: Calculation failed: Could not converge SCF iterations in 100 iterations.

In [9]:
## test cpu affinity impact on simulation task
start_time = time.time()
!taskset -c 0-3 python ./simulation_profile.py
end_time = time.time()

/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.


In [13]:
import time
import subprocess

num_cpus = [2, 4, 6, 8, 16]  # 要测试的 CPU 数量

for num_cpu in num_cpus:
    print(f"Testing with {num_cpu} CPUs:")
    start_time = time.time()
    
    # 使用 taskset 命令设置 CPU 亲和性，并运行 Python 脚本
    command = f"taskset -c 0-{num_cpu-1} python ./simulation_profile.py"
    subprocess.run(command, shell=True)
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed time: {elapsed_time} seconds\n")

Testing with 2 CPUs:
/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.
Elapsed time: 18.155221700668335 seconds

Testing with 4 CPUs:
/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.
Elapsed time: 10.717058658599854 seconds

Testing with 6 CPUs:
/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.
Elapsed time: 7.596288204193115 seconds

Testing with 8 CPUs:
/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads set to 64 by Python driver.
  Threads set to 64 by Python driver.
Elapsed time: 7.161304712295532 seconds

Testing with 16 CPUs:
/home/yxx/work/project/colmena/multisite_/finetuning-surrogates/runs/analysis/temp
  Threads

In [7]:
## gpu monitor test
# 初始化pynvml
pynvml.nvmlInit()

# 获取GPU数量
num_gpus = pynvml.nvmlDeviceGetCount()

# 遍历每个GPU
for i in range(num_gpus):
    # 获取GPU句柄
    handle = pynvml.nvmlDeviceGetHandleByIndex(i)

    # 获取GPU的利用率
    utilization = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu

    # 获取GPU的温度
    temperature = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
    
    # 获取GPU的内存信息
    memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)

    # 打印GPU内存信息
    print(f"GPU {i+1}:")
    print(f"Total Memory: {memory_info.total / (1024**2)} MB")
    print(f"Used Memory: {memory_info.used / (1024**2)} MB")
    print(f"Free Memory: {memory_info.free / (1024**2)} MB")

    # 打印GPU信息
    print(f"GPU {i+1}: Utilization = {utilization}%, Temperature = {temperature}°C")

# 清理pynvml
pynvml.nvmlShutdown()

GPU 1:
Total Memory: 6144.0 MB
Used Memory: 3831.078125 MB
Free Memory: 2312.921875 MB
GPU 1: Utilization = 1%, Temperature = 56°C


In [49]:
## cpu monitor test
# 获取CPU利用率
cpu_percent = psutil.cpu_percent(interval=1)
print(f"CPU Utilization: {cpu_percent}%")

# 获取内存使用情况
memory = psutil.virtual_memory()
print(f"Total Memory: {memory.total / (1024**3):.2f} GB")
print(f"Available Memory: {memory.available / (1024**3):.2f} GB")
print(f"Used Memory: {memory.used / (1024**3):.2f} GB")

# 获取磁盘I/O信息
disk_io = psutil.disk_io_counters()
print(f"Disk Read Count: {disk_io.read_count}")
print(f"Disk Write Count: {disk_io.write_count}")
print(f"Disk Read Bytes: {disk_io.read_bytes / (1024**2):.2f} MB")
print(f"Disk Write Bytes: {disk_io.write_bytes / (1024**2):.2f} MB")

# 等待一段时间
# time.sleep(1)
import os
# 定义测试文件路径和大小
test_file = 'disk_io_test_file.bin'
file_size = 1024 * 1024 * 100  # 100 MB
# 创建测试文件
with open(test_file, 'wb') as f:
    f.seek(file_size - 1)
    f.write(b'\0')

# 读取测试文件
# 获取初始的磁盘I/O信息
disk_io_start = psutil.disk_io_counters()
with open(test_file, 'rb') as f:
    data = f.read()
print(f"File size: {file_size / (1024**2)} MB")
print("Disk I/O test completed.")

# 删除测试文件
# os.remove(test_file)

# 获取最终的磁盘I/O信息
disk_io_end = psutil.disk_io_counters()

# 计算磁盘I/O增量数据
disk_io_diff = {}
disk_io_diff['read_count'] = disk_io_end.read_count - disk_io_start.read_count
disk_io_diff['write_count'] = disk_io_end.write_count - disk_io_start.write_count
disk_io_diff['read_bytes'] = disk_io_end.read_bytes - disk_io_start.read_bytes
disk_io_diff['write_bytes'] = disk_io_end.write_bytes - disk_io_start.write_bytes

# 打印增量数据
print(f"Disk Read Count: {disk_io_diff['read_count']}")
print(f"Disk Write Count: {disk_io_diff['write_count']}")
print(f"Disk Read Bytes: {disk_io_diff['read_bytes'] / (1024**2):.2f} MB")
print(f"Disk Write Bytes: {disk_io_diff['write_bytes'] / (1024**2):.2f} MB")

CPU Utilization: 0.5%
Total Memory: 15.62 GB
Available Memory: 4.17 GB
Used Memory: 11.12 GB
Disk Read Count: 421657
Disk Write Count: 1742382
Disk Read Bytes: 6572.50 MB
Disk Write Bytes: 26248.35 MB
File size: 100.0 MB
Disk I/O test completed.
Disk Read Count: 0
Disk Write Count: 1
Disk Read Bytes: 0.00 MB
Disk Write Bytes: 0.00 MB


In [47]:
import os
import psutil

# 定义测试文件路径和大小
test_file = 'disk_io_test_file.bin'
file_size = 1024 * 1024 * 1000 # 100 MB

# 创建测试文件
with open(test_file, 'wb') as f:
    f.seek(file_size - 1)
    f.write(b'\0')

# 获取初始的磁盘I/O信息
disk_io_start = psutil.disk_io_counters()

# 读取测试文件
with open(test_file, 'rb') as f:
    data = f.read()

# 获取最终的磁盘I/O信息
disk_io_end = psutil.disk_io_counters()

# 计算磁盘I/O增量数据
# disk_io_diff = disk_io_end - disk_io_start

# 打印增量数据
print(f"Disk Read Bytes: {(disk_io_end.read_bytes - disk_io_start.read_bytes) / (1024**2)} MB")
print(f"Disk Write Bytes: {(disk_io_end.write_bytes - disk_io_start.write_bytes) / (1024**2)} MB")

print("Disk I/O test completed.")

# 删除测试文件
os.remove(test_file)

Disk Read Bytes: 0.0 MB
Disk Write Bytes: 0.109375 MB
Disk I/O test completed.
