Ability to save shapes #341
First changed file:
@@ -420,6 +420,17 @@ def _prepare_collections(self):
        self.prepared_collections = True

    #### End of Save Manager methods ####

    @staticmethod
    def _close_given_writer_map(writer_dict):
        # Delete all the dist training writers
        to_delete_writers = []
        for key, writer in writer_dict.items():
            # close calls flush
            writer.close()
            to_delete_writers.append(key)

        for key in to_delete_writers:
            del writer_dict[key]

    def _close_writers(self) -> None:
        if self.dry_run:
@@ -433,16 +444,7 @@ def _close_writers(self) -> None:
            self.writer.close()
            self.writer = None

        to_delete_writers = []

        # Delete all the tb writers
        for mode, writer in self.tb_writers.items():
            if writer is not None:
                writer.flush()
                writer.close()
                to_delete_writers.append(mode)
        for mode in to_delete_writers:
            del self.tb_writers[mode]
        self._close_given_writer_map(self.tb_writers)

    def _initialize_writers(self, only_initialize_if_missing=False) -> None:
        # Function is overridden in smdebug/tensorflow/base_hook.py
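The hunk above replaces the explicit flush/close/delete loop over self.tb_writers with a call to the new _close_given_writer_map helper added earlier. A minimal, self-contained sketch of that close-and-drop pattern; the dummy writer class and names below are illustrative, not from the PR:

class DummyWriter:
    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        # in the real writers, close() also flushes (per the "# close calls flush" comment in the diff)
        self.closed = True


def close_given_writer_map(writer_dict):
    # close every writer in the map, then drop the entries from the dict
    to_delete = []
    for key, writer in writer_dict.items():
        writer.close()
        to_delete.append(key)
    for key in to_delete:
        del writer_dict[key]


writers = {"TRAIN": DummyWriter("train"), "EVAL": DummyWriter("eval")}
close_given_writer_map(writers)
print(writers)  # {} -- every writer closed and removed from the map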
@@ -470,8 +472,12 @@ def _initialize_writers(self, only_initialize_if_missing=False) -> None:
        if self.save_all_workers is False:
            if self.worker != self.chief_worker:
                return

        self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker)

    def _get_main_writer(self) -> List[FileWriter]:
        return [self.writer] if self.writer else []

    def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
        """
        :param tensor_name:
@@ -480,7 +486,7 @@ def _get_writers(self, tensor_name, tensor_ref=None) -> List[FileWriter]:
        """
        if self.save_all_workers is False and self.worker != self.chief_worker:
            return []
        return [self.writer] if self.writer else []
        return self._get_main_writer()

    def _maybe_get_tb_writer(self) -> Optional[FileWriter]:
        """ Returns a FileWriter object if `hook.tensorboard_dir` has been specified, else None.
@@ -749,6 +755,31 @@ def _write_raw_tensor(self, tensor_name, tensor_value, save_collections, tensor_
                self._write_raw_tensor_simple(tensor_name, tensor_value, tensor_ref=tensor_ref)
                break

    def _write_shape(self, tensor_name, tensor_value, save_collections, tensor_ref=None):
Review comment: If tensor_value is always going to be a tuple, can we add type annotations to this function?
Author reply: tensor_value is a framework data format, not a tuple here; it becomes a tuple inside this function.
        writers = self._get_writers(tensor_name, tensor_ref=tensor_ref)
        for s_col in save_collections:
            reduction_config = s_col.reduction_config
            if self.dry_run is False and reduction_config.save_shape is True:
                numpy_tensor_value = self._make_numpy_array(tensor_value)
                this_size, this_shape = size_and_shape(numpy_tensor_value)
                # In TF Keras, and for Variables in all TF interfaces, we sometimes output
                # tensors with more meaningful names than the original name. In such cases
                # we output both the smdebug-given name and the original name.
                if tensor_ref is not None and tensor_ref.tf_obj is not None:
                    original_name = tensor_ref.tf_obj.name
                else:
                    original_name = None
Review comment on lines +768 to +771: Can you add comments explaining the need for this if-else block? Which framework and which mode of execution requires this check?
Author reply: Added.
                for writer in writers:
                    writer.write_shape(
                        tensor_name,
                        this_shape,
                        self.mode,
                        self.mode_steps[self.mode],
                        original_name=original_name,
                    )
                break
    def _write_raw_tensor_simple(self, tensor_name, tensor_value, tensor_ref=None, timestamp=None):
        # tensor_ref is used by TF
        # todo: if fp16, check perf of saving as fp16 in proto vs as fp32
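The shape write above converts the framework tensor to a numpy array and asks size_and_shape for its size and shape. A rough, self-contained illustration of that step; the size_and_shape below is a stand-in, and its exact size semantics (bytes vs. element count) are an assumption, since the real helper is not shown in this diff:

import numpy as np

def size_and_shape(np_value):
    # illustrative stand-in: element count plus shape tuple of an already-converted numpy array
    return np_value.size, np_value.shape

# stands in for numpy_tensor_value = self._make_numpy_array(tensor_value)
numpy_tensor_value = np.zeros((32, 10), dtype=np.float32)
this_size, this_shape = size_and_shape(numpy_tensor_value)
print(this_size, this_shape)  # 320 (32, 10)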
@@ -828,6 +859,9 @@ def _write_for_tensor(self, tensor_name, tensor_value, save_collections, tensor_
        :param save_collections: list of collections which are being saved for this step
        """
        self._log_save(tensor_name, save_collections)

        self._write_shape(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref)

        # write reductions defined for collections this tensor may be part of
        self._write_reductions(tensor_name, tensor_value, save_collections, tensor_ref=tensor_ref)
Second changed file:
@@ -16,7 +16,7 @@
    MISSING_EVENT_FILE_RETRY_LIMIT,
    MISSING_EVENT_FILE_RETRY_LIMIT_KEY,
)
from smdebug.core.locations import IndexFileLocationUtils, TensorLocation
from smdebug.core.locations import IndexFileLocationUtils, TensorLocation, TensorShape
from smdebug.core.logger import get_logger
from smdebug.core.modes import ModeKeys
from smdebug.core.s3_utils import list_s3_objects
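TensorShape is the new import above. Its definition is not part of this diff; the sketch below only mirrors the positional arguments used later in _update_tensors_from_json, TensorShape(tensor_name, mode, mode_step, shape, original_name), so the field names are assumptions:

from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class TensorShapeSketch:
    # illustrative stand-in for smdebug.core.locations.TensorShape; fields guessed
    # from the constructor call seen in the diff
    name: str
    mode: str
    mode_step: int
    shape: Tuple[int, ...]
    original_name: Optional[str] = None

ts = TensorShapeSketch("gradients/dense_1/kernel", "TRAIN", 0, (784, 128))
print(ts.shape)  # (784, 128)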
@@ -120,12 +120,22 @@ def fetch_tensor_value(self, tensor_location: TensorLocation):
    def list_event_files(self, start_after_prefix):
        pass

    @abstractmethod
    def load_tensor_data_from_index_files(
        self, start_after_key=None, range_steps=None
    ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
        """Return a triply nested dict referring to tensor data."""

        responses, steps, last_index_token, workers = self.read_index_files(
            start_after_key, range_steps
        )

        tensor_data = {}
        for step, response, worker in zip(steps, responses, workers):
            tensor_data = self._update_tensors_from_json(
                tensor_data, step, response, self.path, worker
            )
        return tensor_data, last_index_token

    @abstractmethod
    def _is_event_file_present(self, file_name) -> bool:
        pass
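load_tensor_data_from_index_files now lives once in the base reader and is expressed in terms of read_index_files, which each concrete reader still implements (the S3 and local copies are deleted further down in this diff). A minimal sketch of that template-method layout, with simplified bodies that are not copied from smdebug:

from abc import ABC, abstractmethod

class ReaderSketch(ABC):
    @abstractmethod
    def read_index_files(self, start_after_key, range_steps=None):
        """Return (responses, steps, last_index_token, workers)."""

    def load_tensor_data_from_index_files(self, start_after_key=None, range_steps=None):
        responses, steps, last_token, workers = self.read_index_files(start_after_key, range_steps)
        tensor_data = {}
        for step, response, worker in zip(steps, responses, workers):
            # in smdebug this delegates to _update_tensors_from_json; simplified here
            tensor_data.setdefault(worker, {})[step] = response
        return tensor_data, last_token


class LocalReaderSketch(ReaderSketch):
    def read_index_files(self, start_after_key, range_steps=None):
        # pretend one index file was read for step 0 from worker "worker_0"
        return [b"{}"], [0], "token-0", ["worker_0"]


data, token = LocalReaderSketch().load_tensor_data_from_index_files()
print(data, token)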
@@ -203,8 +213,10 @@ def _validate(index_dict):
            raise IndexReaderException("meta section is not present")
        if len(index_dict["meta"]) == 0:
            raise IndexReaderException("meta section is empty")
        if "tensor_payload" not in index_dict:
            raise IndexReaderException("tensor_payload section is not present")
        if "tensor_payload" not in index_dict and "shape_payload" not in index_dict:
            raise IndexReaderException(
                "neither tensor_payload nor shape_payload sections are present"
            )

    def _update_tensors_from_json(
        self, index_tensors_dict, step, response: bytes, path, worker
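For context, an index document that passes the relaxed validation above might look roughly like the following. The key names (meta, mode, mode_step, event_file_name, tensor_payload, shape_payload, tensorname, start_idx, length, originalname, shape) are taken from the parsing code in this diff; the values are made up:

example_index_document = {
    "meta": {
        "mode": "TRAIN",
        "mode_step": 0,
        "event_file_name": "events/000000000000/worker_0.tfevents",
    },
    # byte ranges into the event file, used to build TensorLocation objects
    "tensor_payload": [
        {"tensorname": "loss", "start_idx": 0, "length": 128},
    ],
    # shapes saved without values, used to build TensorShape objects
    "shape_payload": [
        {"tensorname": "loss", "originalname": "loss:0", "shape": [1]},
    ],
}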
@@ -233,28 +245,41 @@ def _update_tensors_from_json(
        mode = index_meta["mode"]
        mode = ModeKeys[mode.strip()]
        mode_step = index_meta["mode_step"]
        event_file_name = os.path.join(path, index_meta["event_file_name"])
        tensors = index_dict["tensor_payload"]
        for tensor in tensors:
            tensor_name = tensor["tensorname"]
            start_idx = tensor["start_idx"]
            length = tensor["length"]
            tensor_location = TensorLocation(
                tensor_name, mode, mode_step, event_file_name, start_idx, length, worker
            )

        to_update_index_dict = []

        if "tensor_payload" in index_dict and len(index_dict["tensor_payload"]):
Review comment: nit:
Author reply: There can be an empty payload.
            event_file_name = os.path.join(path, index_meta["event_file_name"])
            for tensor in index_dict["tensor_payload"]:
                tensor_name = tensor["tensorname"]
                start_idx = tensor["start_idx"]
                length = tensor["length"]
                tensor_location = TensorLocation(
                    tensor_name, mode, mode_step, event_file_name, start_idx, length, worker
                )
                to_update_index_dict.append((tensor_name, step, tensor_location))

        if "shape_payload" in index_dict and len(index_dict["shape_payload"]):
            for tensor in index_dict["shape_payload"]:
                tensor_name = tensor["tensorname"]
                original_name = tensor["originalname"]
                shape = tensor["shape"]
                ts = TensorShape(tensor_name, mode, mode_step, shape, original_name)
                to_update_index_dict.append((tensor_name, step, ts))

        for tu in to_update_index_dict:
            tensor_name, step, obj = tu
            if isinstance(obj, TensorLocation):
                obj_dict = {"tensor_location": obj}
            elif isinstance(obj, TensorShape):
                obj_dict = {"tensor_shape": obj}
            if tensor_name in index_tensors_dict:
                if step in index_tensors_dict[tensor_name]:
                    index_tensors_dict[tensor_name][step].update(
                        {worker: {"tensor_location": tensor_location}}
                    )
                    index_tensors_dict[tensor_name][step].update({worker: obj_dict})
                else:
                    index_tensors_dict[tensor_name].update(
                        {step: {worker: {"tensor_location": tensor_location}}}
                    )
                    index_tensors_dict[tensor_name].update({step: {worker: obj_dict}})
            else:
                index_tensors_dict[tensor_name] = {
                    step: {worker: {"tensor_location": tensor_location}}
                }
                index_tensors_dict[tensor_name] = {step: {worker: obj_dict}}
Review comment on lines +278 to +282: Is this simply a lint change?
Author reply: Just made the innermost dict a variable.
        return index_tensors_dict
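After _update_tensors_from_json runs, index_tensors_dict is the triply nested mapping the docstring describes: tensor name, then step, then worker, with either a tensor_location or a tensor_shape entry per worker. A hand-written sketch with placeholder strings standing in for the real TensorLocation / TensorShape objects:

# tensor name -> step -> worker -> {"tensor_location": ...} or {"tensor_shape": ...}
index_tensors_dict = {
    "loss": {
        0: {
            "worker_0": {"tensor_location": "<TensorLocation for loss @ step 0>"},
        },
        1: {
            "worker_0": {"tensor_shape": "<TensorShape for loss @ step 1>"},
        },
    },
}

# navigating the structure
entry = index_tensors_dict["loss"][0]["worker_0"]
print("tensor_location" in entry)  # True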
@@ -285,22 +310,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
        tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
        return tensor_data

    def load_tensor_data_from_index_files(
        self, start_after_key=None, range_steps=None
    ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
        """Return a triply nested dict referring to tensor data."""

        responses, steps, last_index_token, workers = self.read_index_files(
            start_after_key, range_steps
        )

        tensor_data = {}
        for step, response, worker in zip(steps, responses, workers):
            tensor_data = self._update_tensors_from_json(
                tensor_data, step, response, self.path, worker
            )
        return tensor_data, last_index_token

    def read_index_files(
        self, start_after_key: str, range_steps=None
    ) -> Tuple[List[bytes], list, str, List[str]]:
@@ -398,21 +407,6 @@ def fetch_tensor_value(self, tensor_location: TensorLocation) -> np.ndarray:
        tensor_name, step, tensor_data, mode, mode_step = tensor_tuple
        return tensor_data

    def load_tensor_data_from_index_files(
        self, start_after_key=None, range_steps=None
    ) -> Tuple[Dict[str, Dict[int, Dict[str, TensorLocation]]], str]:
        """Return a triply nested dict referring to tensor data."""

        responses, steps, last_index_token, workers = self.read_index_files(
            start_after_key, range_steps
        )
        tensor_data = {}
        for step, response, worker in zip(steps, responses, workers):
            tensor_data = self._update_tensors_from_json(
                tensor_data, step, response, self.path, worker
            )
        return tensor_data, last_index_token

    def read_index_files(
        self, start_after_key: str, range_steps=None
    ) -> Tuple[List[bytes], list, str, List[str]]:
Review comment: Is this diff incorrect? If not, why have you changed the API from values to shapes? Is this not a breaking change?
Author reply: This diff looks weird. I copied the values contents and modified them to become shapes, and moved them to group them appropriately. I didn't remove anything.