Skip to content

Commit

Permalink
Added entity to store message idempotent store, basically to check if…
Browse files Browse the repository at this point in the history
… given messageId is already processed or not, if yes then are there any errors. Also fixed minor issues in TransactionReaderCallback w.r.to rawdata sequence key
  • Loading branch information
Jayeshecs committed Apr 23, 2020
1 parent 0ab7c8f commit 9b799ba
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

CREATE TABLE "statements"."MessageIdempotentStore"("id" BIGINT GENERATED BY DEFAULT AS IDENTITY(START WITH 0) NOT NULL,"error" VARCHAR(2000),"messageId" VARCHAR(1000) NOT NULL,CONSTRAINT "MessageIdempotentStore_PK" PRIMARY KEY("id"),CONSTRAINT "MessageIdempotentStore_messageId_UNQ" UNIQUE("messageId"));
ALTER TABLE "statements"."MessageIdempotentStore" ALTER COLUMN "id" RESTART WITH 0;

CREATE INDEX "MessageIdempotentStore_N52" ON "statements"."MessageIdempotentStore"("messageId");
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package domainapp.modules.rdr.dom;

import javax.jdo.annotations.IdentityType;
import javax.jdo.annotations.VersionStrategy;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;

import org.apache.isis.applib.annotation.Auditing;
import org.apache.isis.applib.annotation.CommandReification;
import org.apache.isis.applib.annotation.DomainObject;
import org.apache.isis.applib.annotation.Editing;
import org.apache.isis.applib.annotation.MemberOrder;
import org.apache.isis.applib.annotation.Property;
import org.apache.isis.applib.annotation.PropertyLayout;
import org.apache.isis.applib.annotation.Publishing;
import org.apache.isis.applib.annotation.Title;
import org.apache.isis.applib.annotation.Where;
import org.apache.isis.schema.utils.jaxbadapters.PersistentEntityAdapter;

