Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/codegen/extensions/events/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
self.access_token = os.environ["LINEAR_ACCESS_TOKEN"] # move to extensions config.
self.signing_secret = os.environ["LINEAR_SIGNING_SECRET"]
self.linear_team_id = os.environ["LINEAR_TEAM_ID"]
self.registered_handlers = {}

Check failure on line 27 in src/codegen/extensions/events/linear.py

View workflow job for this annotation

GitHub Actions / mypy

error: Need type annotation for "registered_handlers" (hint: "registered_handlers: dict[<type>, <type>] = ...") [var-annotated]
self._webhook_url = None

def subscribe_handler_to_webhook(self, handler: RegisteredWebhookHandler):

Check failure on line 30 in src/codegen/extensions/events/linear.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "subscribe_handler_to_webhook" incompatible with supertype "EventHandlerManagerProtocol" [override]
client = LinearClient(access_token=self.access_token)
web_url = modal.Function.from_name(app_name=self.app.name, name=handler.handler_func.__qualname__).web_url

Check failure on line 32 in src/codegen/extensions/events/linear.py

View workflow job for this annotation

GitHub Actions / mypy

error: Argument "app_name" to "from_name" of "Function" has incompatible type "str | None"; expected "str" [arg-type]
result = client.register_webhook(team_id=self.linear_team_id, webhook_url=web_url, enabled=True, resource_types=[handler.event_name], secret=self.signing_secret)

Check failure on line 33 in src/codegen/extensions/events/linear.py

View workflow job for this annotation

GitHub Actions / mypy

error: Argument "webhook_url" to "register_webhook" of "LinearClient" has incompatible type "str | None"; expected "str" [arg-type]
return result

def subscribe_all_handlers(self):
Expand All @@ -39,7 +39,7 @@
result = self.subscribe_handler_to_webhook(handler=self.registered_handlers[handler_key])
handler.webhook_id = result

def unsubscribe_handler_to_webhook(self, registered_handler: RegisteredWebhookHandler):

Check failure on line 42 in src/codegen/extensions/events/linear.py

View workflow job for this annotation

GitHub Actions / mypy

error: Signature of "unsubscribe_handler_to_webhook" incompatible with supertype "EventHandlerManagerProtocol" [override]
webhook_id = registered_handler.webhook_id

client = LinearClient(access_token=self.access_token)
Expand All @@ -55,7 +55,7 @@
for handler in self.registered_handlers:
self.unsubscribe_handler_to_webhook(self.registered_handlers[handler])

def event(self, event_name):
def event(self, event_name, should_handle: Callable[[dict], bool] | None = None):
"""Decorator for registering an event handler.

:param event_name: The name of the event to handle.
Expand All @@ -71,7 +71,12 @@

@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
should_handle_result = should_handle(*args, **kwargs) if should_handle else True
if should_handle is None or should_handle_result:
return func(*args, **kwargs)
else:
logger.info(f"Skipping event {event_name} for {func_name}")
return None

return wrapper

Expand Down
Loading