Skip to content

Commit

Permalink
Merge pull request #28 from aws-educate-tw/SCRUM-70-Send-Email-attach…
Browse files Browse the repository at this point in the history
…ment

[SCRUM-70, SCRUN-63] Add Email Attachment Functionality and Refactor Email Service
  • Loading branch information
sh1un committed Jul 12, 2024
2 parents 59ab16a + b66b405 commit 76c688b
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 97 deletions.
14 changes: 14 additions & 0 deletions src/email_service/send_email/data_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import math
from decimal import Decimal


def convert_float_to_decimal(data):
if isinstance(data, list):
return [convert_float_to_decimal(item) for item in data]
elif isinstance(data, dict):
return {key: convert_float_to_decimal(value) for key, value in data.items()}
elif isinstance(data, float):
if math.isinf(data) or math.isnan(data):
return None
return Decimal(str(data))
return data
2 changes: 2 additions & 0 deletions src/email_service/send_email/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def save_to_dynamodb(
template_file_id,
spreadsheet_file_id,
created_at,
row_data,
sent_at=None,
updated_at=None,
):
Expand All @@ -35,6 +36,7 @@ def save_to_dynamodb(
"recipient_email": recipient_email,
"template_file_id": template_file_id,
"spreadsheet_file_id": spreadsheet_file_id,
"row_data": row_data,
"created_at": created_at,
"updated_at": updated_at,
}
Expand Down
172 changes: 112 additions & 60 deletions src/email_service/send_email/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,120 @@
import json
import logging
import os
import uuid

import boto3
import time_util
from data_util import convert_float_to_decimal
from dynamodb import save_to_dynamodb
from s3 import get_template, read_sheet_data_from_s3
from ses import process_email
from sqs import delete_sqs_message
from sqs import delete_sqs_message, process_sqs_message

from file_service import get_file_info

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


# Get environment variables for DynamoDB table and SQS queue URL
DYNAMODB_TABLE = os.getenv("DYNAMODB_TABLE")
SQS_QUEUE_URL = os.getenv("SQS_QUEUE_URL")

# Initialize DynamoDB resource and SQS client
dynamodb = boto3.resource("dynamodb")
sqs_client = boto3.client("sqs")
table = dynamodb.Table(DYNAMODB_TABLE)


def save_emails_to_dynamodb(
run_id, template_file_id, spreadsheet_id, display_name, rows
):
"""
Save each row of email data to DynamoDB with a status of 'PENDING'.
:param run_id: The run ID for tracking the email sending operation.
:param template_file_id: File ID of the email template.
:param spreadsheet_id: File ID of the spreadsheet.
:param display_name: Display name of the sender.
:param rows: List of dictionaries, each containing email and other placeholder data.
"""
for row in rows:
email_id = str(uuid.uuid4().hex)
row = convert_float_to_decimal(row)
created_at = time_util.get_current_utc_time()
logger.info("Converted row data: %s", row)
save_to_dynamodb(
run_id=run_id,
email_id=email_id,
display_name=display_name,
status="PENDING",
recipient_email=row.get("Email"),
template_file_id=template_file_id,
spreadsheet_file_id=spreadsheet_id,
created_at=created_at,
row_data=row,
)


def fetch_and_process_pending_emails(
run_id,
email_title,
template_content,
display_name,
template_file_id,
spreadsheet_id,
attachment_file_ids,
):
"""
Fetch emails with 'PENDING' status from DynamoDB and process them.
:param run_id: The run ID for tracking the email sending operation.
:param email_title: Title of the email to be sent.
:param template_content: The HTML content of the email template.
:param display_name: Display name of the sender.
:param template_file_id: File ID of the email template.
:param spreadsheet_id: File ID of the spreadsheet.
:param attachment_file_ids: List of file IDs for attachments.
"""
pending_emails = table.query(
IndexName="run_id-status-gsi",
KeyConditionExpression=boto3.dynamodb.conditions.Key("run_id").eq(run_id)
& boto3.dynamodb.conditions.Key("status").eq("PENDING"),
)

