Skip to content

Commit

Permalink
[card-client][card-datastore] data read methods
Browse files Browse the repository at this point in the history
- Added methods to card datastore to read data updates
- Add a private method in card client that can help get the latest data.
- Refactored methods in the code to explictly provide all arguments to disambiguate between data reads and HTML reads.
  • Loading branch information
valayDave committed Oct 18, 2023
1 parent 856627d commit 6890054
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 21 deletions.
14 changes: 12 additions & 2 deletions metaflow/plugins/cards/card_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from metaflow.datastore import FlowDataStore
from metaflow.metaflow_config import CARD_SUFFIX
from .card_resolver import resolve_paths_from_task, resumed_info
from .card_datastore import CardDatastore
from .card_datastore import CardDatastore, CardNameSuffix
from .exception import (
UnresolvableDatastoreException,
IncorrectArguementException,
Expand Down Expand Up @@ -57,6 +57,15 @@ def __init__(
# Tempfile to open stuff in browser
self._temp_file = None

def _get_data(self) -> Optional[dict]:
# currently an internal method to retrieve a card's data.
data_paths = self._card_ds.extract_data_paths(
card_type=self.type, card_hash=self.hash, card_id=self._card_id
)
if len(data_paths) == 0:
return None
return self._card_ds.get_card_data(data_paths[0])

def get(self) -> str:
"""
Retrieves the HTML contents of the card from the
Expand Down Expand Up @@ -172,7 +181,7 @@ def _get_card(self, index):
if index >= self._high:
raise IndexError
path = self._card_paths[index]
card_info = self._card_ds.card_info_from_path(path)
card_info = self._card_ds.info_from_path(path, suffix=CardNameSuffix.CARD)
# todo : find card creation date and put it in client.
return Card(
self._card_ds,
Expand Down Expand Up @@ -252,6 +261,7 @@ def get_cards(
# Exception that the task argument should be of form `Task` or `str`
raise IncorrectArguementException(_TYPE(task))

origin_taskpathspec = None
if follow_resumed:
origin_taskpathspec = resumed_info(task)
if origin_taskpathspec:
Expand Down
109 changes: 90 additions & 19 deletions metaflow/plugins/cards/card_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
CardInfo = namedtuple("CardInfo", ["type", "hash", "id", "filename"])


class CardNameSuffix:
DATA = "data.json"
CARD = "html"


class CardPathSuffix:
DATA = "runtime"
CARD = "cards"


def path_spec_resolver(pathspec):
splits = pathspec.split("/")
splits.extend([None] * (4 - len(splits)))
Expand Down Expand Up @@ -86,18 +96,24 @@ def __init__(self, flow_datastore, pathspec=None):
self._run_id = run_id
self._step_name = step_name
self._pathspec = pathspec
self._temp_card_save_path = self._get_write_path(base_pth=TEMP_DIR_NAME)
self._temp_card_save_path = self._get_write_path(
base_pth=TEMP_DIR_NAME, suffix=CardPathSuffix.CARD
)

@classmethod
def get_card_location(cls, base_path, card_name, uuid, card_id=None, suffix="html"):
def get_card_location(
cls, base_path, card_name, uuid, card_id=None, suffix=CardNameSuffix.CARD
):
chash = uuid
if card_id is None:
card_file_name = "%s-%s.%s" % (card_name, chash, suffix)
else:
card_file_name = "%s-%s-%s.%s" % (card_name, card_id, chash, suffix)
return os.path.join(base_path, card_file_name)

def _make_path(self, base_pth, pathspec=None, with_steps=False, suffix="cards"):
def _make_path(
self, base_pth, pathspec=None, with_steps=False, suffix=CardPathSuffix.CARD
):
sysroot = base_pth
if pathspec is not None:
# since most cards are at a task level there will always be 4 non-none values returned
Expand Down Expand Up @@ -138,16 +154,27 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False, suffix="cards"):
pth_arr.pop(0)
return os.path.join(*pth_arr)

def _get_write_path(self, base_pth="", suffix="cards"):
def _get_write_path(self, base_pth="", suffix=CardPathSuffix.CARD):
return self._make_path(
base_pth, pathspec=self._pathspec, with_steps=True, suffix=suffix
)

def _get_read_path(self, base_pth="", with_steps=False):
return self._make_path(base_pth, pathspec=self._pathspec, with_steps=with_steps)
def _get_read_path(self, base_pth="", with_steps=False, suffix=CardPathSuffix.CARD):
# Data paths will always be under the path with steps
if suffix == CardPathSuffix.DATA:
return self._make_path(
base_pth=base_pth,
pathspec=self._pathspec,
with_steps=True,
suffix=suffix,
)

return self._make_path(
base_pth, pathspec=self._pathspec, with_steps=with_steps, suffix=suffix
)

@staticmethod
def card_info_from_path(path):
def info_from_path(path, suffix=CardNameSuffix.CARD):
"""
Args:
path (str): The path to the card
Expand All @@ -163,8 +190,8 @@ def card_info_from_path(path):

if len(file_split) not in [2, 3]:
raise Exception(
"Invalid card file name %s. Card file names should be of form TYPE-HASH.html or TYPE-ID-HASH.html"
% card_file_name
"Invalid file name %s. Card/Data file names should be of form TYPE-HASH.%s or TYPE-ID-HASH.%s"
% (card_file_name, suffix, suffix)
)
card_type, card_hash, card_id = None, None, None

Expand All @@ -173,17 +200,17 @@ def card_info_from_path(path):
else:
card_type, card_id, card_hash = file_split

card_hash = card_hash.split(".html")[0]
card_hash = card_hash.split("." + suffix)[0]
return CardInfo(card_type, card_hash, card_id, card_file_name)

def save_data(self, uuid, card_type, json_data, card_id=None):
card_file_name = card_type
loc = self.get_card_location(
self._get_write_path(suffix="runtime"),
self._get_write_path(suffix=CardPathSuffix.DATA),
card_file_name,
uuid,
card_id=card_id,
suffix="data.json",
suffix=CardNameSuffix.DATA,
)
self._backend.save_bytes(
[(loc, BytesIO(json.dumps(json_data).encode("utf-8")))], overwrite=True
Expand All @@ -209,7 +236,11 @@ def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True):
# It will also easily end up breaking the metaflow-ui (which maybe using a client from an older version).
# Hence, we are writing cards to both paths so that we can introduce breaking changes later in the future.
card_path_with_steps = self.get_card_location(
self._get_write_path(), card_file_name, uuid, card_id=card_id
self._get_write_path(suffix=CardPathSuffix.CARD),
card_file_name,
uuid,
card_id=card_id,
suffix=CardNameSuffix.CARD,
)
if SKIP_CARD_DUALWRITE:
self._backend.save_bytes(
Expand All @@ -218,28 +249,29 @@ def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True):
)
else:
card_path_without_steps = self.get_card_location(
self._get_read_path(with_steps=False),
self._get_read_path(with_steps=False, suffix=CardPathSuffix.CARD),
card_file_name,
uuid,
card_id=card_id,
suffix=CardNameSuffix.CARD,
)
for cp in [card_path_with_steps, card_path_without_steps]:
self._backend.save_bytes(
[(cp, BytesIO(bytes(card_html, "utf-8")))], overwrite=overwrite
)

return self.card_info_from_path(card_path_with_steps)
return self.info_from_path(card_path_with_steps, suffix=CardNameSuffix.CARD)

def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):
# Check for new cards first
card_paths = []
card_paths_with_steps = self._backend.list_content(
[self._get_read_path(with_steps=True)]
[self._get_read_path(with_steps=True, suffix=CardPathSuffix.CARD)]
)

