Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #148 from atlasapi/messaging-patches

Recent changes logger refactoring.
  • Loading branch information...
commit cb9d09c1778624ed8f7bc233090df4f348d73c35 2 parents 2cf62f5 + c858f05
@sbtourist sbtourist authored
View
34 src/main/java/org/atlasapi/messaging/WorkersModule.java
@@ -4,8 +4,7 @@
import javax.jms.ConnectionFactory;
import org.atlasapi.messaging.workers.CassandraReplicator;
-import org.atlasapi.messaging.workers.MongoRecentChangesStore;
-import org.atlasapi.messaging.workers.RecentChangesLog;
+import org.atlasapi.messaging.workers.RecentChangesLogger;
import org.atlasapi.persistence.content.ContentResolver;
import org.atlasapi.persistence.content.ContentWriter;
import org.springframework.beans.factory.annotation.Autowired;
@@ -18,7 +17,7 @@
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
-import com.metabroadcast.common.persistence.mongo.DatabasedMongo;
+import org.atlasapi.messaging.workers.RecentChangeStore;
/**
*
@@ -32,10 +31,10 @@
@Value("${messaging.consumers.replicator}")
private int replicatorConsumers;
- @Value("${messaging.destination.recent}")
- private String recentDestination;
- @Value("${messaging.consumers.recent}")
- private int recentConsumers;
+ @Value("${messaging.destination.logger}")
+ private String loggerDestination;
+ @Value("${messaging.consumers.logger}")
+ private int loggerConsumers;
@Value("${messaging.enabled}")
private boolean enabled;
@@ -48,9 +47,8 @@
private ContentWriter cassandraContentWriter;
@Autowired
private ContentResolver mongoContentResolver;
-
@Autowired
- private DatabasedMongo mongo;
+ private RecentChangeStore recentChangesStore;
@Bean
@Lazy(true)
@@ -70,21 +68,17 @@ public DefaultMessageListenerContainer cassandraReplicator() {
}
@Bean
- public RecentChangesLog recentChangesLog() {
- return new RecentChangesLog(new MongoRecentChangesStore(mongo));
- }
-
- @Bean
@Lazy(true)
- public DefaultMessageListenerContainer recentChangesLogListener() {
- MessageListenerAdapter adapter = new MessageListenerAdapter(recentChangesLog());
+ public DefaultMessageListenerContainer recentChangesLogger() {
+ RecentChangesLogger recentChangesLogger = new RecentChangesLogger(recentChangesStore);
+ MessageListenerAdapter adapter = new MessageListenerAdapter(recentChangesLogger);
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
adapter.setDefaultListenerMethod("onMessage");
container.setConnectionFactory(connectionFactory);
- container.setDestinationName(recentDestination);
- container.setConcurrentConsumers(recentConsumers);
- container.setMaxConcurrentConsumers(recentConsumers);
+ container.setDestinationName(loggerDestination);
+ container.setConcurrentConsumers(loggerConsumers);
+ container.setMaxConcurrentConsumers(loggerConsumers);
container.setMessageListener(adapter);
return container;
@@ -94,7 +88,7 @@ public DefaultMessageListenerContainer recentChangesLogListener() {
public void start() {
if (enabled) {
cassandraReplicator().start();
- recentChangesLogListener().start();
+ recentChangesLogger().start();
}
}
}
View
17 src/main/java/org/atlasapi/messaging/workers/RecentChangesLogger.java
@@ -0,0 +1,17 @@
+package org.atlasapi.messaging.workers;
+
+import org.atlasapi.persistence.messaging.event.EntityUpdatedEvent;
+
+public class RecentChangesLogger extends AbstractWorker {
+
+ private final RecentChangeStore store;
+
+ public RecentChangesLogger(RecentChangeStore store) {
+ this.store = store;
+ }
+
+ @Override
+ public void process(EntityUpdatedEvent command) {
+ store.logChange(command);
+ }
+}
View
58 src/main/java/org/atlasapi/persistence/AtlasPersistenceModule.java
@@ -33,11 +33,21 @@
import org.springframework.jms.core.JmsTemplate;
import com.metabroadcast.common.ids.IdGeneratorBuilder;
+import com.metabroadcast.common.persistence.mongo.DatabasedMongo;
import javax.annotation.Resource;
+import org.atlasapi.messaging.workers.MongoRecentChangesStore;
+import org.atlasapi.messaging.workers.RecentChangeStore;
@Configuration @Primary
-public class AtlasPersistenceModule implements ContentPersistenceModule {
-
+public class AtlasPersistenceModule {
+
+ // TODO: I don't really like this and the recentChangesStore() method below,
+ // as this configuration should only wire other configurations to be used into Atlas,
+ // so let's move these outside later.
+ @Autowired
+ private DatabasedMongo mongo;
+ //
+
@Autowired @Qualifier("base")
private ContentPersistenceModule delegate;
@Autowired
@@ -56,17 +66,17 @@ public AtlasPersistenceModule(ContentPersistenceModule delegate, JmsTemplate cha
this.idGeneratorBuilder = idGeneratorBuilder;
}
- @Override @Bean
+ @Bean
public ContentGroupWriter contentGroupWriter() {
return delegate.contentGroupWriter();
}
- @Override @Bean
+ @Bean
public ContentGroupResolver contentGroupResolver() {
return delegate.contentGroupResolver();
}
- @Override @Bean
+ @Bean
public ContentWriter contentWriter() {
ContentWriter contentWriter = delegate.contentWriter();
contentWriter = new EquivalenceWritingContentWriter(contentWriter, lookupStore());
@@ -77,84 +87,88 @@ public ContentWriter contentWriter() {
return contentWriter;
}
- @Override @Bean
+ @Bean
public ItemsPeopleWriter itemsPeopleWriter() {
return delegate.itemsPeopleWriter();
}
- @Override @Bean @Primary
+ @Bean @Primary
public ContentResolver contentResolver() {
return delegate.contentResolver();
}
- @Override @Bean
+ @Bean
public TopicStore topicStore() {
return delegate.topicStore();
}
- @Override @Bean
+ @Bean
public TopicQueryResolver topicQueryResolver() {
return delegate.topicQueryResolver();
}
- @Override @Bean
+ @Bean
public ShortUrlSaver shortUrlSaver() {
return delegate.shortUrlSaver();
}
- @Override @Bean
+ @Bean
public SegmentWriter segmentWriter() {
return new IdSettingSegmentWriter(delegate.segmentWriter(), segmentResolver(), idGeneratorBuilder.generator("segment"));
}
- @Override @Bean
+ @Bean
public SegmentResolver segmentResolver() {
return delegate.segmentResolver();
}
- @Override @Bean
+ @Bean
public ProductStore productStore() {
return new IdSettingProductStore(delegate.productStore(), idGeneratorBuilder.generator("product"));
}
- @Override @Bean
+ @Bean
public ProductResolver productResolver() {
return delegate.productResolver();
}
- @Override @Bean
+ @Bean
public LookupEntryStore lookupStore() {
return delegate.lookupStore();
}
- @Override @Bean
+ @Bean
public ChannelResolver channelResolver() {
return delegate.channelResolver();
}
- @Override @Bean
+ @Bean
public ScheduleResolver scheduleResolver() {
return delegate.scheduleResolver();
}
- @Override @Bean
+ @Bean
public ScheduleWriter scheduleWriter() {
return delegate.scheduleWriter();
}
- @Override @Bean
+ @Bean
public KnownTypeContentResolver knownTypeContentResolver() {
return delegate.knownTypeContentResolver();
}
- @Override @Bean @Primary
+ @Bean @Primary
public LastUpdatedContentFinder lastUpdatedContentFinder() {
return delegate.lastUpdatedContentFinder();
}
- @Override @Bean
+ @Bean
public TopicContentLister topicContentLister() {
return delegate.topicContentLister();
}
-
+
+ @Bean
+ public RecentChangeStore recentChangesStore() {
+ return new MongoRecentChangesStore(mongo);
+ }
}
View
19 .../atlasapi/messaging/workers/RecentChangesLog.java → .../org/atlasapi/system/RecentChangesController.java
@@ -1,4 +1,4 @@
-package org.atlasapi.messaging.workers;
+package org.atlasapi.system;
import java.io.IOException;
@@ -15,36 +15,27 @@
import com.google.common.collect.ImmutableSet;
import com.metabroadcast.common.query.Selection;
import com.metabroadcast.common.query.Selection.SelectionBuilder;
+import org.atlasapi.messaging.workers.RecentChangeStore;
@Controller
-public class RecentChangesLog extends AbstractWorker {
+public class RecentChangesController {
-
private final RecentChangeStore store;
private final JsonTranslator<Iterable<EntityUpdatedEvent>> translator;
private final ApplicationConfiguration configuration;
private final SelectionBuilder selectionBuilder;
- public RecentChangesLog(RecentChangeStore store) {
+ public RecentChangesController(RecentChangeStore store) {
this.store = store;
this.translator = new JsonTranslator<Iterable<EntityUpdatedEvent>>();
this.configuration = ApplicationConfiguration.DEFAULT_CONFIGURATION;
- this.selectionBuilder = Selection.builder().withDefaultLimit(30)
- .withMaxLimit(100);
- }
-
- @Override
- public void process(EntityUpdatedEvent command) {
- store.logChange(command);
+ this.selectionBuilder = Selection.builder().withDefaultLimit(30).withMaxLimit(100);
}
@RequestMapping("system/update/changes")
public void listChanges(HttpServletRequest req, HttpServletResponse resp) throws IOException {
-
Iterable<EntityUpdatedEvent> model = store.changes();
model = selectionBuilder.build(req).apply(model);
translator.writeTo(req, resp, model, ImmutableSet.<Annotation>of(), configuration);
-
}
-
}
View
4 src/main/resources/config/environment.properties
@@ -167,6 +167,6 @@ messaging.broker.url=
messaging.destination.changes=
messaging.destination.replicator=
messaging.consumers.replicator=
-messaging.destination.recent=
-messaging.consumers.recent=
+messaging.destination.logger=
+messaging.consumers.logger=
messaging.enabled=false
View
4 src/main/resources/config/environments/dev.properties
@@ -10,8 +10,8 @@ messaging.broker.url=vm://localhost
messaging.destination.changes=VirtualTopic.Changes
messaging.destination.replicator=Consumer.Replicator.VirtualTopic.Changes
messaging.consumers.replicator=1
-messaging.destination.recent=Consumer.Recent.VirtualTopic.Changes
-messaging.consumers.recent=1
+messaging.destination.logger=Consumer.Logger.VirtualTopic.Changes
+messaging.consumers.logger=1
messaging.enabled=true
cassandra.seeds=cassandra1.owl.atlas.mbst.tv,cassandra2.owl.atlas.mbst.tv,cassandra3.owl.atlas.mbst.tv
View
2  src/test/java/org/atlasapi/persistence/AtlasContentPersistenceIntegrationTest.java
@@ -43,7 +43,7 @@ public IdGenerator generator(String sequenceIdentifier) {
public void test() {
MongoContentPersistenceModule delegate = new MongoContentPersistenceModule(db);
- ContentPersistenceModule module = new AtlasPersistenceModule(delegate, jmsTemplate, idGeneratorBuilder);
+ AtlasPersistenceModule module = new AtlasPersistenceModule(delegate, jmsTemplate, idGeneratorBuilder);
ContentWriter contentWriter = module.contentWriter();
ContentResolver contentResolver = module.contentResolver();
Please sign in to comment.
Something went wrong with that request. Please try again.