Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resubmission: Save output dataframe as HDF5, not Pickle #49

Merged
merged 9 commits into from
Apr 21, 2020
Merged
20 changes: 11 additions & 9 deletions canine/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from .adapters import AbstractAdapter, ManualAdapter, FirecloudAdapter
from .backends import AbstractSlurmBackend, LocalSlurmBackend, RemoteSlurmBackend, DummySlurmBackend, TransientGCPSlurmBackend, TransientImageSlurmBackend
from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer
from .utils import check_call
from .utils import check_call, pandas_read_hdf5_buffered, pandas_write_hdf5_buffered
import yaml
import numpy as np
import pandas as pd
from agutil import status_bar
version = '0.8.1'
version = '0.9.0'

ADAPTERS = {
'Manual': ManualAdapter,
Expand Down Expand Up @@ -416,6 +416,8 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non
# that don't receive any transformation with transformed columns
df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map })
except:
df = pd.DataFrame()
print("Error generating output dataframe; see stack trace for details.", file = sys.stderr)
traceback.print_exc()


Expand All @@ -426,11 +428,11 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non
# save DF to disk
if isinstance(localizer, AbstractLocalizer):
with localizer.transport_context() as transport:
dest = localizer.reserve_path("results.k9df.pickle").remotepath
dest = localizer.reserve_path("results.k9df.hdf5").remotepath
if not transport.isdir(os.path.dirname(dest)):
transport.makedirs(os.path.dirname(dest))
with transport.open(dest, 'wb') as w:
df.to_pickle(w, compression=None)
pandas_write_hdf5_buffered(df, buf = w, key = "results")
return df

def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) -> int:
Expand Down Expand Up @@ -459,7 +461,7 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> in
Succeeded jobs are skipped. Failed jobs are reset and rerun
"""
with localizer.transport_context() as transport:
df_path = localizer.reserve_path("results.k9df.pickle").remotepath
df_path = localizer.reserve_path("results.k9df.hdf5").remotepath

#remove all output if specified
if overwrite:
Expand All @@ -472,8 +474,8 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> in
if transport.exists(df_path):
try:
# load in results and job spec dataframes
with transport.open(df_path) as r:
r_df = pd.read_pickle(r, compression=None)
with transport.open(df_path, mode = "rb") as r:
r_df = pandas_read_hdf5_buffered(key = "results", buf = r)
js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id")

if r_df.empty or \
Expand Down Expand Up @@ -537,8 +539,8 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> in

return np.count_nonzero(~fail_idx)
except (ValueError, OSError) as e:
print(e)
print("Overwriting output and aborting job avoidance.")
print("Cannot recover preexisting task outputs: " + str(e), file = sys.stderr)
print("Overwriting output and aborting job avoidance.", file = sys.stderr)
transport.rmtree(localizer.staging_dir)
transport.makedirs(localizer.staging_dir)
return 0
Expand Down
2 changes: 1 addition & 1 deletion canine/test/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def test_cmd(self):
shell=True
)

@with_timeout(180)
@with_timeout(300)
def test_xargs(self):
"""
Runs a quick test of the xargs orchestrator
Expand Down
27 changes: 27 additions & 0 deletions canine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import paramiko
import shutil
import time
import pandas as pd
import requests

def isatty(*streams: typing.IO) -> bool:
Expand Down Expand Up @@ -315,3 +316,29 @@ def gcp_hourly_cost(mtype: str, preemptible: bool = False, ssd_size: int = 0, hd
)

# rmtree_retry removed in favor of AbstractTransport.rmtree

def pandas_write_hdf5_buffered(df: pd.DataFrame, key: str, buf: io.BufferedWriter):
    """
    Write a Pandas dataframe in HDF5 format to a buffer.

    Serializes ``df`` into an in-memory HDF5 file (H5FD_CORE driver with the
    backing store disabled, so nothing touches disk) and writes the raw file
    image into ``buf``. Read it back with ``pandas_read_hdf5_buffered`` using
    the same ``key``.

    :param df: dataframe to serialize
    :param key: identifier under which the dataframe is stored inside the
        HDF5 file; must match the key used when reading back
    :param buf: binary-writable buffer that receives the HDF5 file image
    """
    with pd.HDFStore(
        "/dev/null",  # path is never written: core driver + no backing store
        mode = "w",
        driver = "H5FD_CORE",
        driver_core_backing_store = 0
    ) as store:
        # BUGFIX: the key was previously hardcoded as "results", silently
        # ignoring the `key` argument; dataframes written under any other
        # key could never be found by pandas_read_hdf5_buffered(key, ...).
        store[key] = df
        # get_file_image() (PyTables) returns the in-memory HDF5 file as bytes
        buf.write(store._handle.get_file_image())

def pandas_read_hdf5_buffered(key: str, buf: io.BufferedReader) -> pd.DataFrame:
    """
    Read a Pandas dataframe in HDF5 format from a buffer.

    The buffer's entire contents are loaded into memory and mounted as an
    in-memory HDF5 file image (H5FD_CORE driver, no backing store), from
    which the dataframe stored under ``key`` is returned. The filename is a
    placeholder; no disk access occurs.

    :param key: identifier of the dataframe inside the HDF5 file
    :param buf: binary-readable buffer containing an HDF5 file image
    :return: the dataframe stored under ``key``
    """
    # Slurp the serialized HDF5 file image up front so it can be handed to
    # the core driver as a single in-memory blob.
    file_image = buf.read()
    store = pd.HDFStore(
        "dummy_hdf5",
        mode = "r",
        driver = "H5FD_CORE",
        driver_core_backing_store = 0,
        driver_core_image = file_image
    )
    try:
        return store[key]
    finally:
        store.close()
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
'google-api-python-client>=1.7.11',
'docker>=4.1.0',
'psutil>=5.6.7',
'port-for>=0.4'
'port-for>=0.4',
'tables>=3.6.1'
],
classifiers = [
"Development Status :: 4 - Beta",
Expand Down