import domainapp.modules.base.entity.NamedQueryConstants;
import domainapp.modules.base.entity.WithDescription;
import domainapp.modules.base.entity.WithName;
import domainapp.modules.ref.StaticModule.ActionDomainEvent;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@javax.jdo.annotations.PersistenceCapable(
identityType=IdentityType.DATASTORE,
schema = "statements"
)
@javax.jdo.annotations.DatastoreIdentity(
strategy=javax.jdo.annotations.IdGeneratorStrategy.IDENTITY,
column="id")
@javax.jdo.annotations.Queries({
@javax.jdo.annotations.Query(
name = NamedQueryConstants.QUERY_ALL,
value = "SELECT "
+ "FROM domainapp.modules.rdr.dom.MessageIdempotentStore "),
@javax.jdo.annotations.Query(
name = MessageIdempotentStore.QUERY_FIND_BY_MESSAGE_ID,
value = "SELECT "
+ "FROM domainapp.modules.rdr.dom.MessageIdempotentStore "
+ "WHERE messageId.indexOf(:messageId) >= 0 ")
})
@javax.jdo.annotations.Unique(name="MessageIdempotentStore_messageId_UNQ", members = {"messageId"})
@DomainObject(
auditing = Auditing.ENABLED,
publishing = Publishing.ENABLED,
objectType = "reader.MessageIdempotentStore",
bounded = true
) // objectType inferred from @PersistenceCapable#schema
@XmlJavaTypeAdapter(PersistentEntityAdapter.class)
@EqualsAndHashCode(of = {"messageId"})
@ToString(of = {"messageId"})
public class MessageIdempotentStore implements Comparable<MessageIdempotentStore> {

public static final String QUERY_FIND_BY_MESSAGE_ID = "findByMessageId"; //$NON-NLS-1$

@javax.jdo.annotations.Column(allowsNull = "false", length = 1000)
@Property(editing = Editing.DISABLED)
@Getter @Setter
private String messageId;

@javax.jdo.annotations.Column(length = 2000)
@Property(editing = Editing.DISABLED)
@Getter @Setter
private String error;

@Builder
public MessageIdempotentStore(final String messageId, final String error) {
setMessageId(messageId);
setError(error);
}

public MessageIdempotentStore(MessageIdempotentStore messageIdempotentStore) {
this(
messageIdempotentStore.getMessageId(),
messageIdempotentStore.getError()
);
}

public static class CreateEvent extends ActionDomainEvent<MessageIdempotentStore> {
private static final long serialVersionUID = 1L;
}

public static class UpdateEvent extends ActionDomainEvent<MessageIdempotentStore> {
private static final long serialVersionUID = 1L;
}

@Override
public int compareTo(final MessageIdempotentStore other) {
if (other == null) {
return -1;
}
return getMessageId().compareTo(other.getMessageId());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
*
*/
package domainapp.modules.rdr.service;

import java.util.List;

import org.apache.isis.applib.annotation.DomainService;
import org.apache.isis.applib.annotation.NatureOfService;
import org.apache.isis.applib.annotation.Programmatic;

import domainapp.modules.base.entity.NamedQueryConstants;
import domainapp.modules.base.service.AbstractService;
import domainapp.modules.rdr.dom.MessageIdempotentStore;

/**
* Idempotent store for message ids<br>
* Optionally error can also be stored indicating any processing error for given message id.
*
* @author jayeshecs
*/
@DomainService(
nature = NatureOfService.DOMAIN,
repositoryFor = MessageIdempotentStore.class
)
public class MessageIdempotentStoreService extends AbstractService<MessageIdempotentStore> {

public MessageIdempotentStoreService() {
super(MessageIdempotentStore.class);
}

@Programmatic
public List<MessageIdempotentStore> all() {
return search(NamedQueryConstants.QUERY_ALL);
}

@Programmatic
public MessageIdempotentStore create(String messageId, String error) {
MessageIdempotentStore newMessageIdempotentStore = MessageIdempotentStore.builder().messageId(messageId).error(error).build();
MessageIdempotentStore MessageIdempotentStore = repositoryService.persistAndFlush(newMessageIdempotentStore);
return MessageIdempotentStore;
}

@Programmatic
public MessageIdempotentStore find(String messageId) {
List<MessageIdempotentStore> list = search(MessageIdempotentStore.QUERY_FIND_BY_MESSAGE_ID, "messageId", messageId);
if (list == null || list.isEmpty()) {
return null; // nothing found
}
for (MessageIdempotentStore store : list) {
if (store.getMessageId().equals(messageId)) {
return store; // found
}
}
return null; // exact match not found
}

@Programmatic
public void clear(String messageId) {
List<MessageIdempotentStore> list = search(MessageIdempotentStore.QUERY_FIND_BY_MESSAGE_ID, "messageId", messageId);
if (list == null || list.isEmpty()) {
return ; // nothing found
}
delete(list);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ public void process(IStatementReaderContext context, Collection<IStatementRecord
context.addFilteredCount(1);
continue;
}
Integer rawdataSequenceCount = 1;
int rawdataSequenceCount = 1;
String rawdata = record.get(Field.RAWDATA);
String rawdataKey = statementSource.getName() + rawdata + rawdataSequenceCount;
while (transactionByRawdataKey.get(rawdataKey) != null) {
rawdataSequenceCount++;
rawdataKey = statementSource.getName() + rawdata + rawdataSequenceCount;
}
if (rawdata != null && !rawdata.trim().isEmpty()) {
List<Transaction> result = transactionService.search(Transaction.QUERY_FIND_BY_RAWDATA, Transaction.FieldConstants.SOURCE, statementSource,Transaction.FieldConstants.RAWDATA, rawdata, Transaction.FieldConstants.RAWDATA_SEQUENCE, rawdataSequenceCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@
import domainapp.modules.rdr.addon.StatementReaderContext;
import domainapp.modules.rdr.api.IStatementReader;
import domainapp.modules.rdr.dom.MailConnectionProfile;
import domainapp.modules.rdr.dom.MessageIdempotentStore;
import domainapp.modules.rdr.dom.StatementReader;
import domainapp.modules.rdr.service.MailConnection;
import domainapp.modules.rdr.service.MailConnectionProfileService;
import domainapp.modules.rdr.service.MessageIdempotentStoreService;
import domainapp.modules.ref.dom.Category;
import domainapp.modules.ref.dom.StatementSourceType;
import domainapp.modules.ref.dom.SubCategory;
Expand Down Expand Up @@ -645,46 +647,78 @@ public ManageTransactionDashboard loadStatementFromMail(
) {
final StringBuilder tracker = new StringBuilder();
MailConnection connection = null;
MessageIdempotentStore storeEntry = null;
try {
MailConnectionProfile mailConnectionProfile = mailStatementProfile.getMailConnectionProfile();

tracker.append("Connecting mailbox...");
tracker.append("\nConnecting mailbox...");
connection = mailConnectionProfileService.getMailConnection(mailConnectionProfile);

tracker.append("Searching mail...");
tracker.append("\nSearching mail...");
Message[] messages = connection.search(mailStatementProfile.getFolderName(), mailStatementProfile.getSubjectWords(), mailStatementProfile.getFromAddress(), true);

tracker.append("Messages found - " + messages.length);
tracker.append("Iterating messages...");
tracker.append("\nMessages found - " + messages.length);
tracker.append("\nIterating messages...");
for (Message message : messages) {

tracker.append("Checking message - " + message.getMessageNumber() + " (" + message.getHeader("Message-ID") + ")");
storeEntry = null;
String messageId = Arrays.asList(message.getHeader("Message-ID")).toString();
tracker.append("\nChecking message - " + message.getMessageNumber() + " (" + messageId + ")");
log.info("Checking message - " + message.getMessageNumber() + " (" + messageId + ")");
storeEntry = messageIdempotentStoreService.find(messageId);
if (storeEntry != null && storeEntry.getError() == null) {
tracker.append("\nAlready processed previously without any error and hence skipping");
continue ;
}
if (storeEntry != null) {
// clear error and process again
tracker.append("\nAlready processed previously with any error and hence trying to process again");
messageIdempotentStoreService.clear(messageId);
}
Object content = message.getContent();
if (!(content instanceof Multipart)) {

tracker.append("Skipping message " + message.getSubject() + " because it does not have attachment");
// Create new entry for message without attachment
log.info("Created new store entry for message without attachment. Message ID: " + storeEntry.getMessageId());
storeEntry = messageIdempotentStoreService.create(messageId, null);
tracker.append("\nSkipping message " + message.getSubject() + " because it does not have attachment");
continue;
}
boolean attachmentFound = false;
Multipart partMessage = (Multipart)content;
for (int i = 0; i < partMessage.getCount(); ++i) {
BodyPart bodyPart = partMessage.getBodyPart(i);
if (!Part.ATTACHMENT.equalsIgnoreCase(bodyPart.getDisposition()) || !bodyPart.getFileName().matches(mailStatementProfile.getFileNamePattern())) {

tracker.append("Skipping attachment " + bodyPart.getFileName() + " because it's not matching with filename pattern - " + mailStatementProfile.getFileNamePattern());
if (!Part.ATTACHMENT.equalsIgnoreCase(bodyPart.getDisposition())) {
log.info("Skipping attachment " + bodyPart.getFileName() + " because it's not matching with filename pattern - " + mailStatementProfile.getFileNamePattern());
continue ;
}
if (!bodyPart.getFileName().matches(mailStatementProfile.getFileNamePattern())) {
log.info("Attachment filename '" + bodyPart.getFileName() + "' does not match with filename pattern '" + mailStatementProfile.getFileNamePattern() + "'");
continue ;
}
attachmentFound = true;
Path path = Files.createTempFile(null, bodyPart.getFileName());

tracker.append("Saving attachment at " + path.toFile().getPath());
tracker.append("\nSaving attachment at " + path.toFile().getPath());
log.info("Saving attachment at " + path.toFile().getPath());
Files.copy(bodyPart.getInputStream(), path, StandardCopyOption.REPLACE_EXISTING);

tracker.append("Loading saved attachment as statement... ");
tracker.append("\nLoading saved attachment as statement... ");
log.info("Loading saved attachment as statement... ");
loadStatement(mailStatementProfile.getSource(), mailStatementProfile.getReader(), path.toFile().getName(), new FileInputStream(path.toFile()));
}
if (attachmentFound) {
// Create new entry
log.info("Created new store entry for message processed correctly. Message ID: " + messageId);
storeEntry = messageIdempotentStoreService.create(messageId, null);
}
}
} catch (Exception e) {
log.error("Exception occurred - " + e.getMessage() + ". Below is tracker:\n" + tracker.toString(), e);
messageService.raiseError("Exception occurred - " + e.getMessage() + ". Below is tracker:\n" + tracker.toString());
if (storeEntry != null) {
// update entry with error
storeEntry.setError("Exception occurred - " + e.getMessage());
messageIdempotentStoreService.save(Arrays.asList(storeEntry));
}
} finally {
if (connection != null) {
connection.closeAll();
Expand Down Expand Up @@ -765,6 +799,9 @@ public ManageTransactionDashboard createMailStatementProfile(
return this;
}

@Inject
MessageIdempotentStoreService messageIdempotentStoreService;

@Inject
MailConnectionProfileService mailConnectionProfileService;

Expand Down
25 changes: 16 additions & 9 deletions start.sh
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
#!/bin/sh
echo copying standalong statements.war file...

echo "Copying standalong statements.war file..."

cp webapp/target/statements-webapp-*-jetty*.war ./statements.war
echo Start statements application ...
if [ "$1" == "headless" ]
then
nohup java -jar statements.war --headless 2>&1 > logs/console.log &
else
nohup java -jar statements.war 2>&1 > logs/console.log &

echo "Start statements application ..."

JETTY_CONSOLE_OPTIONS="--port 8080"

if [ "$1" = "headless" ]; then
JETTY_CONSOLE_OPTIONS="$JETTY_CONSOLE_OPTIONS --headless"
fi
echo To see progress make use of below command
echo tail -f logs/console.log
echo "Running command 'nohup java -jar statements.war $JETTY_CONSOLE_OPTIONS 2>&1 > logs/console.log &' ..."
nohup java -jar statements.war $JETTY_CONSOLE_OPTIONS 2>&1 > logs/console.log &

echo "To see progress make use of below command"
echo "tail -f logs/console.log"



0 comments on commit 9b799ba

Please sign in to comment.