if len(card_paths_with_steps) == 0:
card_paths_without_steps = self._backend.list_content(
[self._get_read_path(with_steps=False)]
[self._get_read_path(with_steps=False, suffix=CardPathSuffix.CARD)]
)
if len(card_paths_without_steps) == 0:
# If there are no files found on the Path then raise an error of
Expand All @@ -256,7 +288,7 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):
cards_found = []
for task_card_path in card_paths:
card_path = task_card_path.path
card_info = self.card_info_from_path(card_path)
card_info = self.info_from_path(card_path, suffix=CardNameSuffix.CARD)
if card_type is not None and card_info.type != card_type:
continue
elif card_hash is not None:
Expand All @@ -270,11 +302,34 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):

return cards_found

def _list_card_data(self, card_type=None, card_hash=None, card_id=None):
card_data_paths = self._backend.list_content(
[self._get_read_path(suffix=CardPathSuffix.DATA, with_steps=False)]
)
data_found = []

for data_path in card_data_paths:
_pth = data_path.path
card_info = self.info_from_path(_pth, suffix=CardNameSuffix.DATA)
if card_type is not None and card_info.type != card_type:
continue
elif card_hash is not None:
if not card_info.hash.startswith(card_hash):
continue
elif card_id is not None and card_info.id != card_id:
continue
if data_path.is_file:
data_found.append(_pth)

return data_found

def create_full_path(self, card_path):
return os.path.join(self._backend.datastore_root, card_path)

def get_card_names(self, card_paths):
return [self.card_info_from_path(path) for path in card_paths]
return [
self.info_from_path(path, suffix=CardNameSuffix.CARD) for path in card_paths
]

def get_card_html(self, path):
with self._backend.load_bytes([path]) as get_results:
Expand All @@ -283,6 +338,13 @@ def get_card_html(self, path):
with open(path, "r") as f:
return f.read()

def get_card_data(self, path):
with self._backend.load_bytes([path]) as get_results:
for _, path, _ in get_results:
if path is not None:
with open(path, "r") as f:
return json.loads(f.read())

def cache_locally(self, path, save_path=None):
"""
Saves the data present in the `path` the `metaflow_card_cache` directory or to the `save_path`.
Expand All @@ -308,6 +370,15 @@ def cache_locally(self, path, save_path=None):
shutil.copy(path, main_path)
return main_path

def extract_data_paths(self, card_type=None, card_hash=None, card_id=None):
return self._list_card_data(
# card_hash is the unique identifier to the card.
# Its no longer the actual hash!
card_type=card_type,
card_hash=card_hash,
card_id=card_id,
)

def extract_card_paths(self, card_type=None, card_hash=None, card_id=None):
return self._list_card_paths(
card_type=card_type, card_hash=card_hash, card_id=card_id
Expand Down

0 comments on commit 6890054

Please sign in to comment.