-
Notifications
You must be signed in to change notification settings - Fork 1
inbounds and parsing #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
de336a8
a7b2890
f6f05a9
723c50b
4c62361
797d6bd
a44ea0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,41 +1,59 @@ | ||
| import mongoose from "mongoose"; | ||
| import Message from "../db/mongo/schemas/Message"; | ||
| import type { MessageStatus } from "../models/Message"; | ||
|
|
||
| export async function createMessage({ | ||
| organizationId, | ||
| inboxId, | ||
| threadId, | ||
| fromInboxId, | ||
| toInboxId, | ||
| from, | ||
| to, | ||
| externalMessageId, | ||
| subject, | ||
| text, | ||
| html, | ||
| status, | ||
| }: { | ||
| organizationId: string; | ||
| inboxId: string; | ||
| threadId: string; | ||
| fromInboxId?: string; | ||
| toInboxId?: string; | ||
| from: string; | ||
| to: string; | ||
| externalMessageId?: string; | ||
| subject: string; | ||
| text: string; | ||
| html: string; | ||
| status?: (typeof MessageStatus)[number]; | ||
| }) { | ||
| const message = new Message(); | ||
| message.organizationId = new mongoose.Types.ObjectId(organizationId); | ||
| message.inboxId = new mongoose.Types.ObjectId(inboxId); | ||
| message.threadId = new mongoose.Types.ObjectId(threadId); | ||
| message.fromInboxId = fromInboxId ? new mongoose.Types.ObjectId(fromInboxId) : undefined; | ||
| message.toInboxId = toInboxId ? new mongoose.Types.ObjectId(toInboxId) : undefined; | ||
| message.from = from; | ||
| message.to = to; | ||
| message.externalMessageId = externalMessageId; | ||
| message.subject = subject; | ||
| message.text = text; | ||
| message.html = html; | ||
| message.status = status; | ||
| await message.save(); | ||
| return message; | ||
| } | ||
|
|
||
| export async function getMessageById(messageId: string) { | ||
| return await Message.findById(messageId); | ||
| } | ||
|
|
||
| export async function getMessagesByInboxId(inboxId: string) { | ||
| return await Message.find({ inboxId: new mongoose.Types.ObjectId(inboxId) }); | ||
| } | ||
|
|
||
| export async function getMessageByExternalMessageId(externalMessageId: string) { | ||
| return await Message.findOne({ externalMessageId }); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,10 +5,14 @@ import { | |
| VerifyDomainDkimCommand, | ||
| } from "@aws-sdk/client-ses"; | ||
| import type { SNSMessage } from "../models/SES"; | ||
| import { getMessageById } from "./MessageController"; | ||
| import { createMessage, getMessageByExternalMessageId, getMessageById } from "./MessageController"; | ||
| import type { MessageStatus } from "../models/Message"; | ||
| import type { WebhookEvents } from "../models/Webhook"; | ||
| import { sendWebhookEvent } from "./WebhookAttemptController"; | ||
| import { getInboxByEmail } from "./InboxController"; | ||
| import { simpleParser } from "mailparser"; | ||
| import EmailReplyParser from "email-reply-parser"; | ||
| import { addMessageToThread, createThread } from "./ThreadController"; | ||
|
|
||
| export const ses = new SESClient({ | ||
| region: "us-east-2", | ||
|
|
@@ -46,7 +50,6 @@ export async function sendSESMessage({ | |
| text: string; | ||
| html: string; | ||
| }) { | ||
| console.log("Sending SES message", { messageId, from, to, fromName, subject, text, html }); | ||
| const command = new SendEmailCommand({ | ||
| Source: fromName ? `${fromName} <${from}>` : from, | ||
| Destination: { | ||
|
|
@@ -82,49 +85,136 @@ export async function sendSESMessage({ | |
| export async function handleDeliveryNotification(rawMessage: string) { | ||
| try { | ||
| const notification: SNSMessage = JSON.parse(rawMessage); | ||
| console.log("notification", notification); | ||
|
|
||
| const messageId = notification.mail?.tags?.["message"]?.[0]; | ||
| if (!messageId) { | ||
| console.error( | ||
| "No sendook:message tag found in SES delivery notification" | ||
| ); | ||
| return; | ||
| } | ||
| const message = await getMessageById(messageId); | ||
| if (!message) { | ||
| console.error("Message not found", messageId); | ||
| if (messageId) { | ||
| await handleOutboundSESMessage({ | ||
| notification, | ||
| messageId, | ||
| }); | ||
| return; | ||
| } | ||
|
|
||
| let status: (typeof MessageStatus)[number] | undefined; | ||
| let event: (typeof WebhookEvents)[number] | undefined; | ||
| if (notification.eventType === "Reject") { | ||
| status = "rejected"; | ||
| event = "message.rejected"; | ||
| } else if (notification.eventType === "Bounce") { | ||
| status = "bounced"; | ||
| event = "message.bounced"; | ||
| } else if (notification.eventType === "Complaint") { | ||
| status = "complained"; | ||
| event = "message.complained"; | ||
| } else if (notification.eventType === "Delivery") { | ||
| status = "delivered"; | ||
| event = "message.delivered"; | ||
| await handleInboundSESMessage({ | ||
| notification, | ||
| }); | ||
| } catch (error) { | ||
| console.error("Error handling SES delivery notification", error); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| export async function handleInboundSESMessage({ | ||
| notification, | ||
| }: { | ||
| notification: SNSMessage; | ||
| }) { | ||
| if (!notification.mail.destination[0]) { | ||
| return; | ||
| } | ||
|
|
||
| const inbox = await getInboxByEmail(notification.mail.destination[0]); | ||
| if (!inbox) { | ||
| console.error("Inbox not found", notification.mail.destination[0]); | ||
| return; | ||
| } | ||
|
|
||
| const mail = await simpleParser(Buffer.from(notification.content, "base64").toString("utf-8")); | ||
| const content = new EmailReplyParser().read(mail.text ?? ""); | ||
|
|
||
| const fromInboxId = await getInboxByEmail(notification.mail.source); | ||
|
|
||
| const reference = notification.mail.headers?.find(header => header.name === "References")?.value; | ||
| const replyToMessageId = reference?.match(/<([^@>]+)@us-east-2\.amazonses\.com>/)?.[1]; | ||
|
|
||
| let threadId: string | undefined; | ||
| if (replyToMessageId) { | ||
| const message = await getMessageByExternalMessageId(replyToMessageId); | ||
| if (message) { | ||
| threadId = message.threadId.toString(); | ||
| } | ||
| } | ||
|
|
||
| if (!threadId) { | ||
| const thread = await createThread({ | ||
| organizationId: inbox.organizationId.toString(), | ||
| inboxId: inbox.id, | ||
| }); | ||
| threadId = thread._id.toString(); | ||
| } | ||
|
|
||
| const message = await createMessage({ | ||
| organizationId: inbox.organizationId.toString(), | ||
| inboxId: inbox.id, | ||
| threadId, | ||
| from: notification.mail.source, | ||
| fromInboxId: fromInboxId?.id, | ||
| to: notification.mail.destination[0], | ||
| toInboxId: inbox.id, | ||
| subject: notification.mail.commonHeaders?.subject, | ||
| text: content.getVisibleText(), | ||
| html: content.getVisibleText(), | ||
| status: "received", | ||
| }); | ||
|
Comment on lines
+147
to
+159
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. HTML field should contain HTML content, not plain text. Line 157 sets the Consider this fix: text: content.getVisibleText(),
- html: content.getVisibleText(),
+ html: mail.html || content.getVisibleText(),
status: "received",
🤖 Prompt for AI Agents |
||
|
|
||
| await addMessageToThread({ | ||
| threadId, | ||
| messageId: message._id.toString(), | ||
| }); | ||
|
|
||
| await sendWebhookEvent({ | ||
| organizationId: inbox.organizationId.toString(), | ||
| inboxId: inbox.id, | ||
| messageId: message.id, | ||
| event: "message.received", | ||
| payload: message, | ||
| }); | ||
| } | ||
|
|
||
| export async function handleOutboundSESMessage({ | ||
| notification, | ||
| messageId, | ||
| }: { | ||
| notification: SNSMessage; | ||
| messageId: string; | ||
| }) { | ||
| const message = await getMessageById(messageId); | ||
| if (!message) { | ||
| console.error("Message not found", messageId); | ||
| return; | ||
| } | ||
|
|
||
| let status: (typeof MessageStatus)[number] | undefined; | ||
| let event: (typeof WebhookEvents)[number] | undefined; | ||
| if (notification.eventType === "Reject") { | ||
| status = "rejected"; | ||
| event = "message.rejected"; | ||
| } else if (notification.eventType === "Bounce") { | ||
| status = "bounced"; | ||
| event = "message.bounced"; | ||
| } else if (notification.eventType === "Complaint") { | ||
| status = "complained"; | ||
| event = "message.complained"; | ||
| } else if (notification.eventType === "Delivery") { | ||
| status = "delivered"; | ||
| event = "message.delivered"; | ||
| } | ||
|
|
||
| if (status) { | ||
| message.status = status; | ||
| await message.save(); | ||
| } | ||
|
|
||
| if (event) { | ||
| await sendWebhookEvent({ | ||
| organizationId: message.organizationId.toString(), | ||
| inboxId: message.inboxId.toString(), | ||
| messageId: message.id, | ||
| event, | ||
| payload: message, | ||
| }); | ||
| } | ||
| } catch (error) { | ||
| console.error("Error handling SES delivery notification", error); | ||
| if (!event) { | ||
| return; | ||
| } | ||
|
|
||
| await sendWebhookEvent({ | ||
| organizationId: message.organizationId.toString(), | ||
| inboxId: message.inboxId.toString(), | ||
| messageId: message.id, | ||
| event, | ||
| payload: message, | ||
| }); | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,43 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import mongoose from "mongoose"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import Thread from "../db/mongo/schemas/Thread"; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function createThread({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| inboxId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| organizationId: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| inboxId: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const thread = new Thread(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| thread.organizationId = new mongoose.Types.ObjectId(organizationId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| thread.inboxId = new mongoose.Types.ObjectId(inboxId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| thread.messages = new mongoose.Types.Array<mongoose.Types.ObjectId>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await thread.save(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return thread; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+4
to
+17
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and input validation. This function lacks try-catch error handling and input validation. Invalid ObjectId strings will throw unhandled exceptions that could crash the service or leak error details. Wrap in try-catch and validate inputs: export async function createThread({
organizationId,
inboxId,
}: {
organizationId: string;
inboxId: string;
}) {
+ try {
+ // Validate ObjectId format
+ if (!mongoose.Types.ObjectId.isValid(organizationId) || !mongoose.Types.ObjectId.isValid(inboxId)) {
+ throw new Error("Invalid organizationId or inboxId format");
+ }
+
const thread = new Thread();
thread.organizationId = new mongoose.Types.ObjectId(organizationId);
thread.inboxId = new mongoose.Types.ObjectId(inboxId);
thread.messages = new mongoose.Types.Array<mongoose.Types.ObjectId>();
await thread.save();
return thread;
+ } catch (error) {
+ console.error("Error creating thread:", error);
+ throw error;
+ }
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function addMessageToThread({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| threadId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| messageId, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| threadId: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| messageId: string; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const thread = await Thread.findById(threadId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!thread) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| thread.messages.push(new mongoose.Types.ObjectId(messageId)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await thread.save(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return thread; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+19
to
+33
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling, input validation, and duplicate check. This function has several issues:
Apply these improvements: export async function addMessageToThread({
threadId,
messageId,
}: {
threadId: string;
messageId: string;
}) {
+ try {
+ if (!mongoose.Types.ObjectId.isValid(threadId) || !mongoose.Types.ObjectId.isValid(messageId)) {
+ throw new Error("Invalid threadId or messageId format");
+ }
+
const thread = await Thread.findById(threadId);
if (!thread) {
return null;
}
+
+ // Check for duplicate
+ const messageObjectId = new mongoose.Types.ObjectId(messageId);
+ if (thread.messages.some(id => id.equals(messageObjectId))) {
+ return thread; // Already exists
+ }
+
- thread.messages.push(new mongoose.Types.ObjectId(messageId));
+ thread.messages.push(messageObjectId);
await thread.save();
return thread;
+ } catch (error) {
+ console.error("Error adding message to thread:", error);
+ throw error;
+ }
} |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function getThreadsByInboxId(inboxId: string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const threads = await Thread.find({ inboxId: new mongoose.Types.ObjectId(inboxId) }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return threads; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+35
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and consider pagination. This function lacks error handling and could return large result sets without pagination. Add error handling: export async function getThreadsByInboxId(inboxId: string) {
+ try {
+ if (!mongoose.Types.ObjectId.isValid(inboxId)) {
+ throw new Error("Invalid inboxId format");
+ }
+
const threads = await Thread.find({ inboxId: new mongoose.Types.ObjectId(inboxId) });
return threads;
+ } catch (error) {
+ console.error("Error fetching threads:", error);
+ throw error;
+ }
}Consider adding pagination parameters (limit, skip) to handle inboxes with many threads. 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export async function getThreadById(threadId: string) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const thread = await Thread.findById(threadId).populate("messages"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return thread; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+40
to
+43
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling and input validation. Missing error handling and input validation could cause unhandled exceptions. export async function getThreadById(threadId: string) {
+ try {
+ if (!mongoose.Types.ObjectId.isValid(threadId)) {
+ throw new Error("Invalid threadId format");
+ }
+
const thread = await Thread.findById(threadId).populate("messages");
return thread;
+ } catch (error) {
+ console.error("Error fetching thread:", error);
+ throw error;
+ }
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ import cors from "cors"; | |
| import authRouter from "./routes/auth"; | ||
| import organizationsRouter from "./routes/organizations"; | ||
| import webhooksRouter from "./routes/webhooks"; | ||
| import inboxesRouter from "./routes/inboxes"; | ||
| import v1Router from "./routes/v1/index"; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Breaking API change: Document the migration path. Replacing the Would you like me to generate a migration guide or help implement backward compatibility with a deprecation notice? 🤖 Prompt for AI Agents |
||
|
|
||
| startMongo(); | ||
|
|
||
|
|
@@ -47,8 +47,8 @@ app.get("/health", (req, res) => { | |
|
|
||
| app.use("/auth", authRouter); | ||
| app.use("/organizations", organizationsRouter); | ||
| app.use("/inboxes", inboxesRouter); | ||
| app.use("/webhooks", webhooksRouter); | ||
| app.use("/v1", v1Router); | ||
|
|
||
| app.listen(port, () => { | ||
| console.log(`Listening on port ${port}...`); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.