Commit
remove s3fs requirement
JosephMontoya-TRI committed Apr 6, 2021
1 parent 9357e84 commit 1288c68
Showing 2 changed files with 79 additions and 48 deletions.
105 changes: 57 additions & 48 deletions camd/experiment/dft.py
@@ -18,14 +18,15 @@
 
 import boto3
 from botocore.errorfactory import ClientError
+from tqdm import tqdm
 import pandas as pd
 from monty.os import cd
 from monty.tempfile import ScratchDir
 from pymatgen.io.vasp.outputs import Vasprun
 from pymatgen import Composition
 from camd.experiment.base import Experiment
 from camd.utils.data import QMPY_REFERENCES, QMPY_REFERENCES_HUBBARD, \
-    get_chemsys
+    get_chemsys, get_common_prefixes
 
 
 class OqmdDFTonMC1(Experiment):
@@ -235,61 +236,69 @@ def fetch_cached(self, candidate_data):
         tri_path = os.environ.get("TRI_PATH")
         tri_bucket = os.environ.get("TRI_BUCKET")
         s3_client = boto3.client("s3")
+        # Scrub tri_path and replace model with simulation
+        # to get s3 key
+        s3_parent = self.parent_dir.replace('model', 'simulation')
+        s3_parent = s3_parent.replace(tri_path + '/', "")
         cached_experiments = pd.DataFrame()
-        for structure_id, row in candidate_data.iterrows():
+        # Get all experiment folders
+        chemsyses = set([get_chemsys(s) for s in candidate_data['structure']])
+        experiment_dirs = []
+        for chemsys in chemsyses:
+            chemsys_dirs = get_common_prefixes(
+                tri_bucket,
+                os.path.join(s3_parent, chemsys)
+            )
+            experiment_dirs.extend(chemsys_dirs)
+        for structure_id, row in tqdm(candidate_data.iterrows(), total=len(candidate_data)):
+            if not structure_id.replace('-', '') in experiment_dirs:
+                continue
             calc_path = os.path.join(
-                self.parent_dir, get_chemsys(row['structure']),
+                s3_parent, get_chemsys(row['structure']),
                 structure_id.replace('-', ''), "_1/")
-            # Scrub tri_path and replace model with simulation
-            # to get s3 key
-            calc_path = calc_path.replace(tri_path + '/', "")
-            calc_path = calc_path.replace('model', 'simulation')
             with ScratchDir('.'):
-                # Figure out whether prior submission exists
-                response = s3_client.list_objects_v2(
-                    Bucket=tri_bucket, Prefix=calc_path, Delimiter='/')
-                if response.get('Contents'):
-                    cached_experiments = cached_experiments.append(row)
-                    # TODO: figure out whether file exists in s3
-                    # TODO: this is a little crude, could use boto3
-                    try:
-                        # import pdb; pdb.set_trace()
-                        vr_path = os.path.join(calc_path, "static", "vasprun.xml")
-                        cmd = "aws s3 cp s3://{}/{} .".format(tri_bucket, vr_path)
-                        subprocess.call(shlex.split(cmd))
-                        vr = Vasprun("vasprun.xml")
-                        vr_dict = vr.as_dict()
-                        delta_e = get_qmpy_formation_energy(
-                            vr_dict["output"]["final_energy_per_atom"],
-                            vr_dict["pretty_formula"],
-                            1,
-                        )
-                        data = {
-                            "status": "SUCCEEDED",
-                            "error": None,
-                            "result": vr,
-                            "delta_e": delta_e,
-                        }
-                    except Exception as e:
-                        error_doc = {}
-                        try:
-                            err_obj = s3_client.get_object(
-                                Bucket=tri_bucket, Key=os.path.join(calc_path, 'err'))
-                            errtxt = err_obj['Body'].read().decode('utf-8')
-                            error_doc.update(
-                                {"mc1_stderr": errtxt}
-                            )
-                        except ClientError:
-                            print('No error file for {}'.format(calc_path))
-                        error_doc.update(
-                            {
-                                "camd_exception": "{}".format(e),
-                                "camd_traceback": traceback.format_exc(),
-                            }
-                        )
-                        # Dump error docs to avoid Pandas issues with dict values
-                        data = {"status": "FAILED", "error": json.dumps(error_doc)}
-                    update_dataframe_row(cached_experiments, structure_id, data, add_columns=True)
+                cached_experiments = cached_experiments.append(row)
+                # TODO: figure out whether file exists in s3
+                # TODO: this is a little crude, could use boto3
+                try:
+                    # import pdb; pdb.set_trace()
+                    vr_path = os.path.join(calc_path, "static", "vasprun.xml")
+                    cmd = "aws s3 cp s3://{}/{} .".format(tri_bucket, vr_path)
+                    subprocess.call(shlex.split(cmd))
+                    vr = Vasprun("vasprun.xml")
+                    vr_dict = vr.as_dict()
+                    delta_e = get_qmpy_formation_energy(
+                        vr_dict["output"]["final_energy_per_atom"],
+                        vr_dict["pretty_formula"],
+                        1,
+                    )
+                    data = {
+                        "status": "SUCCEEDED",
+                        "error": None,
+                        "result": vr,
+                        "delta_e": delta_e,
+                    }
+                except Exception as e:
+                    error_doc = {}
+                    try:
+                        err_obj = s3_client.get_object(
+                            Bucket=tri_bucket, Key=os.path.join(calc_path, 'err'))
+                        errtxt = err_obj['Body'].read().decode('utf-8')
+                        error_doc.update(
+                            {"mc1_stderr": errtxt}
+                        )
+                    except ClientError:
+                        print('No error file for {}'.format(calc_path))
+                    error_doc.update(
+                        {
+                            "camd_exception": "{}".format(e),
+                            "camd_traceback": traceback.format_exc(),
+                        }
+                    )
+                    # Dump error docs to avoid Pandas issues with dict values
+                    data = {"status": "FAILED", "error": json.dumps(error_doc)}
+                update_dataframe_row(cached_experiments, structure_id, data, add_columns=True)
         return cached_experiments
 
     def submit_dft_calcs_to_mc1(self):
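Not part of the commit, but to make the change above concrete: the rewritten fetch_cached replaces a per-candidate list_objects_v2 round trip with one prefix listing per chemical system, then skips any candidate whose scrubbed structure ID has no cached folder. A minimal sketch of that prefilter pattern, assuming hypothetical bucket, prefix, and structure IDs (not the actual CAMD values):

import os
import boto3


def list_experiment_dirs(bucket, s3_parent, chemsyses):
    """One S3 listing per chemsys instead of one request per candidate."""
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    experiment_dirs = []
    for chemsys in chemsyses:
        prefix = os.path.join(s3_parent, chemsys) + "/"
        pages = paginator.paginate(Bucket=bucket, Delimiter="/", Prefix=prefix)
        # With Delimiter="/", CommonPrefixes holds the immediate "subfolders"
        experiment_dirs.extend(
            cp["Prefix"].split("/")[-2]
            for cp in pages.search("CommonPrefixes")
            if cp
        )
    return experiment_dirs


# Hypothetical usage: cheap local membership test before any per-candidate fetch
cached = set(list_experiment_dirs("my-bucket", "simulation/dft", ["Fe-O", "Li-Ni-O"]))
for structure_id in ["mp-1234", "mp-5678"]:
    if structure_id.replace("-", "") not in cached:
        continue  # no cached folder for this candidate; skip the S3 download

This keeps the number of S3 requests proportional to the number of chemical systems rather than the number of candidates.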
22 changes: 22 additions & 0 deletions camd/utils/data.py
@@ -496,3 +496,25 @@ def upload_s3_file(key, bucket, filename):
     s3_client = boto3.client('s3')
     s3_client.upload_file(filename, bucket, key)
     return True
+
+
+def get_common_prefixes(bucket, prefix):
+    """
+    Helper function to get common "subfolders" of folders
+    in S3
+
+    Args:
+        bucket (str): bucket name
+        prefix (str): prefix for which to list common prefixes
+
+    Returns:
+
+    """
+    if not prefix.endswith('/'):
+        prefix += "/"
+    client = boto3.client('s3')
+    paginator = client.get_paginator('list_objects')
+    result = paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=prefix)
+    return [common_prefix['Prefix'].split('/')[-2]
+            for common_prefix in result.search("CommonPrefixes")
+            if common_prefix]
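For context on the commit title: get_common_prefixes reproduces, with a plain boto3 paginator, the directory-style listing for which the s3fs dependency was presumably required before, so that extra requirement can be dropped. A hypothetical call (the bucket name and prefix are illustrative, not real CAMD paths):

from camd.utils.data import get_common_prefixes

# List the immediate "subfolders" under s3://my-camd-bucket/simulation/dft/Fe-O/
# (illustrative bucket/prefix); returns folder names like ['mp1234', 'mp5678']
folders = get_common_prefixes("my-camd-bucket", "simulation/dft/Fe-O")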
