Permalink
Browse files

CQRS Implementation for MongoDB

  • Loading branch information...
1 parent 0024adf commit fd31406f06fafc2f0c715afa0ec2c17b4e468a8e @amir343 committed Sep 28, 2010
@@ -0,0 +1,15 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public interface MongoCommand {
+
+ void update(String page, String value);
+
+}
@@ -0,0 +1,61 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+import java.util.Calendar;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public class MongoCommandImpl implements MongoCommand {
+
+ private static MongoCommandImpl instance = new MongoCommandImpl();
+
+ public static MongoCommand getInstance() {
+ return instance;
+ }
+
+ private DBCollection pages;
+
+ private MongoCommandImpl() {
+ MongoConnection connection = MongoConnectionFactory.getMasterConnection();
+ pages = connection.getPages();
+ }
+
+ @Override
+ public void update(String page, String value) {
+ DBObject doc = get(page);
+ if (StringUtils.isNotBlank(value)) {
+ doc.put("value", value);
+ }
+ doc.put("last_changed", Calendar.getInstance().getTime());
+ pages.save(doc);
+ }
+
+ @SuppressWarnings("static-access")
+ private DBObject get(String page) {
+ DBObject q = new BasicDBObjectBuilder().start().add("url", page).get();
+ DBCursor res = pages.find(q);
+ DBObject doc;
+ if (res.hasNext()) {
+ doc = res.next();
+ } else {
+ doc = new BasicDBObject();
+ doc.put("url", page);
+ }
+ return doc;
+ }
+
+
+}
@@ -0,0 +1,32 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+import com.mongodb.DBCollection;
+import com.mongodb.Mongo;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public class MongoConnection {
+
+ private Mongo mongo;
+ private DBCollection pages;
+
+ public Mongo getMongo() {
+ return mongo;
+ }
+ public void setMongo(Mongo mongo) {
+ this.mongo = mongo;
+ }
+ public DBCollection getPages() {
+ return pages;
+ }
+ public void setPages(DBCollection pages) {
+ this.pages = pages;
+ }
+
+}
@@ -0,0 +1,75 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+import com.amirmoulavi.watchthatpage.resource.ClassResourceLocator;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.Mongo;
+import com.mongodb.MongoException;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+/**
+ * TODO I hate using static like this!
+ * This needs to be changed sooner or later
+ */
+
+public class MongoConnectionFactory {
+
+ private static Logger log = Logger.getLogger(MongoConnectionFactory.class);
+ private static String coll;
+ private static String db;
+ private static String port;
+ private static String server;
+
+ static {
+ Properties property = new Properties();
+ InputStream is = ClassResourceLocator.getResourceAsStream("mongo.properties");
+ try {
+ property.load(is);
+ } catch (IOException e) {
+ log.error(e.getMessage());
+ }
+ server = property.getProperty("server");
+ port = property.getProperty("port");
+ db = property.getProperty("db");
+ coll = property.getProperty("wtp_collection");
+
+ }
+
+ public static MongoConnection getMasterConnection() {
+ MongoConnection conn = new MongoConnection();
+ try {
+ Mongo mongo = new Mongo(server, Integer.parseInt(port));
+ DB db_wtp = mongo.getDB(db);
+ DBCollection pages = db_wtp.getCollection(coll);
+ conn.setMongo(mongo);
+ conn.setPages(pages);
+
+ } catch (UnknownHostException e) {
+ log.error(e.getMessage());
+ } catch (MongoException e) {
+ log.error(e.getMessage());
+ }
+ return conn;
+ }
+
+ public static MongoConnection getSlaveConnection() {
+ // TODO This method deserves its own implementation. Simply a new field in property file
+ return getMasterConnection();
+ }
+
+
+}
@@ -0,0 +1,17 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public interface MongoDAO {
+
+ void update(String page, String value);
+
+ boolean changed(String page, String newValue);
+
+}
@@ -0,0 +1,34 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-08-19
+ * @since 0.0.1
+ *
+ */
+
+public class MongoDAOImpl implements MongoDAO {
+
+ private static MongoDAOImpl instance = new MongoDAOImpl();
+ private MongoCommand command = MongoCommandImpl.getInstance();
+ private MongoQuery query = MongoQueryImpl.getInstance();
+
+ public static MongoDAOImpl getInstance() {
+ return instance;
+ }
+
+ @Override
+ public void update(String page, String value) {
+ command.update(page, value);
+ }
+
+
+ @Override
+ public boolean changed(String page, String newValue) {
+ return query.changed(page, newValue);
+ }
+
+
+}
@@ -0,0 +1,15 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public interface MongoQuery {
+
+ boolean changed(String page, String newValue);
+
+}
@@ -0,0 +1,59 @@
+package com.amirmoulavi.watchthatpage.mongo;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+
+/**
+ *
+ * @author Amir Moulavi
+ * @date 2010-09-28
+ * @since 0.0.1
+ *
+ */
+
+public class MongoQueryImpl implements MongoQuery {
+
+ private static MongoQueryImpl instance = new MongoQueryImpl();
+
+ public static MongoQueryImpl getInstance() {
+ return instance;
+ }
+
+ private DBCollection pages;
+
+ private MongoQueryImpl() {
+ MongoConnection mongo = MongoConnectionFactory.getSlaveConnection();
+ pages = mongo.getPages();
+ }
+
+ @Override
+ public boolean changed(String page, String newValue) {
+ DBObject doc = get(page);
+ String value = (String) doc.get("value");
+ if (!StringUtils.equals(value, newValue)) {
+ return true;
+ }
+ return false;
+
+ }
+
+ @SuppressWarnings("static-access")
+ private DBObject get(String page) {
+ DBObject q = new BasicDBObjectBuilder().start().add("url", page).get();
+ DBCursor res = pages.find(q);
+ DBObject doc;
+ if (res.hasNext()) {
+ doc = res.next();
+ } else {
+ doc = new BasicDBObject();
+ doc.put("url", page);
+ }
+ return doc;
+ }
+
+}
@@ -17,7 +17,7 @@
import org.quartz.JobExecutionException;
import com.amirmoulavi.watchthatpage.mail.MailService;
-import com.amirmoulavi.watchthatpage.mongo.MongoDBHanlder;
+import com.amirmoulavi.watchthatpage.mongo.MongoDAOImpl;
import com.amirmoulavi.watchthatpage.resource.ClassResourceLocator;
import com.amirmoulavi.watchthatpage.security.MessageDigesterBehavior;
import com.amirmoulavi.watchthatpage.security.MessageDigesterMixin;
@@ -35,7 +35,7 @@
PageTrackerState state;
private static Logger log = Logger.getLogger(PageTrackerMixin.class);
- private MongoDBHanlder mongo = MongoDBHanlder.getInstance();
+ private MongoDAOImpl mongo = MongoDAOImpl.getInstance();
private MailService mailService = MailService.getInstance();
private List<String> list = new ArrayList<String>();
@@ -91,11 +91,7 @@ private void retrievePage(List<String> changedPages, String page) {
try {
int result = client.executeMethod(get);
if (result == 200) {
- String returned_page = get.getResponseBodyAsString();
- if (mongo.changed(page, messageDigester.MD5(returned_page))) {
- changedPages.add(page);
- log.info("******** Page has changed ********");
- }
+ handlePageChange(changedPages, page, get);
}
} catch (HttpException e) {
log.error(e.getMessage());
@@ -105,9 +101,20 @@ private void retrievePage(List<String> changedPages, String page) {
get.releaseConnection();
}
}
+
+
+ private void handlePageChange(List<String> changedPages, String page, HttpMethod get) throws IOException {
+ String returned_page = get.getResponseBodyAsString();
+ String digested = messageDigester.MD5(returned_page);
+ if (mongo.changed(page, digested)) {
+ mongo.update(page, digested);
+ changedPages.add(page);
+ log.info("******** Page has changed ********");
+ }
+ }
/* INJECTION METHODS */
- public void setMongo(MongoDBHanlder mongo) {
+ public void setMongo(MongoDAOImpl mongo) {
this.mongo = mongo;
}

0 comments on commit fd31406

Please sign in to comment.