feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-…#38
feat(email): add Kafka-backed Mailgun pipeline for signup and forgot-…#38aniebietafia merged 3 commits intomainfrom
Conversation
…password Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughImplements an asynchronous email pipeline: adds EmailProducerService and EmailConsumerWorker, Mailgun sender and Jinja2 renderer, email templates, schema/config updates, signup and forgot-password endpoints enqueueing email events, Kafka registration, and test coverage adjustments. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant AuthEndpoint as Auth Endpoint
participant EmailProducer as EmailProducerService
participant Kafka as Kafka Broker
participant EmailConsumer as EmailConsumerWorker
participant Renderer as EmailTemplateRenderer
participant Mailgun as Mailgun API
rect rgba(100,150,200,0.5)
Client->>AuthEndpoint: POST /signup or /forgot-password
AuthEndpoint->>AuthEndpoint: create/lookup user, build link
AuthEndpoint->>EmailProducer: send_email(to, subject, template, data/html_body)
EmailProducer->>Kafka: publish EmailEvent (notifications.email)
end
rect rgba(200,150,100,0.5)
Kafka->>EmailConsumer: deliver EmailEvent
alt html_body present
EmailConsumer->>Mailgun: send(to, subject, html_body)
else render required
EmailConsumer->>Renderer: render(template, data)
Renderer->>EmailConsumer: html
EmailConsumer->>Mailgun: send(to, subject, html)
end
Mailgun->>EmailConsumer: response (success / transient error)
EmailConsumer->>EmailConsumer: log result
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment Tip CodeRabbit can scan for known vulnerabilities in your dependencies using OSV Scanner.OSV Scanner will automatically detect and report security vulnerabilities in your project's dependencies. No additional configuration is required. |
…password Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (1)
tests/test_auth/test_auth_signup.py (1)
95-95: Strengthen assertions by validatingsend_emailcall arguments.Current checks only verify call count. Asserting key fields (e.g., recipient and template) would better catch integration regressions.
Also applies to: 173-174
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_auth/test_auth_signup.py` at line 95, Enhance the test assertions to verify the arguments passed to email_producer_mock.send_email instead of only checking call count: capture the awaited call (e.g., using email_producer_mock.send_email.await_args or email_producer_mock.send_email.assert_awaited_once_with) and assert key fields such as recipient/email address and template name exist and match expected values; apply the same stronger assertions for the other occurrence around lines 173-174 so both signup-related send_email calls validate their payload (recipient, template, and any critical template vars).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.env.example:
- Around line 40-44: Move the FRONTEND_BASE_URL key so it appears before
MAILGUN_API_KEY in .env.example to satisfy dotenv-linter ordering; update the
block containing FRONTEND_BASE_URL, MAILGUN_API_KEY, MAILGUN_DOMAIN,
MAILGUN_FROM_ADDRESS and MAILGUN_TIMEOUT_SECONDS so keys are in the expected
order (with FRONTEND_BASE_URL preceding MAILGUN_API_KEY) and keep their
values/format unchanged.
In `@app/api/v1/endpoints/auth.py`:
- Around line 37-39: The verification_link currently embeds an ephemeral uuid4()
that is neither persisted nor signed, so the verify/reset endpoints cannot
validate or expire it; change this to generate a verifiable token (either a
stored token with TTL or a backend-signed token) and include that token in the
link. Concretely: replace the inline uuid4() usage in verification_link (and the
similar reset link around lines 69-70) with a token produced by either (a)
creating and persisting an EmailVerificationToken (or PasswordResetToken) record
tied to user.id with an expires_at and a securely generated token value, then
use that token string in the URL and have the verify/reset handlers look up,
validate expiry, and revoke the DB token; or (b) issue a signed JWT containing
user id and exp, sign with your server key, put the JWT in the URL and validate
signature+exp in the callback. Ensure token creation, storage/revocation, and
validation logic are added to functions handling email generation and the
corresponding verify/reset endpoints (so the callback can reliably validate and
expire tokens).
- Line 70: The long f-string assigning reset_link is triggering Ruff E501; break
the expression across lines using implicit concatenation in parentheses or build
the URL in parts so the line length is under the limit. Locate the reset_link
assignment (reset_link =
f"{settings.FRONTEND_BASE_URL}/reset-password?user={user.id}&token={uuid4()}")
and rewrite it into a multi-line expression (e.g., wrap the f-string in
parentheses or compose base, path, and query variables) while preserving the
same values from settings.FRONTEND_BASE_URL, user.id, and uuid4().
In `@app/services/email_consumer.py`:
- Around line 48-51: The send() method in app.services.email_consumer (async def
send) currently returns normally when Mailgun creds are missing or when Mailgun
returns 4xx, causing handle() to log success incorrectly; change send() to
either raise a specific exception (e.g., EmailDeliveryError) for
skipped/misconfigured or rejected (4xx) cases OR return an explicit result enum
(e.g., EmailDeliveryResult {SENT, SKIPPED, REJECTED, FAILED}) and include the
HTTP status and body for non-2xx responses; update the caller handle() to check
that returned result == SENT (or catch EmailDeliveryError) before logging
"Dispatched email event..." and only log success on true delivery, while
preserving error logging for other cases.
- Around line 28-37: The render() method currently swallows TemplateNotFound and
returns an empty string which lets handle() treat the message as processed;
instead, when self._environment.get_template(...) raises TemplateNotFound,
re-raise that exception (or raise a specific EmailTemplateMissingError) so the
worker fails fast / triggers retry/DLQ logic rather than returning ""; update
handle() to not swallow this exception and to route the record to the DLQ or let
it bubble to the worker framework; apply the same change to the other identical
template-loading block around the second occurrence (the block at the other
occurrence mentioned) so missing templates always cause a hard failure or DLQ
routing.
In `@app/services/email_producer.py`:
- Around line 34-36: The code currently uses the raw recipient address as Kafka
key and logs it (see kafka_manager via get_kafka_manager(),
producer.send(self._topic, event, key=to) and logger.info), which leaks PII;
change to compute a stable non-PII key (e.g., hash of a normalized email) and
pass that to producer.send instead of `to`, and redact the log entry (e.g., log
a masked or hashed form or just the recipient domain) in logger.info while
keeping template and topic info; ensure normalization and hashing happen in the
same function/method that sends the message so ordering remains stable
per-recipient.
---
Nitpick comments:
In `@tests/test_auth/test_auth_signup.py`:
- Line 95: Enhance the test assertions to verify the arguments passed to
email_producer_mock.send_email instead of only checking call count: capture the
awaited call (e.g., using email_producer_mock.send_email.await_args or
email_producer_mock.send_email.assert_awaited_once_with) and assert key fields
such as recipient/email address and template name exist and match expected
values; apply the same stronger assertions for the other occurrence around lines
173-174 so both signup-related send_email calls validate their payload
(recipient, template, and any critical template vars).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c846f59a-4151-43ce-9262-7712a1da8da5
📒 Files selected for processing (15)
.env.exampleapp/api/v1/endpoints/auth.pyapp/core/config.pyapp/kafka/manager.pyapp/kafka/schemas.pyapp/schemas/auth.pyapp/services/email_consumer.pyapp/services/email_producer.pyapp/templates/email/password_reset.htmlapp/templates/email/verification.htmldocs/email_service.mdtests/test_auth/test_auth_signup.pytests/test_kafka/test_email_consumer.pytests/test_kafka/test_email_producer_service.pytests/test_kafka/test_schemas.py
💤 Files with no reviewable changes (1)
- docs/email_service.md
| MAILGUN_API_KEY= | ||
| MAILGUN_DOMAIN= | ||
| MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com | ||
| MAILGUN_TIMEOUT_SECONDS=10 | ||
| FRONTEND_BASE_URL=http://localhost:3000 |
There was a problem hiding this comment.
Fix dotenv key ordering to satisfy linter.
FRONTEND_BASE_URL should be moved before MAILGUN_API_KEY to address the current dotenv-linter warning.
Proposed reorder
-# Email (SES/Mailgun/Resend)
-MAILGUN_API_KEY=
-MAILGUN_DOMAIN=
-MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
-MAILGUN_TIMEOUT_SECONDS=10
-FRONTEND_BASE_URL=http://localhost:3000
+# Email (SES/Mailgun/Resend)
+FRONTEND_BASE_URL=http://localhost:3000
+MAILGUN_API_KEY=
+MAILGUN_DOMAIN=
+MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com
+MAILGUN_TIMEOUT_SECONDS=10
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| MAILGUN_API_KEY= | |
| MAILGUN_DOMAIN= | |
| MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com | |
| MAILGUN_TIMEOUT_SECONDS=10 | |
| FRONTEND_BASE_URL=http://localhost:3000 | |
| FRONTEND_BASE_URL=http://localhost:3000 | |
| MAILGUN_API_KEY= | |
| MAILGUN_DOMAIN= | |
| MAILGUN_FROM_ADDRESS=no-reply@fluentmeet.com | |
| MAILGUN_TIMEOUT_SECONDS=10 |
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 44-44: [UnorderedKey] The FRONTEND_BASE_URL key should go before the MAILGUN_API_KEY key
(UnorderedKey)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.env.example around lines 40 - 44, Move the FRONTEND_BASE_URL key so it
appears before MAILGUN_API_KEY in .env.example to satisfy dotenv-linter
ordering; update the block containing FRONTEND_BASE_URL, MAILGUN_API_KEY,
MAILGUN_DOMAIN, MAILGUN_FROM_ADDRESS and MAILGUN_TIMEOUT_SECONDS so keys are in
the expected order (with FRONTEND_BASE_URL preceding MAILGUN_API_KEY) and keep
their values/format unchanged.
| verification_link = ( | ||
| f"{settings.FRONTEND_BASE_URL}/verify-email?user={user.id}&token={uuid4()}" | ||
| ) |
There was a problem hiding this comment.
Use a verifiable token in these links.
These UUIDs are generated inline but never persisted or signed in this flow. The verify/reset callback has no reliable way to validate or expire them as-is; use a stored token with TTL or a backend-signed token instead.
Also applies to: 69-70
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/api/v1/endpoints/auth.py` around lines 37 - 39, The verification_link
currently embeds an ephemeral uuid4() that is neither persisted nor signed, so
the verify/reset endpoints cannot validate or expire it; change this to generate
a verifiable token (either a stored token with TTL or a backend-signed token)
and include that token in the link. Concretely: replace the inline uuid4() usage
in verification_link (and the similar reset link around lines 69-70) with a
token produced by either (a) creating and persisting an EmailVerificationToken
(or PasswordResetToken) record tied to user.id with an expires_at and a securely
generated token value, then use that token string in the URL and have the
verify/reset handlers look up, validate expiry, and revoke the DB token; or (b)
issue a signed JWT containing user id and exp, sign with your server key, put
the JWT in the URL and validate signature+exp in the callback. Ensure token
creation, storage/revocation, and validation logic are added to functions
handling email generation and the corresponding verify/reset endpoints (so the
callback can reliably validate and expire tokens).
| def render(self, template_name: str, data: dict[str, object]) -> str: | ||
| try: | ||
| template = self._environment.get_template(f"{template_name}.html") | ||
| except TemplateNotFound: | ||
| logger.warning( | ||
| "Template '%s' is missing, falling back to raw html", | ||
| template_name, | ||
| ) | ||
| return "" | ||
| return template.render(**data) |
There was a problem hiding this comment.
Fail fast when the email template is missing.
render() converts TemplateNotFound into an empty body, and handle() just returns when that happens. The worker treats the record as handled even though no email was sent, so a bad deploy can silently lose signup/reset emails. Raise a hard failure or route the record to a DLQ instead.
Also applies to: 100-104
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/email_consumer.py` around lines 28 - 37, The render() method
currently swallows TemplateNotFound and returns an empty string which lets
handle() treat the message as processed; instead, when
self._environment.get_template(...) raises TemplateNotFound, re-raise that
exception (or raise a specific EmailTemplateMissingError) so the worker fails
fast / triggers retry/DLQ logic rather than returning ""; update handle() to not
swallow this exception and to route the record to the DLQ or let it bubble to
the worker framework; apply the same change to the other identical
template-loading block around the second occurrence (the block at the other
occurrence mentioned) so missing templates always cause a hard failure or DLQ
routing.
| async def send(self, to: str, subject: str, html_body: str) -> None: | ||
| if not settings.MAILGUN_API_KEY or not settings.MAILGUN_DOMAIN: | ||
| logger.warning("Mailgun credentials not configured; skipping dispatch") | ||
| return |
There was a problem hiding this comment.
Don't treat skipped or rejected Mailgun calls as successful sends.
send() returns normally when Mailgun credentials are missing and for every 4xx response. handle() then logs "Dispatched email event...", so monitoring will say the email was sent even when Mailgun skipped or rejected it. Have send() return an explicit delivery result or raise on these branches.
Also applies to: 72-78, 106-115
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/email_consumer.py` around lines 48 - 51, The send() method in
app.services.email_consumer (async def send) currently returns normally when
Mailgun creds are missing or when Mailgun returns 4xx, causing handle() to log
success incorrectly; change send() to either raise a specific exception (e.g.,
EmailDeliveryError) for skipped/misconfigured or rejected (4xx) cases OR return
an explicit result enum (e.g., EmailDeliveryResult {SENT, SKIPPED, REJECTED,
FAILED}) and include the HTTP status and body for non-2xx responses; update the
caller handle() to check that returned result == SENT (or catch
EmailDeliveryError) before logging "Dispatched email event..." and only log
success on true delivery, while preserving error logging for other cases.
| kafka_manager = get_kafka_manager() | ||
| await kafka_manager.producer.send(self._topic, event, key=to) | ||
| logger.info("Queued email '%s' for %s", template, to) |
There was a problem hiding this comment.
Avoid using the raw email address as the Kafka key.
key=to persists the recipient address in Kafka record metadata, and the info log repeats it. That creates retained PII in broker/admin tooling and log pipelines; use a non-PII stable key instead and redact the log. If you still need per-recipient ordering, hash the normalized address rather than storing it directly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@app/services/email_producer.py` around lines 34 - 36, The code currently uses
the raw recipient address as Kafka key and logs it (see kafka_manager via
get_kafka_manager(), producer.send(self._topic, event, key=to) and logger.info),
which leaks PII; change to compute a stable non-PII key (e.g., hash of a
normalized email) and pass that to producer.send instead of `to`, and redact the
log entry (e.g., log a masked or hashed form or just the recipient domain) in
logger.info while keeping template and topic info; ensure normalization and
hashing happen in the same function/method that sends the message so ordering
remains stable per-recipient.
…password Signed-off-by: aniebietafia <aniebietafia87@gmail.com>
…password
Summary by CodeRabbit
New Features
Chores
Tests