diff --git a/src/main/java/alfio/config/DataSourceConfiguration.java b/src/main/java/alfio/config/DataSourceConfiguration.java index 023aa25ee3..043709d867 100644 --- a/src/main/java/alfio/config/DataSourceConfiguration.java +++ b/src/main/java/alfio/config/DataSourceConfiguration.java @@ -235,7 +235,6 @@ public Trigger[] getTriggers() throws ParseException { buildTrigger(SendTicketAssignmentReminder.class, "SendTicketAssignmentReminder", SendTicketAssignmentReminder.INTERVAL), buildTrigger(GenerateSpecialPriceCodes.class, "GenerateSpecialPriceCodes", GenerateSpecialPriceCodes.INTERVAL), buildTrigger(SendEmails.class, "SendEmails", SendEmails.INTERVAL), - buildTrigger(EnqueueNotSentEmail.class, "EnqueueNotSentEmail", EnqueueNotSentEmail.INTERVAL), buildTrigger(ProcessReleasedTickets.class, "ProcessReleasedTickets", ProcessReleasedTickets.INTERVAL), buildTrigger(CleanupUnreferencedBlobFiles.class, "CleanupUnreferencedBlobFiles", CleanupUnreferencedBlobFiles.INTERVAL) }; diff --git a/src/main/java/alfio/manager/Jobs.java b/src/main/java/alfio/manager/Jobs.java index c08630c883..52dfeaf423 100644 --- a/src/main/java/alfio/manager/Jobs.java +++ b/src/main/java/alfio/manager/Jobs.java @@ -82,10 +82,6 @@ public void sendEmails() { notificationManager.sendWaitingMessages(); } - public void enqueueNotSentEmail() { - notificationManager.processNotSentEmail(); - } - public void processReleasedTickets() { waitingQueueSubscriptionProcessor.handleWaitingTickets(); } @@ -175,22 +171,6 @@ public void execute(JobExecutionContext context) throws JobExecutionException { } } - @DisallowConcurrentExecution - @Log4j2 - public static class EnqueueNotSentEmail implements Job { - - public static long INTERVAL = ONE_MINUTE; - - @Autowired - private Jobs jobs; - - @Override - public void execute(JobExecutionContext context) throws JobExecutionException { - log.debug("running job " + getClass().getSimpleName()); - jobs.enqueueNotSentEmail(); - } - } - @DisallowConcurrentExecution @Log4j2 public static class ProcessReleasedTickets implements Job { diff --git a/src/main/java/alfio/manager/NotificationManager.java b/src/main/java/alfio/manager/NotificationManager.java index 2a5ce1d460..acc8f1a23b 100644 --- a/src/main/java/alfio/manager/NotificationManager.java +++ b/src/main/java/alfio/manager/NotificationManager.java @@ -138,25 +138,20 @@ public Optional loadSingleMessageForEvent(int eventId, int message void sendWaitingMessages() { eventRepository.findAllActiveIds(ZonedDateTime.now(UTC)) .stream() - .flatMap(id -> emailMessageRepository.loadWaitingForProcessing(id).stream()) + .flatMap(id -> emailMessageRepository.loadIdsWaitingForProcessing(id).stream()) .distinct() .forEach(this::processMessage); } - void processNotSentEmail() { - ZonedDateTime now = ZonedDateTime.now(UTC); - ZonedDateTime expiration = now.minusMinutes(5); - eventRepository.findAllActiveIds(now).stream() - .map(id -> Pair.of(id, tx.execute(status -> emailMessageRepository.updateStatusForRetry(now, expiration)))) - .filter(p -> p.getValue() > 0) - .forEach(p -> { - int eventId = p.getKey(); - log.debug("found {} expired messages for event {}", p.getValue(), eventId); - emailMessageRepository.loadRetryForProcessing(eventId).forEach(this::processMessage); - }); - } + private void processMessage(int messageId) { + EmailMessage message = emailMessageRepository.findById(messageId); + if(message.getAttempts() >= 10) { //FIXME move to conf + tx.execute(status -> emailMessageRepository.updateStatusAndAttempts(messageId, ERROR.name(), message.getAttempts(), Arrays.asList(IN_PROCESS.name(), WAITING.name(), RETRY.name()))); + log.warn("Message with id " + messageId + " will be discarded"); + return; + } + - private void processMessage(EmailMessage message) { try { int result = tx.execute(status -> emailMessageRepository.updateStatus(message.getEventId(), message.getChecksum(), IN_PROCESS.name(), Arrays.asList(WAITING.name(), RETRY.name()))); if(result > 0) { @@ -168,6 +163,7 @@ private void processMessage(EmailMessage message) { log.debug("no messages have been updated on DB for the following criteria: eventId: {}, checksum: {}", message.getEventId(), message.getChecksum()); } } catch(Exception e) { + tx.execute(status -> emailMessageRepository.updateStatusAndAttempts(message.getId(), RETRY.name(), message.getAttempts() + 1, Arrays.asList(IN_PROCESS.name(), WAITING.name(), RETRY.name()))); log.warn("could not send message: ",e); } } diff --git a/src/main/java/alfio/model/EmailMessage.java b/src/main/java/alfio/model/EmailMessage.java index 953c8623b4..71bdcec39b 100644 --- a/src/main/java/alfio/model/EmailMessage.java +++ b/src/main/java/alfio/model/EmailMessage.java @@ -41,6 +41,7 @@ public enum Status { private final String checksum; private final ZonedDateTime requestTimestamp; private final ZonedDateTime sentTimestamp; + private final int attempts; public EmailMessage(@Column("id") int id, @Column("event_id") int eventId, @@ -51,7 +52,8 @@ public EmailMessage(@Column("id") int id, @Column("attachments") String attachments, @Column("checksum") String checksum, @Column("request_ts") ZonedDateTime requestTimestamp, - @Column("sent_ts") ZonedDateTime sentTimestamp) { + @Column("sent_ts") ZonedDateTime sentTimestamp, + @Column("attempts") int attempts) { this.id = id; this.eventId = eventId; this.requestTimestamp = requestTimestamp; @@ -62,6 +64,7 @@ public EmailMessage(@Column("id") int id, this.message = message; this.attachments = attachments; this.checksum = checksum; + this.attempts = attempts; } @Override diff --git a/src/main/java/alfio/repository/EmailMessageRepository.java b/src/main/java/alfio/repository/EmailMessageRepository.java index b374b9178c..2d89263fec 100644 --- a/src/main/java/alfio/repository/EmailMessageRepository.java +++ b/src/main/java/alfio/repository/EmailMessageRepository.java @@ -34,7 +34,7 @@ public interface EmailMessageRepository { * @param checksum * @return */ - @Query("select id, event_id, status, recipient, subject, message, null as attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and checksum = :checksum limit 1") + @Query("select id, event_id, status, recipient, subject, message, null as attachments, checksum, request_ts, sent_ts, attempts from email_message where event_id = :eventId and checksum = :checksum limit 1") Optional findByEventIdAndChecksum(@Bind("eventId") int eventId, @Bind("checksum") String checksum); @Query("insert into email_message (event_id, status, recipient, subject, message, attachments, checksum, request_ts) values(:eventId, 'WAITING', :recipient, :subject, :message, :attachments, :checksum, :timestamp)") @@ -53,14 +53,13 @@ int insert(@Bind("eventId") int eventId, @Query("update email_message set status = :status where id = :messageId and event_id = :eventId") int updateStatus(@Bind("eventId") int eventId, @Bind("status") String status, @Bind("messageId") int messageId); - @Query("update email_message set status = 'RETRY', request_ts = :requestTs where status in ('WAITING', 'ERROR', 'RETRY') and request_ts > :expiration") - int updateStatusForRetry(@Bind("requestTs") ZonedDateTime now, @Bind("expiration") ZonedDateTime expiration); + @Query("update email_message set status = :status, attempts = :attempts where id = :messageId and status in (:expectedStatuses) ") + int updateStatusAndAttempts(@Bind("messageId") int messageId, @Bind("status") String status, @Bind("attempts") int attempts, @Bind("expectedStatuses") List expectedStatuses); - @Query("select id, event_id, status, recipient, subject, message, attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and status = 'WAITING' for update") - List loadWaitingForProcessing(@Bind("eventId") int eventId); - @Query("select id, event_id, status, recipient, subject, message, attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and status = 'RETRY' for update") - List loadRetryForProcessing(@Bind("eventId") int eventId); + //FIXME add date as a filtering condition + @Query("select id from email_message where event_id = :eventId and (status = 'WAITING' or status = 'RETRY') for update") + List loadIdsWaitingForProcessing(@Bind("eventId") int eventId); @Query("update email_message set status = 'SENT', sent_ts = :sentTimestamp where event_id = :eventId and checksum = :checksum and status in (:expectedStatuses)") int updateStatusToSent(@Bind("eventId") int eventId, @Bind("checksum") String checksum, @Bind("sentTimestamp") ZonedDateTime sentTimestamp, @Bind("expectedStatuses") List expectedStatuses); @@ -68,6 +67,9 @@ int insert(@Bind("eventId") int eventId, @Query("select * from email_message where event_id = :eventId") List findByEventId(@Bind("eventId") int eventId); + @Query("select * from email_message where id = :id") + EmailMessage findById(@Bind("id") int id); + @Query("select * from email_message where id = :messageId and event_id = :eventId") Optional findByEventIdAndMessageId(@Bind("eventId") int eventId, @Bind("messageId") int messageId); diff --git a/src/main/resources/alfio/db/HSQLDB/V14_1.7.2__ALTER_EMAIL_HANDLING.sql b/src/main/resources/alfio/db/HSQLDB/V14_1.7.2__ALTER_EMAIL_HANDLING.sql new file mode 100644 index 0000000000..31dfe0b1a8 --- /dev/null +++ b/src/main/resources/alfio/db/HSQLDB/V14_1.7.2__ALTER_EMAIL_HANDLING.sql @@ -0,0 +1,18 @@ +-- +-- This file is part of alf.io. +-- +-- alf.io is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- alf.io is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with alf.io. If not, see . +-- + +alter table email_message add column attempts integer DEFAULT 0 NOT null; \ No newline at end of file diff --git a/src/main/resources/alfio/db/MYSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql b/src/main/resources/alfio/db/MYSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql new file mode 100644 index 0000000000..31dfe0b1a8 --- /dev/null +++ b/src/main/resources/alfio/db/MYSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql @@ -0,0 +1,18 @@ +-- +-- This file is part of alf.io. +-- +-- alf.io is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- alf.io is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with alf.io. If not, see . +-- + +alter table email_message add column attempts integer DEFAULT 0 NOT null; \ No newline at end of file diff --git a/src/main/resources/alfio/db/PGSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql b/src/main/resources/alfio/db/PGSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql new file mode 100644 index 0000000000..31dfe0b1a8 --- /dev/null +++ b/src/main/resources/alfio/db/PGSQL/V14_1.7.2__ALTER_EMAIL_HANDLING.sql @@ -0,0 +1,18 @@ +-- +-- This file is part of alf.io. +-- +-- alf.io is free software: you can redistribute it and/or modify +-- it under the terms of the GNU General Public License as published by +-- the Free Software Foundation, either version 3 of the License, or +-- (at your option) any later version. +-- +-- alf.io is distributed in the hope that it will be useful, +-- but WITHOUT ANY WARRANTY; without even the implied warranty of +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +-- GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License +-- along with alf.io. If not, see . +-- + +alter table email_message add column attempts integer DEFAULT 0 NOT null; \ No newline at end of file