Some changes based on Savin's comments.
- Added changes to the task datastore for a different reason (todo: decouple these).
- Added comments to SFN for reference.
- The Airflow DAG is no longer dependent on metaflow.
valayDave committed Mar 2, 2022
1 parent f32d089 commit a9f0468
Showing 7 changed files with 357 additions and 41 deletions.
63 changes: 33 additions & 30 deletions metaflow/datastore/task_datastore.py
@@ -229,6 +229,38 @@ def init_task(self):
         """
         self.save_metadata({self.METADATA_ATTEMPT_SUFFIX: {"time": time.time()}})
 
+    def safely_pickle(self, obj, name, only_v4=False):
+        if only_v4:
+            encode_type = "gzip+pickle-v4"
+            if encode_type not in self._encodings:
+                raise DataException(
+                    "Artifact *%s* requires a serialization encoding that "
+                    "requires Python 3.4 or newer." % name
+                )
+            try:
+                blob = pickle.dumps(obj, protocol=4)
+            except TypeError as e:
+                raise UnpicklableArtifactException(name)
+        else:
+            try:
+                blob = pickle.dumps(obj, protocol=2)
+                encode_type = "gzip+pickle-v2"
+            except (SystemError, OverflowError):
+                encode_type = "gzip+pickle-v4"
+                if encode_type not in self._encodings:
+                    raise DataException(
+                        "Artifact *%s* is very large (over 2GB). "
+                        "You need to use Python 3.4 or newer if you want to "
+                        "serialize large objects." % name
+                    )
+                try:
+                    blob = pickle.dumps(obj, protocol=4)
+                except TypeError as e:
+                    raise UnpicklableArtifactException(name)
+            except TypeError as e:
+                raise UnpicklableArtifactException(name)
+        return blob, encode_type
+
     @only_if_not_done
     @require_mode("w")
     def save_artifacts(self, artifacts_iter, force_v4=False, len_hint=0):
@@ -264,36 +296,7 @@ def pickle_iter():
                     if isinstance(force_v4, bool)
                     else force_v4.get(name, False)
                 )
-                if do_v4:
-                    encode_type = "gzip+pickle-v4"
-                    if encode_type not in self._encodings:
-                        raise DataException(
-                            "Artifact *%s* requires a serialization encoding that "
-                            "requires Python 3.4 or newer." % name
-                        )
-                    try:
-                        blob = pickle.dumps(obj, protocol=4)
-                    except TypeError as e:
-                        raise UnpicklableArtifactException(name)
-                else:
-                    try:
-                        blob = pickle.dumps(obj, protocol=2)
-                        encode_type = "gzip+pickle-v2"
-                    except (SystemError, OverflowError):
-                        encode_type = "gzip+pickle-v4"
-                        if encode_type not in self._encodings:
-                            raise DataException(
-                                "Artifact *%s* is very large (over 2GB). "
-                                "You need to use Python 3.4 or newer if you want to "
-                                "serialize large objects." % name
-                            )
-                        try:
-                            blob = pickle.dumps(obj, protocol=4)
-                        except TypeError as e:
-                            raise UnpicklableArtifactException(name)
-                    except TypeError as e:
-                        raise UnpicklableArtifactException(name)
-
+                blob, encode_type = self.safely_pickle(obj, name, only_v4=do_v4)
                 self._info[name] = {
                     "size": len(blob),
                     "type": str(type(obj)),
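For reference, here is a minimal standalone sketch of the protocol-fallback logic that safely_pickle now centralizes. It is simplified and illustrative: it drops the self._encodings check and Metaflow's DataException / UnpicklableArtifactException and relies on plain pickle errors instead.

import pickle

def pickle_with_fallback(obj):
    """Illustrative stand-in for safely_pickle's default (only_v4=False) path."""
    try:
        # Protocol 2 is tried first, keeping artifacts readable by
        # older (pre-3.4) datastore clients.
        return pickle.dumps(obj, protocol=2), "gzip+pickle-v2"
    except (SystemError, OverflowError):
        # Protocol 2 cannot frame very large (over 2GB) objects;
        # protocol 4 (Python 3.4+) supports 64-bit sizes.
        return pickle.dumps(obj, protocol=4), "gzip+pickle-v4"
    # A TypeError from pickle.dumps (an unpicklable object) would surface
    # as UnpicklableArtifactException in the real method.

blob, encode_type = pickle_with_fallback({"answer": 42})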
6 changes: 4 additions & 2 deletions metaflow/plugins/airflow/af_deploy.py
@@ -1,7 +1,9 @@
-from metaflow.plugins.airflow.airflow_utils import Workflow
+import base64
 
 CONFIG = "{{{metaflow_workflow_compile_params}}}"
 
-dag = Workflow.from_json(CONFIG).compile()
+{{{AIRFLOW_UTILS}}}
+
+dag = Workflow.from_json(base64.b64decode(CONFIG).decode("utf-8")).compile()
 with dag:
     pass
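Base64-encoding the compiled workflow JSON sidesteps quoting and escaping issues when the payload is substituted into the template as a string literal, and because {{{AIRFLOW_UTILS}}} inlines the airflow_utils module source, the generated DAG file can be parsed by Airflow without importing metaflow. A minimal sketch of the encode/decode round-trip follows; the payload is made up for illustration.

import base64
import json

# Deploy side: serialize the compiled workflow and embed it as base64 text.
workflow_json = json.dumps({"dag_id": "example_flow", "steps": []})  # illustrative payload
CONFIG = base64.b64encode(workflow_json.encode("utf-8")).decode("utf-8")

# Generated-file side: reverse the encoding at DAG-parse time.
decoded = base64.b64decode(CONFIG).decode("utf-8")
assert json.loads(decoded) == {"dag_id": "example_flow", "steps": []}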
