-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrating Message Infrastructure into Zambeze #77
Integrating Message Infrastructure into Zambeze #77
Conversation
zambeze/orchestration/processor.py
Outdated
msg_template = self._msg_factory.createTemplate( | ||
MessageType.ACTIVITY, "rsync", "transfer" | ||
) | ||
|
||
msg_template[1]["body"]["cmd"] = [ | ||
{ | ||
"transfer": { | ||
"source": { | ||
"ip": file_url.netloc, | ||
"path": file_url.path, | ||
"user": file_url.username, | ||
}, | ||
"destination": { | ||
"ip": socket.gethostbyname(socket.gethostname()), | ||
"path": str(pathlib.Path().resolve()), | ||
"user": getpass.getuser(), | ||
}, | ||
} | ||
} | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tskluzac here I have tried to demonstrate the use of the template idea that you proposed, doesn't work to well right now because we are returning a dict where the user can just add arbitrary keys on accident. I have been talking with @renan-souza about this. It would be ideal if the template had predefined keys that were immutable this would ensure consistency. If you know a good way to do this it would be appreciated.
await self._queue_client.send(ChannelType.ACTIVITY, immutable_msg) | ||
await self._queue_client.send(ChannelType.ACTIVITY, immutable_msg_move) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between immutable_msg and immutable_msg_move? Unclear from variable names + docstring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first is doing a transfer between Globus imports the second is moving files from and enpoint to somewhere else on the POSIX file system. I should probably add some comments, or do you have some ideas for better variable names?
zambeze/orchestration/processor.py
Outdated
# directory | ||
move_to_file_path_args = { | ||
# Dependency on transfer needs to be defined | ||
msg_template_move[1]["body"]["cmd"] = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious what this is doing. It looks like it's 'muting' the second element of a list. Is there a cleaner way of doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
msg_template_move is a tuple the first element is the MessageType, and the second element is a dict which contains the message envelope as defined in the detailed design document. Do you have any suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could have a template object that is returned.
class MessageTemplate:
def __init__(self):
self.__type = MessageType.ACTIVITY
self.__message = { ...Template... }
@property
def type(self) -> MessageType:
return self.__type
@property
def data(self):
return self.__message
zambeze/orchestration/processor.py
Outdated
|
||
local_globus_uri = "globus://" | ||
local_globus_uri = local_globus_uri + default_endpoint + os.sep | ||
local_globus_uri = local_globus_uri + source_file_name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally the "+" string concatenation is frowned upon due to unpredictable behavior. Could we use f-strings or even URL/Path-join?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I did not know that. Yes.
zambeze/orchestration/processor.py
Outdated
) | ||
|
||
msg_template[1]["body"]["cmd"] = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Repeat comment from earlier (are we editing second element of list? and can this be done cleaner)
Ideally the only thing you need to work with in this PR are the MessageFactory, the AbstractMessage and the template. # Globus example
plugins = Plugins()
msg_factory = MessageFactory(plugins)
activity_template = msg_factory.createTemplate(
MessageType.ACTIVITY,
ActivityType.PLUGIN,
args={"plugin": "globus", "action": "transfer"},
)
activity_template[1].message_id = str(uuid.uuid4())
activity_template[1].activity_id = str(uuid.uuid4())
activity_template[1].agent_id = str(uuid.uuid4())
activity_template[1].campaign_id = str(uuid.uuid4())
activity_template[1].credential = {}
activity_template[1].submission_time = str(int(time.time()))
activity_template[1].body.parameters.transfer.items[0].source = ""
activity_template[1].body.parameters.transfer.items[0].destination = ""
activity_template[1].needs = []
# Will check that the required message values are filled
msg = factory.create(activity_template)
# Will check that the msg can be executed on local machine
results = plugins.check(msg)
# Will run the activity
plugins.run(msg) This behavior has been added and integrated both in the processor and the agent files. |
@wilkinson This is ready for your input. @tskluzac I have tried to address your concerns here as well. |
…n-test Working unit tests for shell plugin
First step, in integrating the messaging infrastructure into Zambeze.