## Prepare input structures

The MOF structures are catagorized into three classes based on the value of $CO_2$ of adsorption heat ($Q_{st}$): DAC, Flue Gas, and General. 

In [8]:
import glob
from dotenv import load_dotenv

import pandas as pd
from ase.io import read
from pathlib import Path

load_dotenv()

files = glob.glob("../structures/*/*.cif")

data = []

class_map = {
    "flue_gas": "Flue Gas",
    "dac": "DAC",
    "general": "General"
}

for file in files:

    row = {
        # "file": file,
        "name": file.split("/")[-1].split(".")[0],
        "class": class_map[file.split("/")[-2]],
        "structure": read(file)
    }
    
    data.append(row)

df = pd.DataFrame(data)
df.to_pickle("input.pkl")
df = pd.read_pickle("input.pkl")



In [9]:
df

Unnamed: 0,name,class,structure
0,UTSA-16,Flue Gas,"(Atom('K', [3.226651885197136, 4.3289875091971..."
1,MIL-96(Al),Flue Gas,"(Atom('O', [4.747352350000005, 12.286770663425..."
2,MIL-120,Flue Gas,"(Atom('Al', [3.205338673962497e-05, -9.4470429..."
3,CALF20,Flue Gas,"(Atom('Zn', [-0.24129790834544718, 0.559411884..."
4,MUF-16,Flue Gas,"(Atom('C', [6.197196279188237, 0.0090222, 3.25..."
5,ZnH-MFU-4l,Flue Gas,"(Atom('Zn', [9.759696748000001, 9.759696748000..."
6,Al-PyrMOF,Flue Gas,"(Atom('O', [6.3373961, 2.3106933200000004, 1.4..."
7,ZIF-8,General,"(Atom('Zn', [-2.4522332256501764, 6.9362179409..."
8,HKUST-1,General,"(Atom('Cu', [14.627739280500004, 2.30914745804..."
9,UiO-66,General,"(Atom('C', [17.367786750000004, 2.917324207765..."


## Define flow

In [6]:

from ase import Atoms
from ase.build import molecule
from tqdm.auto import tqdm

from mlip_arena.models import MLIPEnum
from mlip_arena.tasks.utils import get_calculator
from mlip_arena.tasks.mof.flow import widom_insertion

from prefect.states import State
from prefect import Task, flow, task
from prefect.client.schemas.objects import TaskRun

import itertools
import functools

@task
def load_row_from_df(fpath: str):
    df = pd.read_pickle(fpath)

    for _, row in df.iterrows():
        yield row

def save_result(
        tsk: Task, run: TaskRun, state: State, 
        row: dict | pd.DataFrame, 
        model: MLIPEnum,
        gas: Atoms,
        fpath: Path | str
    ):
    
    result = run.state.result()

    assert isinstance(result, dict)

    copied = row.copy()
    copied["model"] = model.name
    copied["gas"] = gas

    for k, v in result.items():
        copied[k] = v

    fpath = Path(f"{model.name}.pkl")

    if fpath.exists():
        df = pd.read_pickle(fpath)
        df = pd.concat([df, pd.DataFrame([copied])], ignore_index=True)
    else:
        df = pd.DataFrame([copied])

    df.drop_duplicates(subset=["name", "model"], keep='last', inplace=True)
    df.to_pickle(fpath)


@flow
def run():
    futures = []
    gas = molecule("CO2")

    for model, row in tqdm(itertools.product(MLIPEnum, load_row_from_df('input.pkl'))):

        future = widom_insertion.with_options(
            on_completion=[functools.partial(
                save_result,
                row=row,
                model=model,
                gas=gas,
                fpath=f"{model.name}.pkl"
            )]
        ).submit(
            structure=row["structure"],
            gas=gas,
            calculator=get_calculator(
                model,
                dispersion=False
            ),
            criterion=dict(fmax=0.05, steps=50),
            init_structure_optimize_loops = 10,
        )

        futures.append(future)

    
    return [f.result(raise_on_failure=False) for f in futures]

In [None]:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from prefect_dask import DaskTaskRunner

# Orchestrate your awesome dask workflow runner

nodes_per_alloc = 1
gpus_per_alloc = 4
ntasks = 1

cluster_kwargs = dict(
    cores=1,
    memory="64 GB",
    shebang="#!/bin/bash",
    account="matgen",
    walltime="00:30:00",
    job_mem="0",
    job_script_prologue=[
        "source ~/.bashrc",
        "module load python",
        "source activate /pscratch/sd/c/cyrusyc/.conda/mlip-arena",
    ],
    job_directives_skip=["-n", "--cpus-per-task", "-J"],
    job_extra_directives=[
        "-J mof",
        "-q regular",
        f"-N {nodes_per_alloc}",
        "-C gpu",
        f"-G {gpus_per_alloc}",
    ],
)

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())
cluster.adapt(minimum_jobs=10, maximum_jobs=20)
client = Client(cluster)

# Run the workflow on HPC cluster in parallel

results = run.with_options(
    task_runner=DaskTaskRunner(address=client.scheduler.address),
    # log_prints=True,
)()