In [None]:
# Demonstration of phonon calculations using FireWorks

# Import modules from ase and fireworks
import os
import numpy as np

from ase.build import bulk
from ase.calculators.emt import EMT

from fireworks import Firework, FWorker, LaunchPad, PyTask, FWAction, Workflow
from fireworks.core.rocket_launcher import launch_rocket, rapidfire

# Minimal hilde imports to make dictionary conversion easier
from hilde.structure.structure import pAtoms, dict2patoms, patoms2dict
from hilde.helpers.hash import hash_atoms

# Modified ASE database for phonon calculations
from hilde.phonon_db.row import phonon_to_dict, PhononRow
from hilde.phonon_db.phonon_db import connect as connect_ph

from phonopy import Phonopy
from phonopy.structure.atoms import PhonopyAtoms

# Name of the module these cells execute in; later cells prepend it to a function
# name to build the dotted "<module>.<function>" string handed to PyTask as "func"
mod_name = __name__

In [None]:
# Helper that turns an ASE-style atoms object into a PhonopyAtoms object
def to_phonopy_atoms(atoms):
    """Create a PhonopyAtoms object from the data exposed by an atoms object.

    Args:
        atoms: object providing get_chemical_symbols(), get_cell(), get_masses()
            and get_positions(wrap=...), e.g. an ASE Atoms instance

    Returns:
        PhonopyAtoms built from the symbols, cell, masses and positions of atoms
    """
    species = atoms.get_chemical_symbols()
    lattice = atoms.get_cell()
    atomic_masses = atoms.get_masses()
    # Positions are requested with wrap=True, exactly as the getter is asked for them
    wrapped_positions = atoms.get_positions(wrap=True)
    return PhonopyAtoms(
        symbols=species,
        cell=lattice,
        masses=atomic_masses,
        positions=wrapped_positions,
    )

In [None]:
# calculate reports its result through mod_spec rather than update_spec,
# because mod_spec can append items to the end of a list stored in the spec
def calculate(atoms_dict):
    """Run a force calculation for one structure and push the result to the spec.

    Args:
        atoms_dict: dictionary representation of the structure (with its
            calculator), as accepted by dict2patoms

    Returns:
        FWAction whose mod_spec pushes the dictionary of the calculated
        structure onto the "calc_forces" list of the spec
    """
    print("Single point calculation")
    structure = dict2patoms(atoms_dict)
    calculator = structure.calc
    calculator.calculate(structure, properties=['forces'])
    # '_push' appends to the list stored under the given key ("calc_forces" in this case)
    push_forces = {'_push': {"calc_forces": patoms2dict(structure)}}
    return FWAction(mod_spec=[push_forces])

In [None]:
# Creates one calculate Firework per structure and inserts them into the workflow
def calculate_multiple(atom_dicts):
    """Set up a force-calculation Firework for every structure dictionary.

    Args:
        atom_dicts: iterable of structure dictionaries, one per calculation

    Returns:
        FWAction carrying the new Fireworks as detours
    """
    print("Appending calculations to the workflow")
    firework_detours = [
        Firework(
            PyTask({"func": mod_name+".calculate", "args": [cell]}),
            name=f"calc_{i}",
        )
        for i, cell in enumerate(atom_dicts)
    ]
    # Detours become children of the current Firework, and the Fireworks that were
    # its children before become the children of the detours instead
    return FWAction(detours=firework_detours)

In [None]:
# Function to calculate the force constants from the Fireworks spec
def get_fcs(phonon_dict, atoms_ideal, calc_atoms, displacement=0.01):
    """Compute force constants from the forces of the displaced supercells.

    Args:
        phonon_dict: dictionary representation of the phonopy object, as
            accepted by PhononRow
        atoms_ideal: dictionary of the undisplaced structure; kept so existing
            callers keep working, it is not needed for the calculation
        calc_atoms: list of dictionaries of the calculated displaced supercells;
            each must carry its index in info['disp_num']
        displacement: displacement distance handed to generate_displacements;
            it has to match the value used when the supercells were created

    Returns:
        FWAction whose update_spec stores the phonon object, now including the
        force constants, under "phonon_dict"
    """
    print("Getting the force constants")
    # Sort by displacement number as there is no guarantee that the
    # calculations finished (and were pushed to the spec) in order
    disp_cells = sorted(
        (dict2patoms(ca) for ca in calc_atoms),
        key=lambda cell: cell.info['disp_num'],
    )
    phonon = PhononRow(phonon_dict).to_phonon()
    # Regenerate the displacements so the forces below can be assigned to them
    phonon.generate_displacements(distance=displacement)
    phonon.set_forces([cell.get_forces() for cell in disp_cells])
    phonon.produce_force_constants()
    return FWAction(update_spec={"phonon_dict": phonon_to_dict(phonon)})


