Skip to content

Commit

Permalink
#93 initial work in refactoring the email handling
Browse files Browse the repository at this point in the history
  • Loading branch information
syjer committed Jan 26, 2016
1 parent da07708 commit 64458f4
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 43 deletions.
1 change: 0 additions & 1 deletion src/main/java/alfio/config/DataSourceConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public Trigger[] getTriggers() throws ParseException {
buildTrigger(SendTicketAssignmentReminder.class, "SendTicketAssignmentReminder", SendTicketAssignmentReminder.INTERVAL),
buildTrigger(GenerateSpecialPriceCodes.class, "GenerateSpecialPriceCodes", GenerateSpecialPriceCodes.INTERVAL),
buildTrigger(SendEmails.class, "SendEmails", SendEmails.INTERVAL),
buildTrigger(EnqueueNotSentEmail.class, "EnqueueNotSentEmail", EnqueueNotSentEmail.INTERVAL),
buildTrigger(ProcessReleasedTickets.class, "ProcessReleasedTickets", ProcessReleasedTickets.INTERVAL),
buildTrigger(CleanupUnreferencedBlobFiles.class, "CleanupUnreferencedBlobFiles", CleanupUnreferencedBlobFiles.INTERVAL)
};
Expand Down
20 changes: 0 additions & 20 deletions src/main/java/alfio/manager/Jobs.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ public void sendEmails() {
notificationManager.sendWaitingMessages();
}

public void enqueueNotSentEmail() {
notificationManager.processNotSentEmail();
}

public void processReleasedTickets() {
waitingQueueSubscriptionProcessor.handleWaitingTickets();
}
Expand Down Expand Up @@ -175,22 +171,6 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
}
}

@DisallowConcurrentExecution
@Log4j2
public static class EnqueueNotSentEmail implements Job {

public static long INTERVAL = ONE_MINUTE;

@Autowired
private Jobs jobs;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
log.debug("running job " + getClass().getSimpleName());
jobs.enqueueNotSentEmail();
}
}

