Skip to content

Commit

Permalink
HSEARCH-626 update Backend interface, move async wrapping to Lucene b…
Browse files Browse the repository at this point in the history
…ackend only
  • Loading branch information
Sanne committed Aug 12, 2011
1 parent 96c1911 commit 2280f77
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.hibernate.search.backend.impl.jgroups.SlaveJGroupsBackendQueueProcessorFactory;
import org.hibernate.search.backend.impl.jms.JMSBackendQueueProcessorFactory;
import org.hibernate.search.backend.impl.lucene.LuceneBackendQueueProcessorFactory;
import org.hibernate.search.backend.spi.BackendQueueProcessorFactory;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.batchindexing.impl.Executors;
import org.hibernate.search.indexes.serialization.codex.avro.impl.AvroSerializationProvider;
import org.hibernate.search.indexes.serialization.codex.impl.PluggableSerializationLuceneWorkSerializer;
Expand All @@ -45,11 +45,11 @@
*/
public class BackendFactory {

public static BackendQueueProcessorFactory createBackend(IndexManager indexManager, WorkerBuildContext context, Properties properties) {
public static BackendQueueProcessor createBackend(IndexManager indexManager, WorkerBuildContext context, Properties properties) {

String backend = properties.getProperty( Environment.WORKER_BACKEND );

final BackendQueueProcessorFactory backendQueueProcessorFactory;
final BackendQueueProcessor backendQueueProcessorFactory;

if ( StringHelper.isEmpty( backend ) || "lucene".equalsIgnoreCase( backend ) ) {
backendQueueProcessorFactory = new LuceneBackendQueueProcessorFactory();
Expand All @@ -68,7 +68,7 @@ else if ( "jgroupsSlave".equals( backend ) ) {
}
else {
backendQueueProcessorFactory = ClassLoaderHelper.instanceFromName(
BackendQueueProcessorFactory.class,
BackendQueueProcessor.class,
backend, BackendFactory.class, "processor"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import org.hibernate.search.spi.WorkerBuildContext;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.spi.BackendQueueProcessorFactory;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;
Expand All @@ -42,33 +42,32 @@
*
* @author Sanne Grinovero
*/
public class BlackHoleBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
public class BlackHoleBackendQueueProcessorFactory implements BackendQueueProcessor {

private static final Log log = LoggerFactory.make();

private final NoOp noOp = new NoOp();

public Runnable getProcessor(List<LuceneWork> queue) {
return noOp;
}

@Override
public void initialize(Properties props, WorkerBuildContext context, IndexManager indexManager) {
// no-op
log.initializedBlackholeBackend();
}

@Override
public void close() {
// no-op
log.closedBlackholeBackend();
}

private static class NoOp implements Runnable {
@Override
public void applyWork(List<LuceneWork> workList) {
// no-op
log.debug( "Discarding a list of LuceneWork" );
}

public void run() {
// no-op
log.debug( "Discarding a list of LuceneWork" );
}

@Override
public void applyStreamWork(LuceneWork singleOperation) {
// no-op
log.debug( "Discarding a sintgle LuceneWork" );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,22 @@
* @author Lukasz Moren
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2011 Red Hat Inc.
*/
public class JGroupsBackendQueueProcessor implements Runnable {
public class JGroupsBackendQueueProcessor {

private static final Log log = LoggerFactory.make();

private final JGroupsBackendQueueProcessorFactory factory;
private final List<LuceneWork> queue;
private final String indexName;
private final IndexManager indexManager;

public JGroupsBackendQueueProcessor(String indexName, List<LuceneWork> queue,
JGroupsBackendQueueProcessorFactory factory, IndexManager indexManager) {
public JGroupsBackendQueueProcessor(JGroupsBackendQueueProcessorFactory factory, IndexManager indexManager) {
this.factory = factory;
this.queue = queue;
this.indexName = indexName;
this.indexManager = indexManager;
this.indexName = indexManager.getIndexName();
}

@SuppressWarnings("unchecked")
public void run() {
public void sendLuceneWorkList(List<LuceneWork> queue) {
boolean trace = log.isTraceEnabled();
List<LuceneWork> filteredQueue = new ArrayList<LuceneWork>( queue );
if ( trace ) {
Expand All @@ -87,7 +84,7 @@ public void run() {
}
return;
}
byte[] data = indexManager.getSerializer().toSerializedModel( queue );
byte[] data = indexManager.getSerializer().toSerializedModel( filteredQueue );
BackendMessage toSend = new BackendMessage( indexName, data );

/* Creates and send message with lucene works to master.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
package org.hibernate.search.backend.impl.jgroups;

import java.net.URL;
import java.util.List;
import java.util.Properties;

import org.jgroups.Address;
Expand All @@ -33,11 +32,10 @@
import org.jgroups.JChannel;

import org.hibernate.search.Environment;
import org.hibernate.search.backend.spi.BackendQueueProcessorFactory;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;
import org.hibernate.search.SearchException;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.util.configuration.impl.ConfigurationParseHelper;
import org.hibernate.search.util.impl.JGroupsHelper;
import org.hibernate.search.util.impl.XMLHelper;
Expand All @@ -49,7 +47,7 @@
*
* @author Lukasz Moren
*/
public abstract class JGroupsBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
public abstract class JGroupsBackendQueueProcessorFactory implements BackendQueueProcessor {

private static final Log log = LoggerFactory.make();

Expand Down Expand Up @@ -159,8 +157,6 @@ private void buildChannel(Properties props) {
}
}

public abstract Runnable getProcessor(List<LuceneWork> queue);

public void close() {
try {
if ( channel != null && channel.isOpen() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ public void initialize(Properties props, WorkerBuildContext context, IndexManage
registerMasterListener( context.getUninitializedSearchFactory() );
}

public Runnable getProcessor(List<LuceneWork> queue) {
return luceneBackendQueueProcessorFactory.getProcessor( queue );
}

private void registerMasterListener(SearchFactoryImplementor searchFactory) {
//register JGroups receiver in master node to get Lucene docs from slave nodes
masterListener = new JGroupsMasterMessageListener( searchFactory );
Expand All @@ -79,4 +75,14 @@ public void close() {
super.close();
luceneBackendQueueProcessorFactory.close();
}

@Override
public void applyWork(List<LuceneWork> workList) {
luceneBackendQueueProcessorFactory.applyWork( workList );
}

@Override
public void applyStreamWork(LuceneWork singleOperation) {
luceneBackendQueueProcessorFactory.applyStreamWork( singleOperation );
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* Copyright (c) 2010, Red Hat, Inc. and/or its affiliates or third-party contributors as
* Copyright (c) 2011, Red Hat, Inc. and/or its affiliates or third-party contributors as
* indicated by the @author tags or express copyright attribution
* statements applied by the authors. All third-party contributions are
* distributed under license by Red Hat, Inc.
Expand All @@ -23,16 +23,36 @@
*/
package org.hibernate.search.backend.impl.jgroups;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;

/**
* @author Lukasz Moren
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2011 Red Hat Inc.
*/
public class SlaveJGroupsBackendQueueProcessorFactory extends JGroupsBackendQueueProcessorFactory {

private JGroupsBackendQueueProcessor jgroupsProcessor;

public Runnable getProcessor(List<LuceneWork> queue) {
return new JGroupsBackendQueueProcessor( indexName, queue, this, indexManager );
@Override
public void initialize(Properties props, WorkerBuildContext context, IndexManager indexManager) {
super.initialize( props, context, indexManager );
jgroupsProcessor = new JGroupsBackendQueueProcessor( this, indexManager );
}

@Override
public void applyWork(List<LuceneWork> workList) {
jgroupsProcessor.sendLuceneWorkList( workList );
}

@Override
public void applyStreamWork(LuceneWork singleOperation) {
//TODO optimize for single operation?
jgroupsProcessor.sendLuceneWorkList( Collections.singletonList( singleOperation ) );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.hibernate.search.backend.impl.jms;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
Expand All @@ -48,12 +49,12 @@ public class JMSBackendQueueProcessor implements Runnable {

private static final Log log = LoggerFactory.make();

private final List<LuceneWork> queue;
private final Collection<LuceneWork> queue;
private final JMSBackendQueueProcessorFactory factory;
private final String indexName;
private final IndexManager indexManager;

public JMSBackendQueueProcessor(String indexName, List<LuceneWork> queue, IndexManager indexManager,
public JMSBackendQueueProcessor(String indexName, Collection<LuceneWork> queue, IndexManager indexManager,
JMSBackendQueueProcessorFactory jmsBackendQueueProcessorFactory) {
this.indexName = indexName;
this.queue = queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*/
package org.hibernate.search.backend.impl.jms;

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import javax.jms.Queue;
Expand All @@ -33,7 +34,7 @@
import org.hibernate.search.Environment;
import org.hibernate.search.SearchException;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.spi.BackendQueueProcessorFactory;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.engine.spi.SearchFactoryImplementor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;
Expand All @@ -44,7 +45,7 @@
* @author Hardy Ferentschik
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2011 Red Hat Inc.
*/
public class JMSBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
public class JMSBackendQueueProcessorFactory implements BackendQueueProcessor {
private String jmsQueueName;
private String jmsConnectionFactoryName;
private static final String JNDI_PREFIX = Environment.WORKER_PREFIX + "jndi.";
Expand All @@ -68,10 +69,6 @@ public void initialize(Properties props, WorkerBuildContext context, IndexManage
prepareJMSTools();
}

public Runnable getProcessor(List<LuceneWork> queue) {
return new JMSBackendQueueProcessor( indexName, queue, indexManager, this );
}

public QueueConnectionFactory getJMSFactory() {
return factory;
}
Expand Down Expand Up @@ -115,4 +112,16 @@ public SearchFactoryImplementor getSearchFactory() {
public void close() {
// no need to release anything
}

@Override
public void applyWork(List<LuceneWork> workList) {
//TODO review this integration with the old Runnable-style execution
Runnable operation = new JMSBackendQueueProcessor( indexName, workList, indexManager, this );
operation.run();
}

@Override
public void applyStreamWork(LuceneWork singleOperation) {
applyWork( Collections.singletonList( singleOperation ) );
}
}
Loading

0 comments on commit 2280f77

Please sign in to comment.