In [None]:
# Database addition function adapted to work with phonopy objects
def calc_to_db(db_path, sc_dict, phonon_dict):
    """Store a phonon calculation in the phonon database, updating existing rows.

    Rows are matched on the atoms hash, the calculator hash and the supercell
    matrix. Every matching row is updated; if there is none, or a KeyError is
    raised while selecting or updating, a new row is written instead.

    Args:
        db_path: location of the database, passed to connect_ph
        sc_dict: dictionary of the structure (with calculator) used for hashing
        phonon_dict: dictionary of the phonon object; must contain "sc_matrix_2"

    Returns:
        An empty FWAction
    """
    print("Adding results to the database")
    database = connect_ph(db_path)
    structure = dict2patoms(sc_dict)
    # Attach the structure to its calculator before hashing
    structure.calc.atoms = structure
    atoms_hash, calc_hash = hash_atoms(structure)
    hashes = {"atoms_hash": atoms_hash, "calc_hash": calc_hash}
    selection = [(key, "=", value) for key, value in hashes.items()]
    selection.append(("sc_matrix_2", "=", phonon_dict["sc_matrix_2"]))

    needs_insert = False
    try:
        matches = list(database.select(selection=selection))
        for match in matches:
            database.update(match.id, phonon_dict, **hashes)
        needs_insert = not matches
    except KeyError:
        # A KeyError from the database is treated like "no matching row"
        needs_insert = True
    if needs_insert:
        database.write(phonon_dict, **hashes)
    return FWAction()

In [None]:
# Supercell matrix and database location for the aluminium example
smatrix = np.array([3, 0, 0, 0, 3, 0, 0, 0, 3]).reshape(3,3)
# os.path.join puts a separator between the working directory and the file name;
# plain string concatenation would create "<cwd>test_ph.db" next to the directory
db_path = os.path.join(os.getcwd(), "test_ph.db")

# Initialize the primitive cell and attach the EMT calculator
al = pAtoms(bulk('Al', 'fcc'))
al.set_calculator(EMT())
al_dict = patoms2dict(al)
al_hash, calc_hash = hash_atoms(al)

ph_atoms = to_phonopy_atoms(al)

phonon = Phonopy(ph_atoms, supercell_matrix=smatrix, symprec=1e-5, is_symmetry=True, factor=15.633302)
# The phonon dictionary is taken before the displacements are generated
phonon_dict = phonon_to_dict(phonon)
pc_dict = patoms2dict(al)
# The same displacement distance is used again by get_fcs
phonon.generate_displacements(distance=0.01)

# Convert every displaced supercell to pAtoms. A new list is built instead of
# overwriting the entries of the list handed back by phonopy.
scs = []
sc_dicts = []
for i, sc in enumerate(phonon.get_supercells_with_displacements()):
    displaced = pAtoms(phonopy_atoms=sc)
    # disp_num lets get_fcs restore the displacement order afterwards
    displaced.info['disp_num'] = i
    displaced.calc = al.calc
    scs.append(displaced)
    sc_dicts.append(patoms2dict(displaced))

In [None]:
# One Firetask per stage of the calculation. Spec entries named under "inputs"
# are handed to the function after the explicit "args".
ft_initialize = PyTask({
    "func": mod_name + ".calculate_multiple",
    "args": [sc_dicts],
})
ft_get_fc = PyTask({
    "func": mod_name + ".get_fcs",
    "args": [phonon_dict, pc_dict],
    "inputs": ["calc_forces"],
})
ft_calc_to_db = PyTask({
    "func": mod_name + ".calc_to_db",
    "args": [db_path, pc_dict],
    "inputs": ["phonon_dict"],
})

