From 024238705aa64b40ed0aadb0ff9206d217e845f6 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 6 Mar 2020 16:38:09 +0800 Subject: [PATCH] [ZEPPELIN-4504]. Fix searching of keywords in notebooks --- conf/zeppelin-site.xml.template | 28 ++++++++-- .../zeppelin/conf/ZeppelinConfiguration.java | 11 ++-- .../apache/zeppelin/notebook/Notebook.java | 5 +- .../apache/zeppelin/search/LuceneSearch.java | 51 +++++++++++-------- .../apache/zeppelin/search/SearchService.java | 2 + .../zeppelin/search/LuceneSearchTest.java | 14 +++-- 6 files changed, 76 insertions(+), 35 deletions(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index b7c3728fcb7..146ecaa45f9 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -651,10 +651,28 @@ Kubernetes yaml spec files - - zeppelin.docker.container.image - apache/zeppelin:0.8.0 - Docker image for interpreters - + + zeppelin.docker.container.image + apache/zeppelin:0.8.0 + Docker image for interpreters + + + + zeppelin.search.index.rebuild + false + Whether rebuild index when zeppelin start. If true, it would read all notes and rebuild the index, this would consume lots of memory if you have large amounts of notes, so by default it is false + + + + zeppelin.search.use.disk + true + Whether using disk for storing search index, if false, memory will be used instead. + + + + zeppelin.search.index.path + /tmp/zeppelin-index + path for storing search index on disk. + diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index dbdb614da5e..a5a7c39d0a5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -707,12 +707,16 @@ public String getZeppelinProxyPassword() { return getString(ConfVars.ZEPPELIN_PROXY_PASSWORD); } + public Boolean isIndexRebuild() { + return getBoolean(ConfVars.ZEPPELIN_SEARCH_INDEX_REBUILD); + } + public Boolean isZeppelinSearchUseDisk() { return getBoolean(ConfVars.ZEPPELIN_SEARCH_USE_DISK); } public String getZeppelinSearchTempPath() { - return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH); + return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_INDEX_PATH); } public String getClusterAddress() { @@ -967,8 +971,9 @@ public enum ConfVars { ZEPPELIN_PROXY_URL("zeppelin.proxy.url", null), ZEPPELIN_PROXY_USER("zeppelin.proxy.user", null), ZEPPELIN_PROXY_PASSWORD("zeppelin.proxy.password", null), - ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", false), - ZEPPELIN_SEARCH_TEMP_PATH("zeppelin.search.temp.path", System.getProperty("java.io.tmpdir")); + ZEPPELIN_SEARCH_INDEX_REBUILD("zeppelin.search.index.rebuild", false), + ZEPPELIN_SEARCH_USE_DISK("zeppelin.search.use.disk", true), + ZEPPELIN_SEARCH_INDEX_PATH("zeppelin.search.index.path", "/tmp/zeppelin-index"); private String varName; @SuppressWarnings("rawtypes") diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 3e478d5d558..55eedbf485b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -100,9 +100,12 @@ public Notebook( this.interpreterSettingManager.setNotebook(this); this.noteSearchService = noteSearchService; this.credentials = credentials; - this.noteEventListeners.add(this.noteSearchService); this.noteEventListeners.add(this.interpreterSettingManager); + + if (conf.isIndexRebuild()) { + noteSearchService.startRebuildIndex(getAllNotes()); + } } @Inject diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index ebc7ad4df9c..87373065ec7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collection; @@ -30,7 +29,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import javax.inject.Inject; -import org.apache.commons.io.FileUtils; + import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.standard.StandardAnalyzer; @@ -57,7 +56,7 @@ import org.apache.lucene.search.highlight.TextFragment; import org.apache.lucene.search.highlight.TokenSources; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MMapDirectory; +import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.RAMDirectory; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.Note; @@ -78,34 +77,32 @@ public class LuceneSearch extends SearchService { private static final String PARAGRAPH = "paragraph"; private static final String ID_FIELD = "id"; - private final ZeppelinConfiguration zeppelinConfiguration; - private Directory directory; - private Path directoryPath; + private Path indexPath; + private Directory indexDirectory; private Analyzer analyzer; private IndexWriterConfig indexWriterConfig; private IndexWriter indexWriter; @Inject - public LuceneSearch(ZeppelinConfiguration zeppelinConfiguration) { + public LuceneSearch(ZeppelinConfiguration conf) { super("LuceneSearch-Thread"); - this.zeppelinConfiguration = zeppelinConfiguration; - if (zeppelinConfiguration.isZeppelinSearchUseDisk()) { + + if (conf.isZeppelinSearchUseDisk()) { try { - this.directoryPath = - Files.createTempDirectory( - Paths.get(zeppelinConfiguration.getZeppelinSearchTempPath()), "zeppelin-search-"); - this.directory = new MMapDirectory(directoryPath); + this.indexPath = Paths.get(conf.getZeppelinSearchTempPath()); + this.indexDirectory = FSDirectory.open(indexPath); + logger.info("Use {} for storing lucene search index", this.indexPath); } catch (IOException e) { throw new RuntimeException( - "Failed to create temporary directory for search service. Use memory instead", e); + "Failed to create index directory for search service. Use memory instead", e); } } else { - this.directory = new RAMDirectory(); + this.indexDirectory = new RAMDirectory(); } this.analyzer = new StandardAnalyzer(); this.indexWriterConfig = new IndexWriterConfig(analyzer); try { - this.indexWriter = new IndexWriter(directory, indexWriterConfig); + this.indexWriter = new IndexWriter(indexDirectory, indexWriterConfig); } catch (IOException e) { logger.error("Failed to create new IndexWriter", e); } @@ -116,12 +113,12 @@ public LuceneSearch(ZeppelinConfiguration zeppelinConfiguration) { */ @Override public List> query(String queryStr) { - if (null == directory) { + if (null == indexDirectory) { throw new IllegalStateException( "Something went wrong on instance creation time, index dir is null"); } List> result = Collections.emptyList(); - try (IndexReader indexReader = DirectoryReader.open(directory)) { + try (IndexReader indexReader = DirectoryReader.open(indexDirectory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); Analyzer analyzer = new StandardAnalyzer(); MultiFieldQueryParser parser = @@ -135,7 +132,7 @@ public List> query(String queryStr) { result = doSearch(indexSearcher, query, analyzer, highlighter); } catch (IOException e) { - logger.error("Failed to open index dir {}, make sure indexing finished OK", directory, e); + logger.error("Failed to open index dir {}, make sure indexing finished OK", indexDirectory, e); } catch (ParseException e) { logger.error("Failed to parse query " + queryStr, e); } @@ -396,9 +393,6 @@ private void deleteDoc(String noteId, Paragraph p) { public void close() { try { indexWriter.close(); - if (zeppelinConfiguration.isZeppelinNotebookCronEnable() && null != directoryPath) { - FileUtils.deleteDirectory(directoryPath.toFile()); - } } catch (IOException e) { logger.error("Failed to .close() the notebook index", e); } @@ -425,4 +419,17 @@ private void indexDoc(IndexWriter w, String noteId, String noteName, Paragraph p Document doc = newDocument(id, noteName, p); w.addDocument(doc); } + + @Override + public void startRebuildIndex(List notes) { + Thread thread = new Thread(() -> { + logger.info("Starting rebuild index"); + for (Note note: notes) { + addIndexDoc(note); + } + logger.info("Finish rebuild index"); + }); + thread.setName("LuceneSearch-RebuildIndex-Thread"); + thread.start(); + } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java index 53d5e92a341..caa1bc21c58 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java @@ -133,4 +133,6 @@ public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent e.printStackTrace(); } } + + public abstract void startRebuildIndex(List notes); } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java index 1e6511501c8..12bf54ecc29 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/search/LuceneSearchTest.java @@ -22,9 +22,13 @@ import static org.mockito.Mockito.when; import com.google.common.base.Splitter; + +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; + +import com.google.common.io.Files; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -40,17 +44,18 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.quartz.SchedulerException; public class LuceneSearchTest { private Notebook notebook; private InterpreterSettingManager interpreterSettingManager; - private SearchService noteSearchService; - + private LuceneSearch noteSearchService; + private File indexDir; @Before - public void startUp() throws IOException, SchedulerException { + public void startUp() throws IOException { + indexDir = Files.createTempDir().getAbsoluteFile(); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_SEARCH_INDEX_PATH.getVarName(), indexDir.getAbsolutePath()); noteSearchService = new LuceneSearch(ZeppelinConfiguration.create()); interpreterSettingManager = mock(InterpreterSettingManager.class); InterpreterSetting defaultInterpreterSetting = mock(InterpreterSetting.class); @@ -65,6 +70,7 @@ public void startUp() throws IOException, SchedulerException { @After public void shutDown() { noteSearchService.close(); + indexDir.delete(); } // @Test