diff --git a/fedmsg_migration_tools/bridges.py b/fedmsg_migration_tools/bridges.py index fb9bf48..634a19e 100644 --- a/fedmsg_migration_tools/bridges.py +++ b/fedmsg_migration_tools/bridges.py @@ -18,6 +18,7 @@ import datetime import json import logging +import re import socket import time @@ -31,6 +32,9 @@ _log = logging.getLogger(__name__) +YEAR_PREFIX_RE = re.compile("^[0-9]{4}-") + + def zmq_to_amqp(exchange, zmq_endpoints, topics): """ Connect to a set of ZeroMQ PUB sockets and re-publish the messages to an AMQP @@ -186,13 +190,14 @@ def __call__(self, message): # wrap messages bridged back into ZMQ with it so old consumers don't # explode with KeyErrors. self._message_counter += 1 + msg_id = message.id + if not YEAR_PREFIX_RE.match(msg_id[:5]): + msg_id = "{}-{}".format(datetime.datetime.utcnow().year, msg_id) wrapped_body = { "topic": message.topic, "msg": message._body, "timestamp": int(time.time()), - "msg_id": "{y}-{id}".format( - y=datetime.datetime.utcnow().year, id=message.id - ), + "msg_id": msg_id, "i": self._message_counter, "username": "amqp-bridge", } diff --git a/fedmsg_migration_tools/tests/test_bridges.py b/fedmsg_migration_tools/tests/test_bridges.py index dd12a5d..dbe6a6a 100644 --- a/fedmsg_migration_tools/tests/test_bridges.py +++ b/fedmsg_migration_tools/tests/test_bridges.py @@ -186,3 +186,17 @@ def test_local_or_remote_publish(self): with mock.patch.dict("fedmsg_migration_tools.bridges.fm_config.conf", conf): zmq_bridge = bridges.AmqpToZmq() zmq_bridge.pub_socket.connect.assert_called_with("dummy_endpoint") + + @mock.patch("fedmsg_migration_tools.bridges.zmq.Context", mock.Mock()) + def test_double_year_prefix(self): + """Assert that the year prefix is not added if it is already present.""" + year = datetime.datetime.utcnow().year + zmq_bridge = bridges.AmqpToZmq() + msg = message.Message(topic="my.topic", body={"my": "message"}) + # Add a year prefix to the original message. + msg.id = "{}-{}".format(year, msg.id) + zmq_bridge(msg) + zmq_bridge.pub_socket.send_multipart.assert_called_once() + body = zmq_bridge.pub_socket.send_multipart.call_args_list[-1][0][0][1] + # No year prefix should been added. + assert json.loads(body.decode("utf-8"))["msg_id"] == msg.id