In [None]:
# Wrap each Firetask in its own Firework, in workflow order
fw_initialize, fw_get_fc, fw_calc_to_db = (
    Firework(task) for task in (ft_initialize, ft_get_fc, ft_calc_to_db)
)
# Only the three static steps are linked here; no links are needed for the
# force evaluation calculations
wf = Workflow(
    [fw_initialize, fw_get_fc, fw_calc_to_db],
    {fw_initialize: [fw_get_fc], fw_get_fc: [fw_calc_to_db]},
)

In [None]:
# Create a LaunchPad with its default connection arguments
launchpad = LaunchPad()
# NOTE(review): reset presumably clears everything already stored in this LaunchPad
# (require_password=False skips the confirmation) — confirm that no other workflows
# live in the same database before running this cell
launchpad.reset('', require_password=False)
# Register the workflow defined above
launchpad.add_wf(wf)
# nlaunches=0 means keep running Fireworks until none are needed
rapidfire(launchpad, nlaunches=0, strm_lvl="CRITICAL")

In [None]:
# Read the stored force constants back from the database to check the results
db = connect_ph(db_path)
row = list(
    db.select(
        selection=[
            ("atoms_hash", "=", al_hash),
            ("sc_matrix_2", "=", list(smatrix.flatten())),
        ],
        columns=["fc_2"],
    )
)[0]
print(f"The force constants are:\n{row.get('fc_2')}")

In [None]:
# Combined local/remote queue launching with phonon calculations
from hilde.fireworks_api_adapter.combined_launcher import rapidfire as lq_rapidfire
from hilde.fireworks_api_adapter.launchpad import LaunchPadHilde as LaunchPad
# Import the hilde calculate function so both the local and remote machines have the same function in their path
from hilde.tasks.fireworks import calculate as hilde_calc
# Use ASE aims calculator
from ase.calculators.aims import Aims

In [None]:
# Rebind launchpad to a LaunchPad created with port 27018
# NOTE(review): presumably the port of the database server behind the LaunchPad — adjust to your setup
launchpad = LaunchPad(port=27018)

In [None]:
# Queue-aware calculate_multiple: every calculate Firework gets a modified _queueadapter spec
def calculate_multiple(workdir, atom_dicts, queue_spec):
    """Set up one hilde calculate Firework per structure, each with queue_spec.

    Args:
        workdir: working directory passed on to the calculate function
        atom_dicts: iterable of structure dictionaries, one per calculation
        queue_spec: spec dictionary given to every new Firework

    Returns:
        FWAction carrying the new Fireworks as detours
    """
    firework_detours = [
        Firework(
            PyTask({"func": hilde_calc.name,
                    "args": [workdir, "calc_forces", cell]}),
            name=f"calc_{i}",
            # Queue submission properties are changed at the Firework level through the spec
            spec=queue_spec,
        )
        for i, cell in enumerate(atom_dicts)
    ]
    return FWAction(detours=firework_detours)

In [None]:
# Settings for the remote machine (change every value to match your own setup)
remote_settings = dict(
    # Must be the full name of a host (draco.mpcdf.mpg.de will time out), given as a list
    remote_host=["draco01.mpcdf.mpg.de"],
    # Can be left out when the remote host is written as username@host
    remote_user="tpurcell",
    # Avoid storing a password here whenever possible
    remote_password=None,
    remote_config_dir=["/u/tpurcell/.fireworks"],
    # Has to be True when Kerberos is used
    gss_auth=True,
    aims_command="srun /u/tpurcell/git/fhiaims_dev/bin/aims.180824.scalapack.mpi.x",
    aims_species_dir="/u/tpurcell/git/fhiaims_dev/species_defaults/",
)

In [None]:
# Keyword arguments for the Aims calculator
aims_settings = dict(
    xc="pw-lda",
    relativistic="atomic_zora scalar",
    sc_accuracy_rho=1e-6,
    sc_accuracy_forces=1e-3,
    sc_iter_limit=50,
    mixer="pulay",
    n_max_pulay=10,
    charge_mix_param=0.3,
    k_grid=[4, 4, 4],
    output_level="MD_light",
    # The last two entries must refer to the paths and commands of the remote machine
    species_dir=remote_settings["aims_species_dir"] + "/light",
    aims_command=remote_settings["aims_command"],
)