@DisallowConcurrentExecution
@Log4j2
public static class ProcessReleasedTickets implements Job {
Expand Down
24 changes: 10 additions & 14 deletions src/main/java/alfio/manager/NotificationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,25 +138,20 @@ public Optional<EmailMessage> loadSingleMessageForEvent(int eventId, int message
void sendWaitingMessages() {
eventRepository.findAllActiveIds(ZonedDateTime.now(UTC))
.stream()
.flatMap(id -> emailMessageRepository.loadWaitingForProcessing(id).stream())
.flatMap(id -> emailMessageRepository.loadIdsWaitingForProcessing(id).stream())
.distinct()
.forEach(this::processMessage);
}

void processNotSentEmail() {
ZonedDateTime now = ZonedDateTime.now(UTC);
ZonedDateTime expiration = now.minusMinutes(5);
eventRepository.findAllActiveIds(now).stream()
.map(id -> Pair.of(id, tx.execute(status -> emailMessageRepository.updateStatusForRetry(now, expiration))))
.filter(p -> p.getValue() > 0)
.forEach(p -> {
int eventId = p.getKey();
log.debug("found {} expired messages for event {}", p.getValue(), eventId);
emailMessageRepository.loadRetryForProcessing(eventId).forEach(this::processMessage);
});
}
private void processMessage(int messageId) {
EmailMessage message = emailMessageRepository.findById(messageId);
if(message.getAttempts() >= 10) { //FIXME move to conf
tx.execute(status -> emailMessageRepository.updateStatusAndAttempts(messageId, ERROR.name(), message.getAttempts(), Arrays.asList(IN_PROCESS.name(), WAITING.name(), RETRY.name())));
log.warn("Message with id " + messageId + " will be discarded");
return;
}


private void processMessage(EmailMessage message) {
try {
int result = tx.execute(status -> emailMessageRepository.updateStatus(message.getEventId(), message.getChecksum(), IN_PROCESS.name(), Arrays.asList(WAITING.name(), RETRY.name())));
if(result > 0) {
Expand All @@ -168,6 +163,7 @@ private void processMessage(EmailMessage message) {
log.debug("no messages have been updated on DB for the following criteria: eventId: {}, checksum: {}", message.getEventId(), message.getChecksum());
}
} catch(Exception e) {
tx.execute(status -> emailMessageRepository.updateStatusAndAttempts(message.getId(), RETRY.name(), message.getAttempts() + 1, Arrays.asList(IN_PROCESS.name(), WAITING.name(), RETRY.name())));
log.warn("could not send message: ",e);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/alfio/model/EmailMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum Status {
private final String checksum;
private final ZonedDateTime requestTimestamp;
private final ZonedDateTime sentTimestamp;
private final int attempts;

public EmailMessage(@Column("id") int id,
@Column("event_id") int eventId,
Expand All @@ -51,7 +52,8 @@ public EmailMessage(@Column("id") int id,
@Column("attachments") String attachments,
@Column("checksum") String checksum,
@Column("request_ts") ZonedDateTime requestTimestamp,
@Column("sent_ts") ZonedDateTime sentTimestamp) {
@Column("sent_ts") ZonedDateTime sentTimestamp,
@Column("attempts") int attempts) {
this.id = id;
this.eventId = eventId;
this.requestTimestamp = requestTimestamp;
Expand All @@ -62,6 +64,7 @@ public EmailMessage(@Column("id") int id,
this.message = message;
this.attachments = attachments;
this.checksum = checksum;
this.attempts = attempts;
}

@Override
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/alfio/repository/EmailMessageRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface EmailMessageRepository {
* @param checksum
* @return
*/
@Query("select id, event_id, status, recipient, subject, message, null as attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and checksum = :checksum limit 1")
@Query("select id, event_id, status, recipient, subject, message, null as attachments, checksum, request_ts, sent_ts, attempts from email_message where event_id = :eventId and checksum = :checksum limit 1")
Optional<EmailMessage> findByEventIdAndChecksum(@Bind("eventId") int eventId, @Bind("checksum") String checksum);

@Query("insert into email_message (event_id, status, recipient, subject, message, attachments, checksum, request_ts) values(:eventId, 'WAITING', :recipient, :subject, :message, :attachments, :checksum, :timestamp)")
Expand All @@ -53,21 +53,23 @@ int insert(@Bind("eventId") int eventId,
@Query("update email_message set status = :status where id = :messageId and event_id = :eventId")
int updateStatus(@Bind("eventId") int eventId, @Bind("status") String status, @Bind("messageId") int messageId);

@Query("update email_message set status = 'RETRY', request_ts = :requestTs where status in ('WAITING', 'ERROR', 'RETRY') and request_ts > :expiration")
int updateStatusForRetry(@Bind("requestTs") ZonedDateTime now, @Bind("expiration") ZonedDateTime expiration);
@Query("update email_message set status = :status, attempts = :attempts where id = :messageId and status in (:expectedStatuses) ")
int updateStatusAndAttempts(@Bind("messageId") int messageId, @Bind("status") String status, @Bind("attempts") int attempts, @Bind("expectedStatuses") List<String> expectedStatuses);

@Query("select id, event_id, status, recipient, subject, message, attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and status = 'WAITING' for update")
List<EmailMessage> loadWaitingForProcessing(@Bind("eventId") int eventId);

@Query("select id, event_id, status, recipient, subject, message, attachments, checksum, request_ts, sent_ts from email_message where event_id = :eventId and status = 'RETRY' for update")
List<EmailMessage> loadRetryForProcessing(@Bind("eventId") int eventId);
//FIXME add date as a filtering condition
@Query("select id from email_message where event_id = :eventId and (status = 'WAITING' or status = 'RETRY') for update")
List<Integer> loadIdsWaitingForProcessing(@Bind("eventId") int eventId);

@Query("update email_message set status = 'SENT', sent_ts = :sentTimestamp where event_id = :eventId and checksum = :checksum and status in (:expectedStatuses)")
int updateStatusToSent(@Bind("eventId") int eventId, @Bind("checksum") String checksum, @Bind("sentTimestamp") ZonedDateTime sentTimestamp, @Bind("expectedStatuses") List<String> expectedStatuses);

@Query("select * from email_message where event_id = :eventId")
List<EmailMessage> findByEventId(@Bind("eventId") int eventId);

@Query("select * from email_message where id = :id")
EmailMessage findById(@Bind("id") int id);

@Query("select * from email_message where id = :messageId and event_id = :eventId")
Optional<EmailMessage> findByEventIdAndMessageId(@Bind("eventId") int eventId, @Bind("messageId") int messageId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--
-- This file is part of alf.io.
--
-- alf.io is free software: you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or
-- (at your option) any later version.
--
-- alf.io is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License
-- along with alf.io. If not, see <http://www.gnu.org/licenses/>.
--

alter table email_message add column attempts integer DEFAULT 0 NOT null;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--
-- This file is part of alf.io.
--
-- alf.io is free software: you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or
-- (at your option) any later version.
--
-- alf.io is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License
-- along with alf.io. If not, see <http://www.gnu.org/licenses/>.
--

alter table email_message add column attempts integer DEFAULT 0 NOT null;
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--
-- This file is part of alf.io.
--
-- alf.io is free software: you can redistribute it and/or modify
-- it under the terms of the GNU General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or
-- (at your option) any later version.
--
-- alf.io is distributed in the hope that it will be useful,
-- but WITHOUT ANY WARRANTY; without even the implied warranty of
-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-- GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License
-- along with alf.io. If not, see <http://www.gnu.org/licenses/>.
--

alter table email_message add column attempts integer DEFAULT 0 NOT null;

0 comments on commit 64458f4

Please sign in to comment.