Skip to content

Commit

Permalink
Merge pull request #23 from aiida-phonopy/expose-inputs
Browse files Browse the repository at this point in the history
Expose inputs to forces and nac params workchains
  • Loading branch information
atztogo committed Dec 17, 2021
2 parents c104447 + af24321 commit f8462dc
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 104 deletions.
170 changes: 106 additions & 64 deletions aiida_phonopy/common/builders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Utilities related to process builder or inputs dist."""

import copy
from aiida.engine import calcfunction
from aiida.plugins import DataFactory, WorkflowFactory
from aiida.common import InputValidationError
Expand All @@ -11,15 +11,6 @@
PotcarData = DataFactory("vasp.potcar")


def get_calcjob_inputs(
calculator_settings, structure, calc_type=None, label=None, ctx=None
):
"""Return builder inputs of a calculation."""
return _get_calcjob_inputs(
calculator_settings, structure, calc_type=calc_type, label=label, ctx=ctx
)


def get_plugin_names(calculator_settings):
"""Return plugin names of calculators."""
code_strings = []
Expand All @@ -37,59 +28,71 @@ def get_plugin_names(calculator_settings):
return plugin_names


def _get_calcjob_inputs(
def get_calcjob_inputs(
calculator_settings, structure, calc_type=None, label=None, ctx=None
):
"""Return builder inputs of a calculation."""
if calc_type is None:
if "sequence" in calculator_settings.keys():
key = calculator_settings["sequence"][ctx.iteration - 1]
settings = calculator_settings[key]
else:
settings = calculator_settings
if "sequence" in calculator_settings.keys():
key = calculator_settings["sequence"][ctx.iteration - 1]
calculator_inputs = calculator_settings[key]
else:
settings = Dict(dict=calculator_settings[calc_type])
calculator_inputs = calculator_settings

code = Code.get_from_string(settings["code_string"])
code = Code.get_from_string(calculator_inputs["code_string"])
plugin_name = code.get_input_plugin_name()
if plugin_name == "vasp.vasp":
if isinstance(calculator_inputs["options"], dict):
options = Dict(dict=calculator_inputs["options"])
else:
options = calculator_inputs["options"]
if isinstance(calculator_inputs["potential_family"], str):
potential_family = Str(calculator_inputs["potential_family"])
else:
potential_family = calculator_inputs["potential_family"]
if isinstance(calculator_inputs["potential_mapping"], dict):
potential_mapping = Dict(dict=calculator_inputs["potential_mapping"])
else:
potential_mapping = calculator_inputs["potential_mapping"]
builder_inputs = {
"options": _get_vasp_options(settings),
"parameters": _get_parameters(settings),
"settings": get_vasp_settings(settings),
"kpoints": _get_kpoints(settings, structure),
"options": options,
"parameters": _get_parameters_Dict(calculator_inputs),
"settings": _get_vasp_settings(calculator_inputs),
"kpoints": _get_kpoints(calculator_inputs, structure),
"clean_workdir": Bool(False),
"structure": structure,
"code": code,
"potential_family": potential_family,
"potential_mapping": potential_mapping,
}
if label:
builder_inputs.update({"metadata": {"label": label}})
potential_family = Str(settings["potential_family"])
potential_mapping = Dict(dict=settings["potential_mapping"])
builder_inputs.update(
{
"potential_family": potential_family,
"potential_mapping": potential_mapping,
}
)
builder_inputs["metadata"] = {"label": label}
elif plugin_name == "quantumespresso.pw":
family = load_group(settings["pseudo_family_string"])
family = load_group(calculator_inputs["pseudo_family_string"])
pseudos = family.get_pseudos(structure=structure)
metadata = {"options": calculator_inputs["options"]}
if label:
metadata["label"] = label
pw = {
"metadata": {"options": _get_options(settings), "label": label},
"parameters": _get_parameters(settings),
"metadata": metadata,
"parameters": _get_parameters_Dict(calculator_inputs),
"structure": structure,
"pseudos": pseudos,
"code": code,
}
builder_inputs = {"kpoints": _get_kpoints(settings, structure), "pw": pw}
builder_inputs = {
"kpoints": _get_kpoints(calculator_inputs, structure),
"pw": pw,
}
elif plugin_name == "quantumespresso.ph":
qpoints = KpointsData()
qpoints.set_kpoints_mesh([1, 1, 1], offset=[0, 0, 0])
metadata = {"options": calculator_inputs["options"]}
if label:
metadata["label"] = label
ph = {
"metadata": {"options": _get_options(settings), "label": label},
"metadata": metadata,
"qpoints": qpoints,
"parameters": _get_parameters(settings),
"parameters": _get_parameters_Dict(calculator_inputs),
"parent_folder": ctx.nac_params_calcs[0].outputs.remote_folder,
"code": code,
}
Expand Down Expand Up @@ -130,10 +133,19 @@ def get_vasp_immigrant_inputs(folder_path, calculator_settings, label=None):
inputs = {}
inputs["code"] = code
inputs["folder_path"] = Str(folder_path)
if "settings" in calculator_settings:
settings = copy.deepcopy(calculator_settings["settings"])
else:
settings = {}
if "parser_settings" in calculator_settings:
inputs["settings"] = Dict(
dict={"parser_settings": calculator_settings["parser_settings"]}
)
if "parser_settings" in settings:
settings["parser_settings"].update(
calculator_settings["parser_settings"]
)
else:
settings["parser_settings"] = calculator_settings["parser_settings"]
if settings:
inputs["settings"] = Dict(dict=settings)
if "options" in calculator_settings:
inputs["options"] = Dict(dict=calculator_settings["options"])
if "metadata" in calculator_settings:
Expand All @@ -154,43 +166,73 @@ def get_vasp_immigrant_inputs(folder_path, calculator_settings, label=None):
return inputs


def _get_options(settings_dict):
return settings_dict["options"]
def _get_parameters_Dict(calculator_inputs):
"""Return parameters for inputs.parameters.
If calculator_inputs["parameters"] is already a Dict,
a new Dict will not be made, and just it will be returned.
"""
if isinstance(calculator_inputs["parameters"], dict):
return Dict(dict=calculator_inputs["parameters"])
else:
return calculator_inputs["parameters"]


def _get_vasp_options(settings):
return Dict(dict=settings["options"])
def _get_vasp_settings(calculator_inputs):
"""Update VASP settings.
If no update of settings and calculator_inputs["settings"] is already a Dict,
a new Dict will not be made, and just it will be returned.
def _get_parameters(settings):
parameters = settings["parameters"]
return Dict(dict=parameters)
"""
updated = False
if "settings" in calculator_inputs.keys():
settings = calculator_inputs["settings"]
else:
settings = {}
if "parser_settings" in calculator_inputs.keys():
settings["parser_settings"] = calculator_inputs["parser_settings"]
updated = True
if (
"parser_settings" not in settings
or "add_forces" not in settings["parser_settings"]
):
settings["parser_settings"].update({"add_forces": True})
updated = True

assert settings

if updated:
return create_vasp_inputs_settings(Dict(dict=settings))
else:
return settings


@calcfunction
def get_vasp_settings(settings):
"""Update VASP settings."""
if "parser_settings" in settings.keys():
parser_settings_dict = settings["parser_settings"]
else:
parser_settings_dict = {}
if "add_forces" not in parser_settings_dict:
parser_settings_dict.update({"add_forces": True})
return Dict(dict={"parser_settings": parser_settings_dict})
def create_vasp_inputs_settings(settings):
"""Store Dict for VaspWorkChain.inputs.settings."""
return Dict(dict=settings.get_dict())


def _get_kpoints(settings, structure):
def _get_kpoints(calculator_inputs, structure):
"""Return KpointsData."""
if "kpoints" in calculator_inputs.keys():
assert isinstance(calculator_inputs["kpoints"], KpointsData)
return calculator_inputs["kpoints"]
kpoints = KpointsData()
kpoints.set_cell_from_structure(structure)
if "kpoints_density" in settings.keys():
kpoints.set_kpoints_mesh_from_density(settings["kpoints_density"])
elif "kpoints_mesh" in settings.keys():
if "kpoints_offset" in settings.keys():
kpoints_offset = settings["kpoints_offset"]
if "kpoints_density" in calculator_inputs.keys():
kpoints.set_kpoints_mesh_from_density(calculator_inputs["kpoints_density"])
elif "kpoints_mesh" in calculator_inputs.keys():
if "kpoints_offset" in calculator_inputs.keys():
kpoints_offset = calculator_inputs["kpoints_offset"]
else:
kpoints_offset = [0.0, 0.0, 0.0]

kpoints.set_kpoints_mesh(settings["kpoints_mesh"], offset=kpoints_offset)
kpoints.set_kpoints_mesh(
calculator_inputs["kpoints_mesh"], offset=kpoints_offset
)
else:
raise InputValidationError(
"no kpoint definition in input. "
Expand Down
6 changes: 0 additions & 6 deletions aiida_phonopy/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@
BandsData = DataFactory("array.bands")


@calcfunction
def select_calculator_settings(calculator_settings, calc_type):
"""Select calculator settings from string."""
return Dict(dict=calculator_settings[calc_type.value])


@calcfunction
def get_remote_fc_calculation_settings(phonon_settings):
"""Create remote force constants phonopy calculation setting.
Expand Down
12 changes: 6 additions & 6 deletions aiida_phonopy/workflows/forces.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def define(cls, spec):
"""Define inputs, outputs, and outline."""
super().define(spec)
spec.input("structure", valid_type=StructureData, required=True)
spec.input("calculator_settings", valid_type=Dict, required=True)
spec.input("calculator_inputs", valid_type=dict, required=True, non_db=True)
spec.input("symmetry_tolerance", valid_type=Float, default=lambda: Float(1e-5))
spec.input("immigrant_calculation_folder", valid_type=Str, required=False)
spec.outline(
Expand Down Expand Up @@ -161,12 +161,12 @@ def run_calculation(self):
"""Run supercell force calculation."""
self.report("calculate supercell forces")
process_inputs = get_calcjob_inputs(
self.inputs.calculator_settings,
self.inputs.calculator_inputs,
self.inputs.structure,
label=self.metadata.label,
)
CalculatorProcess = get_calculator_process(
self.inputs.calculator_settings["code_string"]
self.inputs.calculator_inputs["code_string"]
)
future = self.submit(CalculatorProcess, **process_inputs)
self.report("{} pk = {}".format(self.metadata.label, future.pk))
Expand All @@ -178,7 +178,7 @@ def read_calculation_from_folder(self):
force_folder = self.inputs.immigrant_calculation_folder
inputs = get_vasp_immigrant_inputs(
force_folder.value,
self.inputs.calculator_settings.dict,
self.inputs.calculator_inputs.dict,
label=self.metadata.label,
)
VaspImmigrant = WorkflowFactory("vasp.immigrant")
Expand All @@ -200,14 +200,14 @@ def finalize(self):
"""Finalize force calculation."""
outputs = self.ctx.calc.outputs
self.report("create forces ArrayData")
forces = _get_forces(outputs, self.inputs.calculator_settings["code_string"])
forces = _get_forces(outputs, self.inputs.calculator_inputs["code_string"])
if forces is None:
return self.exit_codes.ERROR_NO_FORCES
else:
self.out("forces", forces)

self.report("create energy ArrayData")
energy = _get_energy(outputs, self.inputs.calculator_settings["code_string"])
energy = _get_energy(outputs, self.inputs.calculator_inputs["code_string"])
if energy is None:
return self.exit_codes.ERROR_NO_ENERGY
else:
Expand Down
12 changes: 6 additions & 6 deletions aiida_phonopy/workflows/nac_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def define(cls, spec):
"""Define inputs, outputs, and outline."""
super().define(spec)
spec.input("structure", valid_type=StructureData, required=True)
spec.input("calculator_settings", valid_type=Dict, required=True)
spec.input("calculator_inputs", valid_type=dict, required=True, non_db=True)
spec.input("symmetry_tolerance", valid_type=Float, default=lambda: Float(1e-5))
spec.input("immigrant_calculation_folder", valid_type=Str, required=False)

Expand Down Expand Up @@ -158,12 +158,12 @@ def initialize(self):
"""Initialize outline control parameters."""
self.report("initialization")
self.ctx.iteration = 0
if "sequence" in self.inputs.calculator_settings.keys():
self.ctx.max_iteration = len(self.inputs.calculator_settings["sequence"])
if "sequence" in self.inputs.calculator_inputs.keys():
self.ctx.max_iteration = len(self.inputs.calculator_inputs["sequence"])
else:
self.ctx.max_iteration = 1

self.ctx.plugin_names = get_plugin_names(self.inputs.calculator_settings)
self.ctx.plugin_names = get_plugin_names(self.inputs.calculator_inputs)

def run_calculation(self):
"""Run NAC params calculation."""
Expand All @@ -172,7 +172,7 @@ def run_calculation(self):
)
label = "nac_params_%d" % self.ctx.iteration
process_inputs = get_calcjob_inputs(
self.inputs.calculator_settings,
self.inputs.calculator_inputs,
self.inputs.structure,
ctx=self.ctx,
label=label,
Expand All @@ -192,7 +192,7 @@ def read_calculation_from_folder(self):
label = "nac_params_%d" % self.ctx.iteration
force_folder = self.inputs.immigrant_calculation_folder
inputs = get_vasp_immigrant_inputs(
force_folder.value, self.inputs.calculator_settings.dict, label=label
force_folder.value, self.inputs.calculator_inputs.dict, label=label
)
VaspImmigrant = WorkflowFactory("vasp.immigrant")
future = self.submit(VaspImmigrant, **inputs)
Expand Down
15 changes: 15 additions & 0 deletions aiida_phonopy/workflows/phonopy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class BasePhonopyWorkChain(WorkChain, metaclass=ABCMeta):
options : dict
AiiDA calculation options for phonon calculation used when both of
run_phonopy and remote_phonopy are True.
calculator_inputs.force : dict, optional
This is used for supercell force calculation.
calculator_inputs.nac : Dict, optional
This is used for Born effective charges and dielectric constant calculation
in primitive cell. The primitive cell is chosen by phonopy
automatically.
subtract_residual_forces : Bool, optional
Run a perfect supercell force calculation and subtract the residual
forces from forces in supercells with displacements. Default is False.
Expand All @@ -71,6 +77,15 @@ def define(cls, spec):
super().define(spec)
spec.input("structure", valid_type=StructureData, required=True)
spec.input("phonon_settings", valid_type=Dict, required=True)
spec.input_namespace(
"calculator_inputs", help="Inputs passed to force and NAC calculators."
)
spec.input(
"calculator_inputs.force", valid_type=dict, required=False, non_db=True
)
spec.input(
"calculator_inputs.nac", valid_type=dict, required=False, non_db=True
)
spec.input("symmetry_tolerance", valid_type=Float, default=lambda: Float(1e-5))
spec.input("dry_run", valid_type=Bool, default=lambda: Bool(False))
spec.input(
Expand Down

0 comments on commit f8462dc

Please sign in to comment.