Merge pull request #49 from julianhess/hdf5
Resubmission: Save output dataframe as HDF5, not Pickle
agraubert committed Apr 21, 2020
2 parents bdfaaec + 0ab18ee commit 15e7dc5
Showing 4 changed files with 41 additions and 11 deletions.
20 changes: 11 additions & 9 deletions canine/orchestrator.py
@@ -8,12 +8,12 @@
from .adapters import AbstractAdapter, ManualAdapter, FirecloudAdapter
from .backends import AbstractSlurmBackend, LocalSlurmBackend, RemoteSlurmBackend, DummySlurmBackend, TransientGCPSlurmBackend, TransientImageSlurmBackend
from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer
- from .utils import check_call
+ from .utils import check_call, pandas_read_hdf5_buffered, pandas_write_hdf5_buffered
import yaml
import numpy as np
import pandas as pd
from agutil import status_bar
- version = '0.8.1'
+ version = '0.9.0'

ADAPTERS = {
'Manual': ManualAdapter,
@@ -416,6 +416,8 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = None
# that don't receive any transformation with transformed columns
df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map })
except:
+ df = pd.DataFrame()
+ print("Error generating output dataframe; see stack trace for details.", file = sys.stderr)
traceback.print_exc()


@@ -426,11 +428,11 @@
# save DF to disk
if isinstance(localizer, AbstractLocalizer):
with localizer.transport_context() as transport:
- dest = localizer.reserve_path("results.k9df.pickle").remotepath
+ dest = localizer.reserve_path("results.k9df.hdf5").remotepath
if not transport.isdir(os.path.dirname(dest)):
transport.makedirs(os.path.dirname(dest))
with transport.open(dest, 'wb') as w:
- df.to_pickle(w, compression=None)
+ pandas_write_hdf5_buffered(df, buf = w, key = "results")
return df

def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) -> int:
@@ -459,7 +461,7 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> int:
Succeeded jobs are skipped. Failed jobs are reset and rerun
"""
with localizer.transport_context() as transport:
- df_path = localizer.reserve_path("results.k9df.pickle").remotepath
+ df_path = localizer.reserve_path("results.k9df.hdf5").remotepath

#remove all output if specified
if overwrite:
@@ -472,8 +474,8 @@
if transport.exists(df_path):
try:
# load in results and job spec dataframes
- with transport.open(df_path) as r:
-     r_df = pd.read_pickle(r, compression=None)
+ with transport.open(df_path, mode = "rb") as r:
+     r_df = pandas_read_hdf5_buffered(key = "results", buf = r)
js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id")

if r_df.empty or \
@@ -537,8 +539,8 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> int:

return np.count_nonzero(~fail_idx)
except (ValueError, OSError) as e:
- print(e)
- print("Overwriting output and aborting job avoidance.")
+ print("Cannot recover preexisting task outputs: " + str(e), file = sys.stderr)
+ print("Overwriting output and aborting job avoidance.", file = sys.stderr)
transport.rmtree(localizer.staging_dir)
transport.makedirs(localizer.staging_dir)
return 0
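
Taken together, the orchestrator hunks swap the persisted results format from pickle to an in-memory HDF5 image. A minimal sketch of the new write path (make_output_DF) and read path (job_avoid), stripped of the localizer/transport machinery — plain local file, made-up column names:

import pandas as pd
from canine.utils import pandas_read_hdf5_buffered, pandas_write_hdf5_buffered

df = pd.DataFrame({"_job_id": ["0", "1"], "slurm_state": ["COMPLETED", "FAILED"]})

# write path: serialize the dataframe as an HDF5 image under the "results" key
with open("results.k9df.hdf5", "wb") as w:
    pandas_write_hdf5_buffered(df, key = "results", buf = w)

# read path: load the image back to decide which jobs can be avoided
with open("results.k9df.hdf5", "rb") as r:
    r_df = pandas_read_hdf5_buffered(key = "results", buf = r)

assert r_df.equals(df)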
2 changes: 1 addition & 1 deletion canine/test/test_orchestrator.py
@@ -246,7 +246,7 @@ def test_cmd(self):
shell=True
)

- @with_timeout(180)
+ @with_timeout(300)
def test_xargs(self):
"""
Runs a quick test of the xargs orchestrator
27 changes: 27 additions & 0 deletions canine/utils.py
@@ -12,6 +12,7 @@
import paramiko
import shutil
import time
+ import pandas as pd
import requests

def isatty(*streams: typing.IO) -> bool:
@@ -315,3 +316,29 @@ def gcp_hourly_cost(mtype: str, preemptible: bool = False, ssd_size: int = 0, hd
)

# rmtree_retry removed in favor of AbstractTransport.rmtree

+ def pandas_write_hdf5_buffered(df: pd.DataFrame, key: str, buf: io.BufferedWriter):
+     """
+     Write a Pandas dataframe in HDF5 format to a buffer.
+     """
+     # the core driver with backing_store = 0 keeps the HDF5 file entirely
+     # in memory, so the "/dev/null" path is never actually written to
+     with pd.HDFStore(
+         "/dev/null",
+         mode = "w",
+         driver = "H5FD_CORE",
+         driver_core_backing_store = 0
+     ) as store:
+         store[key] = df
+         buf.write(store._handle.get_file_image())
+
+ def pandas_read_hdf5_buffered(key: str, buf: io.BufferedReader) -> pd.DataFrame:
+     """
+     Read a Pandas dataframe in HDF5 format from a buffer.
+     """
+     # the in-memory file image is populated from the buffer; the
+     # "dummy_hdf5" path is a placeholder and is never opened
+     with pd.HDFStore(
+         "dummy_hdf5",
+         mode = "r",
+         driver = "H5FD_CORE",
+         driver_core_backing_store = 0,
+         driver_core_image = buf.read()
+     ) as store:
+         return store[key]
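
These helpers lean on HDF5's core (in-memory) file driver: with driver_core_backing_store = 0 nothing touches the named path, get_file_image() exports the raw bytes of the in-memory file, and driver_core_image loads them back. A quick sanity check of the round trip uses io.BytesIO, which satisfies the buffered reader/writer interfaces both functions expect:

import io
import pandas as pd
from canine.utils import pandas_read_hdf5_buffered, pandas_write_hdf5_buffered

df = pd.DataFrame({"x": [1, 2, 3]})
buf = io.BytesIO()
pandas_write_hdf5_buffered(df, key = "results", buf = buf)

buf.seek(0)  # rewind so the reader sees the whole image
assert pandas_read_hdf5_buffered(key = "results", buf = buf).equals(df)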
3 changes: 2 additions & 1 deletion setup.py
@@ -56,7 +56,8 @@
'google-api-python-client>=1.7.11',
'docker>=4.1.0',
'psutil>=5.6.7',
- 'port-for>=0.4'
+ 'port-for>=0.4',
+ 'tables>=3.6.1'
],
classifiers = [
"Development Status :: 4 - Beta",
