ask about Saga #2

Closed
fizz12344321 opened this issue Jul 1, 2022 · 2 comments

@fizz12344321

Hi, I am reading your project and I love it. I have a few questions: which message queue do you use, how do you manage the state of each service, and how do you handle and implement the "transaction outbox"? Thank you so much.

@grkn
Owner

grkn commented Jul 1, 2022

Hi @fizz12344321 ,

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = ApplicationConstant.ORDER_QUEUE, durable = "true"),
            exchange = @Exchange(value = ApplicationConstant.EXCHANGE, ignoreDeclarationExceptions = "true"),
            key = ApplicationConstant.ORDER_ROUTING_KEY)
    )
    public void listen(Message message) throws IOException {
        // Deserialize the AMQP message body into the order DTO.
        OrderDto orderDto = objectMapper.readValue(message.getBody(), OrderDto.class);
        if (orderDto.getStatus() != null && !orderDto.getStatus().trim().isEmpty()) {
            // valueOf throws IllegalArgumentException for an unknown status name.
            OrderStatus orderStatus = OrderStatus.valueOf(orderDto.getStatus());
            LOGGER.info("Order {} request is received. Transaction Id: {}", orderStatus.name(), orderDto.getTransactionId());
            switch (orderStatus) {
                // All handled statuses simply overwrite the stored order status.
                case ORDER_PENDING:
                case ORDER_COMPLETED:
                case ORDER_FAILED:
                case ORDER_STOCK_COMPLETED:
                    orderService.updateOrder(orderDto.getTransactionId(), orderStatus, orderDto.getPaymentId());
                    break;
                default:
                    // Any other status is ignored.
                    break;
            }
        } else {
            throw new IllegalArgumentException(String.format("Order Status is wrong. %s", orderDto.getStatus()));
        }
    }

  1. Each service has its own exchange, and its queue is bound to that exchange with a routing key. The exchange type is direct, and the queue is an ordinary RabbitMQ queue.
  2. The listener classes are responsible for acting on the statuses of the ORDER, PAYMENT, and STOCK services. In this example I simply update the order's status to the one given in the message, but I could also delete the order when the status is ORDER_FAILED.
  3. Transaction outbox is not implemented, and because the methods are transactional this can cause a problem here. I would like to implement it (feel free to, if you have time). I need to make sure the message is published only when the transaction has finished. I just need an outbox table: after the database operation completes successfully, I will publish the event.
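
The outbox idea in point 3 can be sketched in plain Java, without Spring or RabbitMQ. This is only a minimal illustration under assumptions, not the project's code: `OutboxRecord`, `OutboxStore`, and `OutboxRelay` are hypothetical names, the in-memory lists stand in for database tables, and the `Consumer` stands in for the real RabbitMQ send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for one row of an "outbox" table.
final class OutboxRecord {
    final String transactionId;
    final String payload;
    boolean sent; // stays false until the broker send has succeeded

    OutboxRecord(String transactionId, String payload) {
        this.transactionId = transactionId;
        this.payload = payload;
    }
}

// Hypothetical store. In a real service the order row and the outbox row
// would be written inside the same database transaction.
final class OutboxStore {
    private final List<String> orders = new ArrayList<>();
    private final List<OutboxRecord> outbox = new ArrayList<>();

    // Simulates one local transaction: both writes happen together.
    synchronized void saveOrderWithOutbox(String transactionId, String orderJson) {
        orders.add(orderJson);
        outbox.add(new OutboxRecord(transactionId, orderJson));
    }

    synchronized List<OutboxRecord> findUnsent() {
        List<OutboxRecord> result = new ArrayList<>();
        for (OutboxRecord record : outbox) {
            if (!record.sent) {
                result.add(record);
            }
        }
        return result;
    }

    synchronized void markSent(OutboxRecord record) {
        record.sent = true;
    }
}

// Relay that a scheduler could call periodically.
final class OutboxRelay {
    private final OutboxStore store;
    private final Consumer<OutboxRecord> publisher; // assumed to wrap the broker send

    OutboxRelay(OutboxStore store, Consumer<OutboxRecord> publisher) {
        this.store = store;
        this.publisher = publisher;
    }

    // Publishes every unsent record and returns how many were published.
    int publishPending() {
        int published = 0;
        for (OutboxRecord record : store.findUnsent()) {
            try {
                publisher.accept(record);
                store.markSent(record);
                published++;
            } catch (RuntimeException e) {
                // Leave the record unsent so the next run retries it.
            }
        }
        return published;
    }
}
```

A relay like this gives at-least-once delivery: if the application stops after the send but before `markSent`, the record is published again on the next run.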

@grkn
Owner

grkn commented Jul 1, 2022

I have now also implemented the transaction outbox, simply using Spring events.
A @TransactionalEventListener method runs after the transaction commits (its default phase) and sends the AMQP message to the exchange.
This way a message is published only for a transaction that actually committed. The application can still fail between the commit and the send, though, which would lose the message, so we also need a job that polls the database and sends to RabbitMQ anything that has not been sent yet. I implemented this only for the Order service, by the way. The same approach can be applied to the other services.

        List<Order> lOrders = orderRepository.saveAll(orders);
        applicationEventPublisher.publishEvent(new CreateOrdersEvent(lOrders));
        return lOrders;

@Service
@RequiredArgsConstructor
public class OrderEventHandler {

    private final static Logger LOGGER = LoggerFactory.getLogger(OrderEventHandler.class);
    private final Sender sender;

    @TransactionalEventListener
    public void createOrdersEvent(CreateOrdersEvent createOrdersEvent) throws JsonProcessingException {
        List<Order> lOrders = createOrdersEvent.getOrders();
        String transactionId = UUID.randomUUID().toString();
        LOGGER.info("Sending PAYMENT_REQUESTED notification to payment queue. Transaction_id: {}", transactionId);
        sender.paymentNotify(PaymentDto.builder()
                .transactionId(transactionId)
                .orders(lOrders.stream()
                        .map(item -> OrderDto.builder()
                                .name(item.getName())
                                .transactionId(item.getTransactionId())
                                .quantity(item.getQuantity())
                                .status(OrderStatus.ORDER_PENDING.name())
                                .price((double) new Random().nextInt(100))
                                .build())
                        .collect(Collectors.toList()))
                .status(PaymentStatus.PAYMENT_REQUESTED.name())
                .build());
        LOGGER.info("Sending STOCK_REQUESTED notification to stock queue. Transaction_id: {}", transactionId);
        sender.stockNotify(StockDto.builder()
                .orders(lOrders.stream()
                        .map(item -> OrderDto.builder()
                                .name(item.getName())
                                .transactionId(item.getTransactionId())
                                .price((double) new Random().nextInt(100))
                                .quantity(item.getQuantity())
                                .status(OrderStatus.ORDER_PENDING.name())
                                .build())
                        .collect(Collectors.toList()))
                .transactionId(transactionId)
                .status(StockStatus.STOCK_REQUESTED.name())
                .build());
    }
}
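
To make the after-commit behaviour concrete, here is a small plain-Java simulation. It is not how Spring implements `@TransactionalEventListener`; `MiniTransaction` and `registerAfterCommit` are hypothetical names used only to show that a callback registered during a transaction runs on commit and is dropped on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a transaction with after-commit hooks.
final class MiniTransaction {
    private final List<Runnable> afterCommit = new ArrayList<>();
    private boolean finished;

    // Registers work that must only happen if the transaction commits.
    void registerAfterCommit(Runnable callback) {
        if (finished) {
            throw new IllegalStateException("transaction already finished");
        }
        afterCommit.add(callback);
    }

    // Marks the transaction as committed, then runs the registered callbacks.
    void commit() {
        finished = true;
        for (Runnable callback : afterCommit) {
            callback.run();
        }
        afterCommit.clear();
    }

    // Discards the callbacks without running them.
    void rollback() {
        finished = true;
        afterCommit.clear();
    }
}
```

In this model, a crash between marking the commit and running the callbacks loses the message, which is the gap the polling job is meant to cover.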


I also added a simple scheduler, which can be changed later. An example is in the code block below:

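    @Scheduled(fixedRate = 60000) // runs every 60 seconds
    @Transactional(readOnly = true)
    public void resendMissingOrders() {
        // Orders still in ORDER_RECEIVED are treated as not yet announced.
        List<Order> lOrders = orderRepository.findByStatus(OrderStatus.ORDER_RECEIVED);
        if (lOrders.isEmpty()) {
            return; // nothing to resend, so do not publish an empty event
        }
        LOGGER.warn("Resending {} orders still in ORDER_RECEIVED to RabbitMQ by Scheduler Task", lOrders.size());
        applicationEventPublisher.publishEvent(new CreateOrdersEvent(lOrders));
    }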
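
Because the scheduler republishes every order that is still in ORDER_RECEIVED, the payment and stock services may receive the same request more than once. One common way to tolerate that is an idempotent consumer. The sketch below is an assumption, not part of the project: `IdempotentHandler` is a hypothetical name, the deduplication key is assumed to be the order's transaction id plus the status, and the in-memory set stands in for what would normally be a database table with a unique key.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer-side guard that applies each (order, status) pair once.
final class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final AtomicInteger applied = new AtomicInteger();

    // Returns true if the message was applied, false if it was a duplicate.
    boolean handle(String orderTransactionId, String status) {
        String key = orderTransactionId + ":" + status;
        if (!processed.add(key)) {
            return false; // already seen, skip the side effect
        }
        applied.incrementAndGet(); // placeholder for the real business action
        return true;
    }

    int appliedCount() {
        return applied.get();
    }
}
```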

