Skip to content

Commit

Permalink
Publish BodhiMessages instead of dictionaries.
Browse files Browse the repository at this point in the history
Now that we have bodhi.messages.schemas, it is time to start
publishing messages using them instead of using the unschema'd
dictionaries that we have been using until now. This commit adjusts
our message publishing API to accept these objects, and adjusts
all server code to publish with them instead of with dictionaries.

fixes #3124

Signed-off-by: Randy Barlow <randy@electronsweatshop.com>
  • Loading branch information
bowlofeggs committed Apr 22, 2019
1 parent 016311b commit 5c4cc81
Show file tree
Hide file tree
Showing 21 changed files with 1,478 additions and 1,442 deletions.
32 changes: 32 additions & 0 deletions bodhi/messages/schemas/base.py
Expand Up @@ -21,6 +21,7 @@
messages.
"""

import json
import re
import typing

Expand Down Expand Up @@ -70,6 +71,24 @@ def agent_avatar(self) -> typing.Union[None, str]:
return None
return user_avatar_url(username)

@classmethod
def from_dict(cls, message: dict) -> 'BodhiMessage':
"""
Generate a message based on the given message dictionary.
Args:
message: A dictionary representation of the message you wish to instantiate.
Returns:
A Message.
"""
# Dirty, nasty hack that I feel shame for: use the fedmsg encoder that modifies
# messages quietly if they have objects with __json__ methods on them.
# For now, copy that behavior. In the future, callers should pass
# fedora_messaging.api.Message sub-classes or this whole API should go away.
body = json.loads(json.dumps(message, cls=FedMsgEncoder))

return cls(body=body)

@property
def usernames(self) -> typing.List[str]:
"""
Expand Down Expand Up @@ -231,3 +250,16 @@ def schema() -> dict:
},
'required': ['name']
}


class FedMsgEncoder(json.encoder.JSONEncoder):
"""Encoder with convenience support.
If an object has a ``__json__()`` method, use it to serialize to JSON.
"""

def default(self, obj):
"""Encode objects which don't have a more specific encoding method."""
if hasattr(obj, "__json__"):
return obj.__json__()
return super().default(obj)
46 changes: 46 additions & 0 deletions bodhi/messages/schemas/compose.py
Expand Up @@ -236,3 +236,49 @@ def summary(self) -> str:
A summary for this message.
"""
return f"bodhi composer is waiting for {self.repo} to hit the master mirror"


class RepoDoneV1(BodhiMessage):
"""Sent when a repo is created and ready to be signed or otherwise processed."""

body_schema = {
'id': f'{SCHEMA_URL}/v1/bodhi.repo.done#',
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'Schema for message sent when a repo is created and ready to be signed',
'type': 'object',
'properties': {
'agent': {
'type': 'string',
'description': 'The name of the user who started this compose.'
},
'path': {
'type': 'string',
'description': 'The path of the repository that was composed.'
},
'repo': {
'type': 'string',
'description': 'The name of the repository that was composed.'
},
},
'required': ['agent', 'path', 'repo'],
}

topic = "bodhi.repo.done"

@property
def repo(self) -> str:
"""Return the name of the repository being composed."""
return self.body.get('repo')

@property
def summary(self) -> str:
"""
Return a short, human-readable representation of this message.
This should provide a short summary of the message, much like the subject line
of an email.
Returns:
A summary for this message.
"""
return f"bodhi composer is finished building {self.repo}"
35 changes: 35 additions & 0 deletions bodhi/messages/schemas/update.py
Expand Up @@ -173,6 +173,41 @@ def _update(self) -> dict:
return self.body['comment']['update']


class UpdateCompleteStableV1(UpdateMessage):
"""Sent when an update is available in the stable repository."""

body_schema = {
'id': f'{SCHEMA_URL}/v1/bodhi.update.complete.stable#',
'$schema': 'http://json-schema.org/draft-04/schema#',
'description': 'Schema for message sent when an update is pushed stable',
'type': 'object',
'properties': {
'update': UpdateV1.schema(),
},
'required': ['update'],
'definitions': {
'build': BuildV1.schema(),
}
}

topic = "bodhi.update.complete.stable"

@property
def summary(self) -> str:
"""
Return a short, human-readable representation of this message.
This should provide a short summary of the message, much like the subject line
of an email.
Returns:
A summary for this message.
"""
return (
f"{self.update.user.name}'s {truncate(' '.join([b.nvr for b in self.update.builds]))} "
f"bodhi update completed push to {self.update.status}")


class UpdateCompleteTestingV1(UpdateMessage):
"""Sent when an update is available in the testing repository."""

Expand Down
77 changes: 35 additions & 42 deletions bodhi/server/consumers/composer.py
Expand Up @@ -42,7 +42,7 @@
import jinja2
import fedora_messaging


from bodhi.messages.schemas import compose as compose_schemas, update as update_schemas
from bodhi.server import bugs, initialize_db, buildsys, notifications, mail
from bodhi.server.config import config, validate_path
from bodhi.server.exceptions import BodhiException
Expand Down Expand Up @@ -181,10 +181,11 @@ def __call__(self, message: fedora_messaging.api.Message):
Args:
message: The message we are processing. This is how we know what compose jobs to run.
"""
message = message.body["msg"]
message = message.body
resume = message.get('resume', False)
agent = message.get('agent')
notifications.publish(topic="composer.start", msg=dict(agent=agent), force=True)
notifications.publish(compose_schemas.ComposeStartV1.from_dict(dict(agent=agent)),
force=True)