ses_client = boto3.client("ses", region_name="ap-northeast-1")
for item in pending_emails["Items"]:
recipient_email = item["recipient_email"]
email_id = item["email_id"]
row = item.get("row_data")
created_at = item.get("created_at")
process_email(
ses_client,
email_title,
template_content,
recipient_email,
row,
display_name,
run_id,
template_file_id,
spreadsheet_id,
created_at,
email_id,
attachment_file_ids,
)


def lambda_handler(event, context):
"""
AWS Lambda handler function to process SQS messages and handle email sending.
:param event: The event data from SQS.
:param context: The runtime information of the Lambda function.
"""
for record in event["Records"]:
try:
body = json.loads(record["body"])
receipt_handle = record["receiptHandle"]
template_file_id = body.get("template_file_id")
spreadsheet_id = body.get("spreadsheet_file_id")
email_title = body.get("email_title")
display_name = body.get("display_name")
run_id = body.get("run_id")

logger.info("Processing message with run_id: %s", run_id)

msg = process_sqs_message(record)
run_id = msg["run_id"]
# Check if the run_id already exists in DynamoDB
response = table.query(
KeyConditionExpression=boto3.dynamodb.conditions.Key("run_id").eq(
Expand All @@ -45,66 +123,40 @@ def lambda_handler(event, context):
)
if response["Count"] == 0:
# Run ID does not exist, save all emails to DynamoDB with PENDING status
template_info = get_file_info(template_file_id)
template_info = get_file_info(msg["template_file_id"])
template_s3_key = template_info["s3_object_key"]
template_content = get_template(template_s3_key)

spreadsheet_info = get_file_info(spreadsheet_id)
spreadsheet_info = get_file_info(msg["spreadsheet_id"])
spreadsheet_s3_key = spreadsheet_info["s3_object_key"]
data, _ = read_sheet_data_from_s3(spreadsheet_s3_key)
logger.info("Read sheet data from S3: %s", data)

for row in data:
email_id = str(uuid.uuid4().hex)
save_to_dynamodb(
run_id,
email_id,
display_name,
"PENDING",
row.get("Email"),
template_file_id,
spreadsheet_id,
time_util.get_current_utc_time(),
)

# Fetch emails with PENDING status and process them
pending_emails = table.query(
IndexName="run_id-status-gsi",
KeyConditionExpression=boto3.dynamodb.conditions.Key("run_id").eq(
run_id
)
& boto3.dynamodb.conditions.Key("status").eq("PENDING"),
)

ses_client = boto3.client("ses", region_name="ap-northeast-1")
template_content = get_template(
template_s3_key
) # Load the template content
for item in pending_emails["Items"]:
recipient_email = item["recipient_email"]
email_id = item["email_id"]
row = {
"Email": recipient_email,
"Name": item.get("Name"),
} # Assuming the spreadsheet has these fields
process_email(
ses_client,
email_title,
template_content,
row,
display_name,
save_emails_to_dynamodb(
run_id,
template_file_id,
spreadsheet_id,
email_id,
msg["template_file_id"],
msg["spreadsheet_id"],
msg["display_name"],
data,
)

template_content = get_template(template_s3_key)
fetch_and_process_pending_emails(
run_id,
msg["email_title"],
template_content,
msg["display_name"],
msg["template_file_id"],
msg["spreadsheet_id"],
msg["attachment_file_ids"],
)
logger.info("Processed all emails for run_id: %s", run_id)
except Exception as e:
logger.error("Error processing message: %s", e)
finally:
if SQS_QUEUE_URL:
try:
delete_sqs_message(sqs_client, SQS_QUEUE_URL, receipt_handle)
logger.info("Deleted message from SQS: %s", receipt_handle)
delete_sqs_message(sqs_client, SQS_QUEUE_URL, msg["receipt_handle"])
logger.info("Deleted message from SQS: %s", msg["receipt_handle"])
except Exception as e:
logger.error("Error deleting SQS message: %s", e)
else:
Expand Down
Loading

0 comments on commit 76c688b

Please sign in to comment.