1
- from typing import Protocol
1
+ from typing import Any , Protocol
2
2
3
+ import kombu
3
4
from kombu .mixins import ConsumerMixin
4
5
from mozlog import get_proxy_logger
5
6
6
7
from git_hg_sync .events import Push , Tag
7
8
8
- logger = get_proxy_logger ("pluse_consumer " )
9
+ logger = get_proxy_logger ("pulse_consumer " )
9
10
10
11
11
12
class EventHandler (Protocol ):
12
- def __call__ (self , event : Push | Tag ):
13
+ def __call__ (self , event : Push | Tag ) -> None :
13
14
pass
14
15
15
16
@@ -21,13 +22,19 @@ class PulseWorker(ConsumerMixin):
21
22
event_handler : EventHandler | None
22
23
"""Function that will be called whenever an event is received"""
23
24
24
- def __init__ (self , connection , queue , * , one_shot = False ):
25
+ def __init__ (
26
+ self ,
27
+ connection : kombu .Connection ,
28
+ queue : kombu .Queue ,
29
+ * ,
30
+ one_shot : bool = False ,
31
+ ) -> None :
25
32
self .connection = connection
26
33
self .task_queue = queue
27
34
self .one_shot = one_shot
28
35
29
36
@staticmethod
30
- def parse_entity (raw_entity ) :
37
+ def parse_entity (raw_entity : Any ) -> Push | Tag :
31
38
logger .debug (f"parse_entity: { raw_entity } " )
32
39
message_type = raw_entity .pop ("type" )
33
40
match message_type :
@@ -38,13 +45,17 @@ def parse_entity(raw_entity):
38
45
case _:
39
46
raise EntityTypeError (f"unsupported type { message_type } " )
40
47
41
- def get_consumers (self , Consumer , channel ):
42
- consumer = Consumer (
48
+ def get_consumers (
49
+ self ,
50
+ consumer_class : type [kombu .Consumer ],
51
+ _channel : Any ,
52
+ ) -> list [kombu .Consumer ]:
53
+ consumer = consumer_class (
43
54
self .task_queue , auto_declare = False , callbacks = [self .on_task ]
44
55
)
45
56
return [consumer ]
46
57
47
- def on_task (self , body , message ) :
58
+ def on_task (self , body : Any , message : kombu . Message ) -> None :
48
59
logger .info (f"Received message: { body } " )
49
60
raw_entity = body ["payload" ]
50
61
event = PulseWorker .parse_entity (raw_entity )
0 commit comments