results = []
threads = []
Expand Down Expand Up @@ -354,12 +355,11 @@ def work(self):

log.info('Running ComposerThread(%s)' % self.id)

notifications.publish(
topic="compose.composing",
msg=dict(repo=self.id,
updates=[' '.join([b.nvr for b in u.builds]) for u in self.compose.updates],
agent=self.agent,
ctype=self.ctype.value),
notifications.publish(compose_schemas.ComposeComposingV1.from_dict(
dict(repo=self.id,
updates=[' '.join([b.nvr for b in u.builds]) for u in self.compose.updates],
agent=self.agent,
ctype=self.ctype.value)),
force=True,
)

Expand Down Expand Up @@ -471,15 +471,15 @@ def eject_from_compose(self, update, reason):
koji=buildsys.get_session())
update.request = None
notifications.publish(
topic="update.eject",
msg=dict(
repo=self.id,
update=update,
reason=reason,
request=self.compose.request,
release=self.compose.release,
agent=self.agent,
),
update_schemas.UpdateEjectV1.from_dict(
dict(
repo=self.id,
update=update,
reason=reason,
request=self.compose.request,
release=self.compose.release,
agent=self.agent,
)),
force=True,
)

Expand Down Expand Up @@ -516,9 +516,8 @@ def finish(self, success):
success (bool): True if the compose had been successful, False otherwise.
"""
log.info('Thread(%s) finished. Success: %r' % (self.id, success))
notifications.publish(
topic="compose.complete",
msg=dict(success=success, repo=self.id, agent=self.agent, ctype=self.ctype.value),
notifications.publish(compose_schemas.ComposeCompleteV1.from_dict(dict(
dict(success=success, repo=self.id, agent=self.agent, ctype=self.ctype.value))),
force=True,
)

Expand Down Expand Up @@ -681,12 +680,12 @@ def send_notifications(self):
except OSError: # this can happen when building on koji
agent = 'composer'
for update in self.compose.updates:
topic = 'update.complete.%s' % update.request
notifications.publish(
topic=topic,
msg=dict(update=update, agent=agent),
force=True,
)
messages = {
UpdateRequest.stable: update_schemas.UpdateCompleteStableV1,
UpdateRequest.testing: update_schemas.UpdateCompleteTestingV1
}
message = messages[update.request].from_dict(dict(update=update, agent=agent))
notifications.publish(message, force=True)

@checkpoint
def modify_bugs(self):
Expand Down Expand Up @@ -1222,11 +1221,9 @@ def _wait_for_repo_signature(self):
"""Wait for a repo signature to appear."""
# This message indicates to consumers that the repos are fully created and ready to be
# signed or otherwise processed.
notifications.publish(
topic="repo.done",
msg=dict(repo=self.id, agent=self.agent, path=self.path),
force=True,
)
notifications.publish(compose_schemas.RepoDoneV1.from_dict(
dict(repo=self.id, agent=self.agent, path=self.path)),
force=True)
if config.get('wait_for_repo_sig'):
self.save_state(ComposeState.signing_repo)
sigpaths = []
Expand Down Expand Up @@ -1262,11 +1259,9 @@ def _wait_for_sync(self):
Exception: If no folder other than "source" was found in the compose_path.
"""
log.info('Waiting for updates to hit the master mirror')
notifications.publish(
topic="compose.sync.wait",
msg=dict(repo=self.id, agent=self.agent),
force=True,
)
notifications.publish(compose_schemas.ComposeSyncWaitV1.from_dict(
dict(repo=self.id, agent=self.agent)),
force=True)
compose_path = os.path.join(self.path, 'compose', 'Everything')
checkarch = None
# Find the first non-source arch to check against
Expand Down Expand Up @@ -1299,11 +1294,9 @@ def _wait_for_sync(self):
continue
if newsum == checksum:
log.info("master repomd.xml matches!")
notifications.publish(
topic="compose.sync.done",
msg=dict(repo=self.id, agent=self.agent),
force=True,
)
notifications.publish(compose_schemas.ComposeSyncDoneV1.from_dict(
dict(repo=self.id, agent=self.agent)),
force=True)
return

log.debug("master repomd.xml doesn't match! %s != %s for %r",
Expand Down

0 comments on commit 5c4cc81

Please sign in to comment.