WiP: Communication protocol #77

Merged
bradley-erickson merged 56 commits into master from communication-protocol on Jul 6, 2023

Conversation

Collaborator

@pmitros pmitros commented May 16, 2023

This is a format for how we define queries, execution trees, and request data. It's a work in progress. Do not merge.

'inputs': self.inputs,
'context': self.context,
'timestamp': datetime.datetime.utcnow().isoformat(),
'traceback': ''.join(traceback.format_tb(self.__traceback__))
Collaborator Author

@pmitros pmitros Jun 5, 2023

We should add a comment on data format:

Eventually, this should be a serialized traceback of some kind, rather than a string, with formatting happening in the debug interface.
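
A minimal sketch of what a serialized traceback could look like, assuming a list of per-frame dictionaries is an acceptable wire format. The helper name `serialize_traceback` and the field names are hypothetical and not part of this PR; only the standard-library `traceback.extract_tb` call is real.

```python
import traceback


def serialize_traceback(exc):
    """Return the traceback of `exc` as a list of JSON-friendly dicts,
    one per stack frame (hypothetical format, not the PR's)."""
    return [
        {
            'filename': frame.filename,
            'lineno': frame.lineno,
            'function': frame.name,
            'line': frame.line,  # source text of the line, if available
        }
        for frame in traceback.extract_tb(exc.__traceback__)
    ]
```

With a structure like this, the debug interface can decide how to render frames instead of receiving a pre-formatted string.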

response = []
for k in keys:
    if isinstance(k, dict) and 'key' in k:
        item = {

Collaborator Author

query_response_element or something?

Collaborator Author

I might also include some kind of comment that the value will be added below. I tend to do this not in English but e.g. in commented pseudocode. E.g.

'value': [To be added after query]

Or:
'value': None # Populated after query

But it's nice to give a sense of the full data structure.

Could also be in a docstring :)
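
As a rough illustration of that suggestion, the sketch below shows the whole element up front, with the placeholder documented both in the docstring and inline. The helper name `make_query_response_element` is hypothetical, and the `'key'` and `'default'` fields are only guessed from the `k['key']` and `k['default']` lookups visible in the diff; the real `item` may hold other fields.

```python
def make_query_response_element(key, default=None):
    """Build one response element (hypothetical helper).

    Returned structure:
        {
            'key': <KVS key>,
            'default': <value used when the KVS has nothing for the key>,
            'value': None,  # Populated after query
        }
    """
    return {
        'key': key,
        'default': default,
        'value': None,  # Populated after query
    }
```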

)
kvs_out = await KVS[k['key']]
if kvs_out is None:
    kvs_out = k['default']

Collaborator Author

A short comment explaining why this is here would be helpful. E.g.
# We haven't run the reducer for this key yet, so we return the default value from the module
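
A self-contained sketch of the fallback with that comment in place. `InMemoryKVS` is a hypothetical stand-in that only mimics the `await KVS[key]` access pattern seen in the diff, and `fetch_value` is an illustrative name; neither reflects the real `learning_observer.kvs` API.

```python
import asyncio


class InMemoryKVS:
    """Hypothetical stand-in for the real KVS; supports `await kvs[key]`."""

    def __init__(self, data):
        self._data = dict(data)

    def __getitem__(self, key):
        async def lookup():
            # Missing keys resolve to None, matching the check below.
            return self._data.get(key)
        return lookup()


async def fetch_value(kvs, k):
    kvs_out = await kvs[k['key']]
    if kvs_out is None:
        # We haven't run the reducer for this key yet, so we return
        # the default value from the module.
        kvs_out = k['default']
    return kvs_out
```

For example, `asyncio.run(fetch_value(InMemoryKVS({}), {'key': 'a', 'default': 0}))` falls through to the default.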

:return: The generated keys
:rtype: list
"""
pass

Collaborator Author

Shouldn't this raise an exception (e.g. NotImplementedError) instead of a pass?
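
A minimal sketch of the stub raising instead of passing silently. The function name and signature are hypothetical, since the diff hunk only shows the tail of the docstring; `NotImplementedError` is the built-in Python exception for this purpose.

```python
def generate_keys(function, **selectors):
    """Generate KVS keys for `function` (hypothetical stub).

    :return: The generated keys
    :rtype: list
    """
    # Fail loudly so callers do not mistake a missing implementation
    # for an empty result.
    raise NotImplementedError('generate_keys is not implemented yet')
```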



@handler(learning_observer.communication_protocol.query.DISPATCH_MODES.KEYS)
def hack_handle_keys(function, STUDENTS=None, STUDENTS_path=None, RESOURCES=None, RESOURCES_path=None):

Collaborator Author

Needs a docstring explaining the hack, why we have it, and how we plan to eventually fix it.


def _has_error(node):
    '''
    Non-recursive function to find and return 'error' value and its path from any dictionary within the node.

Collaborator Author

I don't know:

  • Why you need this and when it's called
  • What an 'error' value is
  • What a node is

This sort of mile-high context is important.

while queue:
    current, path = queue.pop(0)
    if 'error' in current:
        return current, path

Collaborator Author

Note this will only return one error, even if there are multiple. Of course, I have no idea right now what kinds of errors this is looking for until I understand the context in the codebase.
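
If reporting every error turns out to be useful, one possible variant is sketched below. It assumes nodes are plain dicts that may contain nested dicts and lists of dicts, as the hunks suggest; `find_all_errors` is a hypothetical name, and whether to descend into a node that already carries an error is a design choice made here for illustration.

```python
from collections import deque


def find_all_errors(node):
    """Breadth-first walk returning (dict, path) for every dict
    that contains an 'error' key."""
    found = []
    queue = deque([(node, [])])
    while queue:
        current, path = queue.popleft()
        if 'error' in current:
            found.append((current, path))
            continue  # do not descend into a node that already failed
        for key, child in current.items():
            if isinstance(child, dict):
                queue.append((child, path + [key]))
            elif isinstance(child, list):
                for idx, item in enumerate(child):
                    if isinstance(item, dict):
                        queue.append((item, path + [key, idx]))
    return found
```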

    for idx, i in enumerate(current[c]):
        if isinstance(i, dict):
            queue.append((i, path + [c, idx]))
return None, []

Collaborator Author

This search (breadth-first, given queue.pop(0)) can be exponentially slow (in big O), since a node reachable by several paths is visited once per path. This will only matter if you have a complex, interconnected DAG, but keeping some log of visited nodes would prevent this.

Collaborator Author

This could be fixed or (since we have simple DAGs now) left as a TODO, but it should be clearly documented (with appropriate keywords if someone is looking for performance issues).
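
A sketch of the "log of visited nodes" idea, under the assumption that shared sub-nodes in the DAG are the same Python object (so `id()` identifies them). `find_first_error` is a hypothetical name, not the PR's `_has_error`.

```python
from collections import deque


def find_first_error(node):
    """Breadth-first search for the first dict containing 'error'.

    Each dict object is queued at most once, so sub-nodes shared by
    several parents are not re-expanded once per path.
    """
    visited = {id(node)}  # ids of dicts already queued
    queue = deque([(node, [])])
    while queue:
        current, path = queue.popleft()
        if 'error' in current:
            return current, path
        for key, child in current.items():
            if isinstance(child, dict):
                candidates = [(child, path + [key])]
            elif isinstance(child, list):
                candidates = [(item, path + [key, idx])
                              for idx, item in enumerate(child)
                              if isinstance(item, dict)]
            else:
                candidates = []
            for item, item_path in candidates:
                if id(item) not in visited:
                    visited.add(id(item))
                    queue.append((item, item_path))
    return None, []
```

If the traversal is left as-is for now, a comment such as `# TODO(performance): no visited set, exponential on shared nodes` would make the limitation searchable.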


def _sanitaize_output(variable):
    '''
    Sanitizes output by removing specified keys from each level of a dictionary or a list of dictionaries.

Collaborator Author

sanitize <-- spelling

Shouldn't KEYS_TO_REMOVE be a parameter (perhaps with a default)? Why is this considered a sanitization? Would we be leaking sensitive data here, or are we just cleaning unnecessary data?

In the docstring, it's not so much necessary to explain what this does as why we're doing it.

If that's all we're using this for, the name might be strip_context or similar.
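
One way the renamed, parameterized version might look, as a sketch only. The default of `('context',)` is an assumption based on the "remove context from outputs" comment later in the diff; the actual contents of `KEYS_TO_REMOVE` are not shown in this review.

```python
def strip_context(variable, keys_to_remove=('context',)):
    """Return a copy of `variable` with `keys_to_remove` dropped at
    every level of nested dicts and lists.

    Why: execution context is useful for debugging but is unnecessary
    in responses sent on to dashboards.
    """
    if isinstance(variable, dict):
        return {
            key: strip_context(value, keys_to_remove)
            for key, value in variable.items()
            if key not in keys_to_remove
        }
    if isinstance(variable, list):
        return [strip_context(item, keys_to_remove) for item in variable]
    return variable
```

Returning a copy rather than mutating in place keeps the original node, with its context, available for debugging.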


async def execute_dag(endpoint, parameters, functions, target_exports=None):
    """
    Execute a flattened directed acyclic graph (DAG).

Collaborator Author

Explain "flattened" here. Or move the flatten function inside so it doesn't need to be flattened. This should clearly document what can and cannot go in.

:param functions: The functions available for execution
:type functions: dict
:return: The result of the execution
:rtype: dict

Collaborator Author

I have no idea from the comment what any of these are, or their format. Either:

  • Add a doctest (perhaps too complex here); or
  • Clearly point to the example we built before, so people know where to figure this out.

Collaborator Author

I want to know the format of all the parameters to all of the functions in enough detail to be able to make use of them. As a new developer, I don't want to need to add print() statements or search the entire codebase to understand what goes in and what comes out.

:rtype: dict
"""
if target_exports is None:
    target_exports = []

Collaborator Author

I have no idea what this is from the code. I also don't know why we'd call this with no exports. Should this parameter really have a default of None? That seems like a crazy default.

if KVS is None:
    KVS = learning_observer.kvs.KVS()

async def dispatch_node(node):

Collaborator Author

Docstring


async def walk_dict(node_dict):
    '''
    This will walk a dictionary, and call `visit` on all variables, and make the requisite substitutions

Collaborator Author

Expand docstring (What is a substitution? What does it mean to "walk a dictionary"? What does "visit" do?)

    elif isinstance(child_value, dict):
        await walk_dict(child_value)

async def visit(node_name):

Collaborator Author

docstring

nodes[node_name] = await dispatch_node(nodes[node_name])

# import json
# print('*****', node_name, json.dumps(nodes[node_name], indent=2, default=str)) # useful but produces a lot

Collaborator Author

Don't include commented-out code
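
One common alternative to commented-out print statements is a debug-level log call, sketched below. This is only a suggestion, not necessarily what the reviewer has in mind; `log_node` and the logger setup are hypothetical and do not reflect the project's own logging utilities.

```python
import json
import logging

logger = logging.getLogger(__name__)


def log_node(node_name, node):
    """Log a pretty-printed node at DEBUG level; does nothing otherwise."""
    # Guard the call so the (potentially large) JSON dump is only
    # built when debug logging is actually enabled.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('***** %s %s', node_name,
                     json.dumps(node, indent=2, default=str))
```

The verbose output stays available behind a log level instead of living in the source as dead code.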

visited.add(node_name)
return nodes[node_name]

# return everything in dev mode

Collaborator Author

'everything' ==> something explaining that this refers to execution paths / provenance. No one will know what a context is.

if learning_observer.settings.RUN_MODE == learning_observer.settings.RUN_MODES.DEV:
    return {e: await visit(e) for e in target_nodes}

# otherwise remove context from outputs

Collaborator Author

# Remove execution history if in deployed settings, with data flowing back to teacher dashboards

@bradley-erickson bradley-erickson merged commit 3f85805 into master Jul 6, 2023
@bradley-erickson bradley-erickson linked an issue Jul 10, 2023 that may be closed by this pull request
@bradley-erickson bradley-erickson deleted the communication-protocol branch March 10, 2026 12:37

Development

Successfully merging this pull request may close these issues.

Determine communication stream

2 participants