In [None]:
# Use diamond silicon as a test material
si = pAtoms(bulk('Si', 'diamond'))
si.calc = Aims(**aims_settings)
si_hash, calc_hash = hash_atoms(si)

# Initialize phonopy
smatrix = np.array([-2, 2, 2, 2, -2, 2, 2, 2, -2]).reshape(3,3)
# os.path.join puts a separator between the working directory and the file name;
# plain string concatenation would create "<cwd>test_ph.db" next to the directory
db_path = os.path.join(os.getcwd(), "test_ph.db")

ph_atoms = to_phonopy_atoms(si)

phonon = Phonopy(ph_atoms, supercell_matrix=smatrix, symprec=1e-5, is_symmetry=True, factor=15.633302)
# The phonon dictionary is taken before the displacements are generated
phonon_dict = phonon_to_dict(phonon)
pc_dict = patoms2dict(si)
# The same displacement distance is used again by get_fcs
phonon.generate_displacements(distance=0.01)

# Convert every displaced supercell to pAtoms. A new list is built instead of
# overwriting the entries of the list handed back by phonopy.
scs = []
sc_dicts = []
for i, sc in enumerate(phonon.get_supercells_with_displacements()):
    displaced = pAtoms(phonopy_atoms=sc)
    # disp_num lets get_fcs restore the displacement order afterwards
    displaced.info['disp_num'] = i
    displaced.calc = si.calc
    scs.append(displaced)
    sc_dicts.append(patoms2dict(displaced))

In [None]:
# Changes to the submission script are controlled through the _queueadapter
# dictionary; its keys are the same ones you define in "my_qadapter.yaml"
q_spec = {"_queueadapter": {"walltime": "00:05:00", "nodes": 2}}

# One Firetask per stage of the calculation
workdir = "/u/tpurcell/.fireworks/SiTutorialExample/"
ft_initialize = PyTask({
    "func": mod_name + ".calculate_multiple",
    "args": [workdir, sc_dicts, q_spec],
})
ft_get_fc = PyTask({
    "func": mod_name + ".get_fcs",
    "args": [phonon_dict, pc_dict],
    "inputs": ["calc_forces"],
})
ft_calc_to_db = PyTask({
    "func": mod_name + ".calc_to_db",
    "args": [db_path, pc_dict],
    "inputs": ["phonon_dict"],
})

# Wrap each Firetask in its own Firework, in workflow order
fw_initialize, fw_get_fc, fw_calc_to_db = (
    Firework(task) for task in (ft_initialize, ft_get_fc, ft_calc_to_db)
)

wf = Workflow(
    [fw_initialize, fw_get_fc, fw_calc_to_db],
    {fw_initialize: [fw_get_fc], fw_get_fc: [fw_calc_to_db]},
)

In [None]:
# Register the workflow, then run it with the combined local/remote launcher
launchpad.add_wf(wf)
# NOTE(review): the meaning of njobs_queue, njobs_block, sleep_time and reserve is
# defined by hilde's combined launcher and is not visible here — presumably queue
# limits and the polling interval; confirm before changing them
lq_rapidfire(launchpad, 
             launch_dir='.', 
             nlaunches=0, 
             njobs_queue=250, 
             wflow=wf, 
             njobs_block=500,
             sleep_time=60, 
             reserve=True, 
             # Connection details come from the remote_settings dictionary
             remote_host=remote_settings['remote_host'],
             remote_user=remote_settings['remote_user'], 
             remote_password=remote_settings['remote_password'],
             remote_config_dir=remote_settings["remote_config_dir"],
             # List of tasks that should be done on the queue
             tasks2queue=[hilde_calc.name],
             gss_auth=remote_settings["gss_auth"])

In [None]:
# Read the stored force constants back from the database to check the results
db = connect_ph(db_path)
row = list(
    db.select(
        selection=[
            ("atoms_hash", "=", si_hash),
            ("sc_matrix_2", "=", list(smatrix.flatten())),
        ],
        columns=["fc_2"],
    )
)[0]
print(f"The force constants are:\n{row.get('fc_2')}")