Skip to content

Commit

Permalink
refactor(file_service/upload-multiple-file): split the lambda_function into several files based on AWS service and functionality (SCRUM-71)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh1un committed Jul 11, 2024
1 parent ddd3837 commit e11a634
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 92 deletions.
1 change: 0 additions & 1 deletion src/email_service/send_email/ses.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
import re

import time_util
Expand Down
2 changes: 1 addition & 1 deletion src/file_service/upload_multiple_file/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ COPY requirements.txt /var/task/
RUN pip install -r /var/task/requirements.txt

# Copy function code
COPY lambda_function.py /var/task/
COPY . /var/task/

# Set the command to run the Lambda function
CMD ["lambda_function.lambda_handler"]
53 changes: 53 additions & 0 deletions src/file_service/upload_multiple_file/dynamodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import logging
import os
from typing import Any, Dict

import boto3
import time_util

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

dynamodb = boto3.client("dynamodb")
DYNAMODB_TABLE = os.environ["DYNAMODB_TABLE"]


def save_to_dynamodb(
    file_id: str,
    s3_object_key: str,
    file_url: str,
    file_name: str,
    file_size: int,
    uploader_id: str,
) -> Dict[str, Any]:
    """
    Persist one uploaded file's metadata to DynamoDB and return it as a dict.

    :param file_id: Unique identifier of the file (table partition key).
    :param s3_object_key: Key of the uploaded object in S3.
    :param file_url: Public URL of the uploaded object.
    :param file_name: Original file name; the extension is the text after the
        last dot (a name with no dot yields the whole name as the extension).
    :param file_size: Size of the file in bytes.
    :param uploader_id: Identifier of the uploading user.
    :return: The stored metadata as a dict of native Python values, using the
        same created_at/updated_at timestamp that was written to the table.
    """
    formatted_now = time_util.get_current_utc_time()
    # Derive the extension once so the stored item and the returned dict
    # are guaranteed to agree.
    file_extension = file_name.split(".")[-1]

    dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            "file_id": {"S": file_id},
            "s3_object_key": {"S": s3_object_key},
            "created_at": {"S": formatted_now},
            "updated_at": {"S": formatted_now},
            "file_url": {"S": file_url},
            "file_name": {"S": file_name},
            "file_extension": {"S": file_extension},
            "file_size": {"N": str(file_size)},
            "uploader_id": {"S": uploader_id},
        },
    )

    return {
        "file_id": file_id,
        "s3_object_key": s3_object_key,
        "created_at": formatted_now,
        "updated_at": formatted_now,
        "file_url": file_url,
        "file_name": file_name,
        "file_extension": file_extension,
        "file_size": file_size,
        # Bug fix: previously hard-coded "dummy_uploader_id" here even though
        # the real uploader_id parameter was written to DynamoDB above.
        "uploader_id": uploader_id,
    }
206 changes: 116 additions & 90 deletions src/file_service/upload_multiple_file/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,123 +3,149 @@
import logging
import os
import uuid
from datetime import datetime, timezone
from urllib.parse import quote, unquote

import boto3
import time_util
from botocore.exceptions import ClientError
from dynamodb import save_to_dynamodb
from requests_toolbelt.multipart import decoder

# Set up logging
logger = logging.getLogger()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Initialize S3 client and DynamoDB client
s3_client = boto3.client("s3")
dynamodb = boto3.client("dynamodb")
BUCKET_NAME = os.environ["BUCKET_NAME"]
DYNAMODB_TABLE = os.environ["DYNAMODB_TABLE"]
S3_BASE_URL = f"https://{BUCKET_NAME}.s3.amazonaws.com/"
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def lambda_handler(event, context):
logger.info("Received event: %s", event)
content_type = event["headers"].get("Content-Type") or event["headers"].get(
"content-type"
)

# Check if the body is base64 encoded
def decode_request_body(event):
"""Decode the request body if it's base64 encoded."""
if event.get("isBase64Encoded", False):
body = base64.b64decode(event["body"]) # Decode base64-encoded body
body = base64.b64decode(event["body"])
else:
body = event["body"] # Use the body as is
body = event["body"]
logger.info("Decoded request body: %s", body)
multipart_data = decoder.MultipartDecoder(body, content_type)
return body

files_metadata = []

def process_files(multipart_data):
"""Process each file in the multipart data."""
files_metadata = []
for part in multipart_data.parts:
try:
disposition = part.headers[b"Content-Disposition"].decode()
if "filename*=" in disposition:
file_name = disposition.split("filename*=UTF-8''")[1]
file_name = unquote(file_name)
else:
file_name = disposition.split('filename="')[1].split('"')[0]

content_type = part.headers.get(
b"Content-Type", b"application/octet-stream"
).decode()
file_content = part.content

