Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions unicorn_approvals/ApprovalsService/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,16 @@
<configuration>
<environmentVariables>
<LAMBDA_TASK_ROOT>handler</LAMBDA_TASK_ROOT>
<CONTRACT_STATUS_TABLE>test-table</CONTRACT_STATUS_TABLE>
</environmentVariables>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit4</artifactId>
<version>3.5.3</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,24 @@ public class ContractStatusChangedHandlerFunction {
@Metrics(captureColdStart = true)
@Logging(logEvent = true)
public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
logger.info("TABLE_NAME environment variable: {}", TABLE_NAME);

Event event = Marshaller.unmarshal(inputStream, Event.class);
ContractStatusChanged contractStatusChanged = event.getDetail();
logger.info("Received event: {}", contractStatusChanged);

saveContractStatus(
contractStatusChanged.getPropertyId(),
contractStatusChanged.getContractStatus(),
contractStatusChanged.getContractId(),
contractStatusChanged.getContractLastModifiedOn()
);
try {
saveContractStatus(
contractStatusChanged.getPropertyId(),
contractStatusChanged.getContractStatus(),
contractStatusChanged.getContractId(),
contractStatusChanged.getContractLastModifiedOn()
);
logger.info("Successfully updated contract status for property: {}", contractStatusChanged.getPropertyId());
} catch (Exception e) {
logger.error("Failed to update contract status for property: {}", contractStatusChanged.getPropertyId(), e);
throw e;
}

try (OutputStreamWriter writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
writer.write(objectMapper.writeValueAsString(event.getDetail()));
Expand All @@ -62,6 +71,12 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co

@Tracing
void saveContractStatus(String propertyId, String contractStatus, String contractId, Long contractLastModifiedOn) {
if (TABLE_NAME == null || TABLE_NAME.isEmpty()) {
throw new RuntimeException("CONTRACT_STATUS_TABLE environment variable is not set");
}

logger.info("Updating DynamoDB table: {} for property: {}", TABLE_NAME, propertyId);

Map<String, AttributeValue> key = Map.of("property_id", AttributeValue.fromS(propertyId));

Map<String, AttributeValue> expressionAttributeValues = Map.of(
Expand All @@ -78,6 +93,7 @@ void saveContractStatus(String propertyId, String contractStatus, String contrac
.build();

dynamodbClient.updateItem(updateItemRequest);
logger.info("DynamoDB update completed for property: {}", propertyId);
}

public void setDynamodbClient(DynamoDbClient dynamodbClient) {
Expand Down
2 changes: 1 addition & 1 deletion unicorn_web/Common/src/main/java/dao/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public String getPk() {
if (country == null || city == null) {
return pk; // Return stored value if components are null
}
return ("PROPERTY#" + country + "#" + city).replace(' ', '-').toLowerCase();
return ("PROPERTY#" + country.toLowerCase() + "#" + city.toLowerCase()).replace(' ', '-');
}

public void setPk(String pk) {
Expand Down
6 changes: 3 additions & 3 deletions unicorn_web/Data/property_data.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{
"PK": "property#usa#anytown",
"PK": "PROPERTY#usa#anytown",
"SK": "main-street#111",
"country": "USA",
"city": "Anytown",
Expand All @@ -19,7 +19,7 @@
"status": "PENDING"
},
{
"PK": "property#usa#main-town",
"PK": "PROPERTY#usa#main-town",
"SK": "my-street#222",
"country": "USA",
"city": "Main Town",
Expand All @@ -38,7 +38,7 @@
"status": "PENDING"
},
{
"PK": "property#usa#anytown",
"PK": "PROPERTY#usa#anytown",
"SK": "main-street#333",
"country": "USA",
"city": "Anytown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,29 @@ public void handleRequest(InputStream inputStream, OutputStream outputStream, Co

@Tracing
private void updatePropertyStatus(String evaluationResult, String propertyId) {
logger.info("Updating property status for property ID: {}", propertyId);
logger.info("Evaluation result: {}", evaluationResult);
try {
String[] parts = propertyId.split("/");
if (parts.length != 4) {
throw new IllegalArgumentException("Invalid property ID format: " + propertyId);
}

String partitionKey = ("search#" + parts[0] + "#" + parts[1]).replace(' ', '-').toLowerCase();
String partitionKey = ("PROPERTY#" + parts[0] + "#" + parts[1]).replace(' ', '-');
String sortKey = (parts[2] + "#" + parts[3]).replace(' ', '-').toLowerCase();

logger.info("Paritition Key: {}", partitionKey);
logger.info("Sort Key: {}", sortKey);

Key key = Key.builder().partitionValue(partitionKey).sortValue(sortKey).build();
Property existingProperty = propertyTable.getItem(key).join();

if (existingProperty == null) {
logger.error("Property not found for ID: {}", propertyId);
throw new RuntimeException("Property not found with ID: " + propertyId);
}

logger.info("Existing property: {}", existingProperty);

existingProperty.setPropertyNumber(parts[3]);
existingProperty.setStatus(evaluationResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import java.util.regex.Pattern;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -36,7 +35,6 @@
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;
import software.amazon.lambda.powertools.logging.CorrelationIdPathConstants;
import software.amazon.lambda.powertools.logging.Logging;
import software.amazon.lambda.powertools.metrics.Metrics;
import software.amazon.lambda.powertools.tracing.Tracing;
Expand All @@ -49,7 +47,6 @@ public class RequestApprovalFunction {
private static final Logger logger = LogManager.getLogger(RequestApprovalFunction.class);
private static final Set<String> NO_ACTION_STATUSES = new HashSet<>(Arrays.asList("APPROVED"));
private static final String PROPERTY_ID_PATTERN = "[a-z-]+\\/[a-z-]+\\/[a-z][a-z0-9-]*\\/[0-9-]+";
private static final String CONTENT_TYPE = "application/json";

private final Pattern propertyIdPattern = Pattern.compile(PROPERTY_ID_PATTERN);
private final String tableName = System.getenv("DYNAMODB_TABLE");
Expand All @@ -60,6 +57,7 @@ public class RequestApprovalFunction {
private final ObjectMapper objectMapper = new ObjectMapper();

public RequestApprovalFunction() {

DynamoDbAsyncClient dynamodbClient = DynamoDbAsyncClient.builder()
.httpClientBuilder(NettyNioAsyncHttpClient.builder())
.build();
Expand All @@ -76,72 +74,74 @@ public RequestApprovalFunction() {

@Tracing
@Metrics(captureColdStart = true)
@Logging(logEvent = true, correlationIdPath = CorrelationIdPathConstants.API_GATEWAY_REST)
public APIGatewayProxyResponseEvent handleRequest(final APIGatewayProxyRequestEvent input,
final Context context) {
try {
if (input.getBody() == null || input.getBody().trim().isEmpty()) {
return createErrorResponse(400, "Request body is required");
}

JsonNode rootNode = objectMapper.readTree(input.getBody());
JsonNode propertyIdNode = rootNode.get("property_id");

if (propertyIdNode == null) {
return createErrorResponse(400, "property_id field is required");
}
@Logging(logEvent = true)
public void handleRequest(final SQSEvent input, final Context context) {
logger.info("Environment variables - DYNAMODB_TABLE: {}, EVENT_BUS: {}", tableName, eventBus);
logger.info("Starting approval request processing for {} messages", input.getRecords().size());

String propertyId = propertyIdNode.asText();
if (!propertyIdPattern.matcher(propertyId).matches()) {
return createErrorResponse(400, "Invalid property_id format. Must match: " + PROPERTY_ID_PATTERN);
}

PropertyComponents components = parsePropertyId(propertyId);
List<Property> properties = queryTable(components.partitionKey, components.sortKey);

if (properties.isEmpty()) {
return createErrorResponse(404, "Property not found");
}

Property property = properties.get(0);
if (NO_ACTION_STATUSES.contains(property.getStatus())) {
return createSuccessResponse("Property is already " + property.getStatus() + "; no action taken");

for (SQSEvent.SQSMessage message : input.getRecords()) {
try {
String body = message.getBody();
logger.info("Processing SQS message: {}", body);

if (body == null || body.trim().isEmpty()) {
logger.warn("Message body is null or empty");
continue;
}

JsonNode rootNode = objectMapper.readTree(body);
JsonNode propertyIdNode = rootNode.get("property_id");

if (propertyIdNode == null) {
logger.warn("property_id field missing from message");
continue;
}

String propertyId = propertyIdNode.asText();
logger.info("Processing approval request for property: {}", propertyId);

if (!propertyIdPattern.matcher(propertyId).matches()) {
logger.warn("Invalid property_id format: {}", propertyId);
continue;
}

PropertyComponents components = parsePropertyId(propertyId);
logger.info("Parsed property ID components: {}", components);
List<Property> properties = queryTable(components.partitionKey, components.sortKey);

if (properties.isEmpty()) {
logger.warn("Property not found in database: {}", propertyId);
continue;
}

Property property = properties.get(0);
logger.info("Found property with status: {}", property.getStatus());

if (NO_ACTION_STATUSES.contains(property.getStatus())) {
logger.info("Property already approved, no action needed: {}", propertyId);
continue;
}

sendEvent(property);
logger.info("Approval request completed successfully for property: {}", propertyId);

} catch (JsonProcessingException e) {
logger.error("Invalid JSON in message body: {}", message.getBody(), e);
} catch (Exception e) {
logger.error("Error processing approval request for message: {}", message.getBody(), e);
}

sendEvent(property);
return createSuccessResponse("Approval requested successfully");

} catch (JsonProcessingException e) {
logger.error("Invalid JSON in request body", e);
return createErrorResponse(400, "Invalid JSON format");
} catch (Exception e) {
logger.error("Error processing approval request", e);
return createErrorResponse(500, "Internal server error");
}
}

private PropertyComponents parsePropertyId(String propertyId) {
String[] parts = propertyId.split("/");
String partitionKey = ("search#" + parts[0] + "#" + parts[1]).replace(' ', '-').toLowerCase();
String partitionKey = ("PROPERTY#" + parts[0] + "#" + parts[1]).replace(' ', '-');
String sortKey = (parts[2] + "#" + parts[3]).replace(' ', '-').toLowerCase();
return new PropertyComponents(partitionKey, sortKey);
}

private APIGatewayProxyResponseEvent createSuccessResponse(String message) {
String body = String.format("{\"result\":\"%s\"}", message);
return new APIGatewayProxyResponseEvent()
.withStatusCode(200)
.withHeaders(Map.of("Content-Type", CONTENT_TYPE))
.withBody(body);
}

private APIGatewayProxyResponseEvent createErrorResponse(int statusCode, String message) {
String body = String.format("{\"error\":\"%s\"}", message);
return new APIGatewayProxyResponseEvent()
.withStatusCode(statusCode)
.withHeaders(Map.of("Content-Type", CONTENT_TYPE))
.withBody(body);
}

private static class PropertyComponents {
final String partitionKey;
Expand All @@ -154,7 +154,10 @@ private static class PropertyComponents {
}

private List<Property> queryTable(String partitionKey, String sortKey) throws Exception {
logger.info("Starting DynamoDB query with partitionKey: {}, sortKey: {}", partitionKey, sortKey);

if (partitionKey == null || sortKey == null) {
logger.error("Null keys provided - partitionKey: {}, sortKey: {}", partitionKey, sortKey);
throw new IllegalArgumentException("Partition key and sort key cannot be null");
}

Expand All @@ -166,19 +169,24 @@ private List<Property> queryTable(String partitionKey, String sortKey) throws Ex
.build();

try {
logger.debug("Executing DynamoDB query on table: {}", tableName);
SdkPublisher<Property> properties = propertyTable.query(request).items();
CompletableFuture<Void> future = properties.subscribe(result::add);
future.get();
logger.info("DynamoDB query completed successfully, found {} properties", result.size());
return result;
} catch (DynamoDbException | InterruptedException | ExecutionException e) {
logger.error("Error querying DynamoDB", e);
logger.error("Error querying DynamoDB with partitionKey: {}, sortKey: {}, table: {}",
partitionKey, sortKey, tableName, e);
throw new Exception("Database query failed: " + e.getMessage());
}
}

@Tracing
@Metrics
private void sendEvent(Property property) throws JsonProcessingException {
logger.info("Creating approval event for property: {}", property.getId());

RequestApproval event = new RequestApproval();
event.setPropertyId(property.getId());

Expand All @@ -189,6 +197,7 @@ private void sendEvent(Property property) throws JsonProcessingException {
event.setAddress(address);

String eventString = objectMapper.writeValueAsString(event);
logger.info("Event payload created: {}", eventString);

PutEventsRequestEntry requestEntry = PutEventsRequestEntry.builder()
.eventBusName(eventBus)
Expand All @@ -202,8 +211,15 @@ private void sendEvent(Property property) throws JsonProcessingException {
.entries(requestEntry)
.build();

eventBridgeClient.putEvents(eventsRequest).join();
logger.info("Event sent successfully for property: {}", property.getId());
logger.debug("Sending event to EventBridge bus: {}", eventBus);
try {
eventBridgeClient.putEvents(eventsRequest).join();
logger.info("Event sent successfully for property: {}", property.getId());
} catch (Exception e) {
logger.error("Failed to send event to EventBridge for property: {}, bus: {}",
property.getId(), eventBus, e);
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public APIGatewayProxyResponseEvent handleRequest(final APIGatewayProxyRequestEv

String partitionKey = buildPartitionKey(pathParams.get("country"), pathParams.get("city"));
String sortKey = buildSortKey(input.getResource(), pathParams);
logger.info("Partition Key: {}, Sort Key: {}", partitionKey, sortKey);

List<Property> properties = queryTable(partitionKey, sortKey);
String responseBody = objectMapper.writeValueAsString(properties);
Expand All @@ -89,7 +90,7 @@ public APIGatewayProxyResponseEvent handleRequest(final APIGatewayProxyRequestEv
}

private String buildPartitionKey(String country, String city) {
return ("search#" + country + "#" + city).replace(' ', '-').toLowerCase();
return ("PROPERTY#" + country + "#" + city).replace(' ', '-');
}

private String buildSortKey(String resource, Map<String, String> pathParams) {
Expand Down
Loading