# Generate file_id without hyphens and use it as the S3 object key
file_id = uuid.uuid4().hex # UUID without hyphens
unique_file_name = file_id + "_" + file_name

print(f"Uploading file to S3: {unique_file_name}")

s3_client.put_object(
Bucket=BUCKET_NAME,
Key=unique_file_name,
Body=file_content,
ContentType=content_type,
file_metadata = process_single_file(part)
files_metadata.append(file_metadata)
except ClientError as e:
logger.error(
"Amazon S3 ClientError (%s): %s",
e.response["Error"]["Code"],
e.response["Error"]["Message"],
)
except Exception as e:
logger.error("Unknown error: %s", str(e))
return files_metadata


def process_single_file(part):
"""Process a single file from multipart data."""
file_name = extract_filename(part)
content_type = part.headers.get(
b"Content-Type", b"application/octet-stream"
).decode()
file_content = part.content

file_id = uuid.uuid4().hex
unique_file_name = f"{file_id}_{file_name}"

# Upload file to S3
logger.info("Uploading file to S3: %s", unique_file_name)
s3_client.put_object(
Bucket=BUCKET_NAME,
Key=unique_file_name,
Body=file_content,
ContentType=content_type,
)
logger.info("File uploaded successfully: %s", unique_file_name)

print(f"File uploaded successfully: {unique_file_name}")

encoded_file_name = quote(unique_file_name)
res_url = S3_BASE_URL + encoded_file_name

print(f"Generated S3 URL: {res_url}")

# Generate file metadata and store it in DynamoDB
now = datetime.now(timezone.utc)
formatted_now = now.strftime(TIME_FORMAT) + "Z"
file_size = len(file_content)

print(f"Storing file metadata in DynamoDB: {file_id}")

dynamodb.put_item(
TableName=DYNAMODB_TABLE,
Item={
"file_id": {"S": file_id},
"s3_object_key": {"S": unique_file_name},
"created_at": {"S": formatted_now},
"updated_at": {"S": formatted_now},
"file_url": {"S": res_url},
"file_name": {"S": file_name},
"file_extension": {"S": file_name.split(".")[-1]},
"file_size": {"N": str(file_size)},
"uploader_id": {
"S": "dummy_uploader_id"
}, # Replace with actual uploader ID if available
},
)
# Generate S3 URL
encoded_file_name = quote(unique_file_name)
res_url = S3_BASE_URL + encoded_file_name
logger.info("Generated S3 URL: %s", res_url)

print(f"File metadata stored successfully in DynamoDB: {file_id}")

# Append file metadata to the response
files_metadata.append(
{
"file_id": file_id,
"s3_object_key": unique_file_name,
"created_at": formatted_now,
"updated_at": formatted_now,
"file_url": res_url,
"file_name": file_name,
"file_extension": file_name.split(".")[-1],
"file_size": file_size,
"uploader_id": "dummy_uploader_id", # Replace with actual uploader ID if available
}
)
# Save metadata to DynamoDB
file_size = len(file_content)
save_file_metadata_to_dynamodb(
file_id, unique_file_name, res_url, file_name, file_size
)

except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"]["Message"]
print(f"Amazon S3 ClientError ({error_code}): {error_message}")
except Exception as e:
print(f"Unknown error: {str(e)}")
return create_file_metadata_dict(
file_id, unique_file_name, res_url, file_name, file_size
)


def extract_filename(part):
    """
    Pull the original file name out of a multipart part's
    Content-Disposition header.

    Prefers the RFC 5987 extended form (``filename*=UTF-8''...``,
    percent-encoded) and falls back to the plain quoted
    ``filename="..."`` form.
    """
    header = part.headers[b"Content-Disposition"].decode()
    if "filename*=" in header:
        # Extended form: percent-decoded text following the UTF-8'' marker.
        encoded_name = header.split("filename*=UTF-8''")[1]
        return unquote(encoded_name)
    # Plain form: the text between the quotes after filename=.
    quoted_segment = header.split('filename="')[1]
    return quoted_segment.split('"')[0]


def save_file_metadata_to_dynamodb(
    file_id,
    unique_file_name,
    res_url,
    file_name,
    file_size,
    uploader_id="dummy_uploader_id",
):
    """Save one uploaded file's metadata to DynamoDB.

    :param file_id: Unique identifier of the file.
    :param unique_file_name: S3 object key (``<file_id>_<file_name>``).
    :param res_url: Public S3 URL of the uploaded object.
    :param file_name: Original file name from the upload.
    :param file_size: Size of the file content in bytes.
    :param uploader_id: ID of the uploading user. Defaults to the placeholder
        value used until real authentication is wired in, which preserves the
        previous hard-coded behavior for existing callers.
    """
    logger.info("Storing file metadata in DynamoDB: %s", file_id)
    save_to_dynamodb(
        file_id=file_id,
        s3_object_key=unique_file_name,
        file_url=res_url,
        file_name=file_name,
        file_size=file_size,
        uploader_id=uploader_id,
    )
    logger.info("File metadata stored successfully in DynamoDB: %s", file_id)


def create_file_metadata_dict(
    file_id,
    unique_file_name,
    res_url,
    file_name,
    file_size,
    uploader_id="dummy_uploader_id",
):
    """Create the metadata dict returned to the API caller for one file.

    :param file_id: Unique identifier of the file.
    :param unique_file_name: S3 object key (``<file_id>_<file_name>``).
    :param res_url: Public S3 URL of the uploaded object.
    :param file_name: Original file name; the extension is the text after the
        last dot (a name with no dot yields the whole name as the extension).
    :param file_size: Size of the file content in bytes.
    :param uploader_id: ID of the uploading user. Defaults to the placeholder
        value used until real authentication is wired in, which preserves the
        previous hard-coded behavior for existing callers.
    :return: Dict of file metadata with a fresh UTC timestamp.
    """
    # NOTE(review): this timestamp is computed independently of the one
    # save_to_dynamodb writes, so the two may differ by a moment — confirm
    # whether the stored and returned timestamps must match exactly.
    formatted_now = time_util.get_current_utc_time()
    file_extension = file_name.split(".")[-1]
    return {
        "file_id": file_id,
        "s3_object_key": unique_file_name,
        "created_at": formatted_now,
        "updated_at": formatted_now,
        "file_url": res_url,
        "file_name": file_name,
        "file_extension": file_extension,
        "file_size": file_size,
        "uploader_id": uploader_id,
    }


def lambda_handler(event, context):
"""
Main Lambda function handler for processing file uploads.
:param event: The event dict containing the API request details
:param context: The context object providing runtime information
:return: A dict containing the API response
"""
logger.info("Received event: %s", event)

content_type = event["headers"].get("Content-Type") or event["headers"].get(
"content-type"
)
body = decode_request_body(event)

multipart_data = decoder.MultipartDecoder(body, content_type)
files_metadata = process_files(multipart_data)

return {
"statusCode": 200,
Expand Down
66 changes: 66 additions & 0 deletions src/file_service/upload_multiple_file/time_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import datetime

# ISO 8601 timestamp layout used throughout this module (UTC, literal "Z").
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def get_current_utc_time() -> str:
    """
    Return the current UTC time as an ISO 8601 string.

    :return: Current UTC time formatted per ``TIME_FORMAT``.
    """
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return now_utc.strftime(TIME_FORMAT)


def format_time_to_iso8601(dt: datetime.datetime) -> str:
    """
    Format a datetime as an ISO 8601 UTC string (``TIME_FORMAT``, "Z" suffix).

    Naive input is assumed to already be in UTC; aware input is converted to
    UTC before formatting. (Bug fix: previously an aware non-UTC datetime was
    formatted with its local wall-clock fields yet still stamped "Z".)

    :param dt: Datetime object, naive (assumed UTC) or timezone-aware.
    :return: Formatted time as ISO 8601 string.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    else:
        dt = dt.astimezone(datetime.timezone.utc)
    return dt.strftime(TIME_FORMAT)


def parse_iso8601_to_datetime(iso8601_str: str) -> datetime.datetime:
    """
    Parse an ISO 8601 string (``TIME_FORMAT``) into a UTC-aware datetime.

    :param iso8601_str: ISO 8601 formatted string, e.g. "2024-07-11T12:00:00Z".
    :return: Timezone-aware datetime in UTC.
    :raises ValueError: If the string does not match ``TIME_FORMAT``.
    """
    naive = datetime.datetime.strptime(iso8601_str, TIME_FORMAT)
    return naive.replace(tzinfo=datetime.timezone.utc)


def add_hours_to_time(iso8601_str: str, hours: int) -> str:
    """
    Shift an ISO 8601 time string forward by a number of hours.

    :param iso8601_str: ISO 8601 formatted string.
    :param hours: Number of hours to add (a negative value subtracts).
    :return: New ISO 8601 formatted time string.
    """
    shifted = parse_iso8601_to_datetime(iso8601_str) + datetime.timedelta(hours=hours)
    return format_time_to_iso8601(shifted)


# Example usage / smoke check when the module is run directly.
if __name__ == "__main__":
    now_str = get_current_utc_time()
    print("Current UTC Time:", now_str)

    iso_now = format_time_to_iso8601(
        datetime.datetime.now(datetime.timezone.utc)
    )
    print("Formatted Time:", iso_now)

    parsed_dt = parse_iso8601_to_datetime("2024-07-11T12:00:00Z")
    print("Parsed Time:", parsed_dt)

    shifted_str = add_hours_to_time("2024-07-11T12:00:00Z", 3)
    print("New Time:", shifted_str)

0 comments on commit e11a634

Please sign in to comment.